logs go brrrrr

- fix deadlock in waitgroups
- batch inserts
- fix request_time mapping
This commit is contained in:
Neil Hanlon 2025-01-04 17:56:51 -05:00
parent 649b754be6
commit 77c266c43e
Signed by: neil
GPG key ID: 705BC21EC3C70F34
4 changed files with 243 additions and 122 deletions

2
.gitignore vendored
View file

@ -1,3 +1,3 @@
fastly_log_processor_errors.log process_fastly_logs_errors.log
logs.db logs.db
logs.db-journal logs.db-journal

View file

@ -29,6 +29,7 @@ func CreateLogTable() error {
service_id TEXT, service_id TEXT,
client_ip TEXT, client_ip TEXT,
request_method TEXT, request_method TEXT,
request_time DATETIME,
request_url TEXT, request_url TEXT,
protocol TEXT, protocol TEXT,
response_status INTEGER, response_status INTEGER,

158
main.go
View file

@ -2,19 +2,19 @@ package main
import ( import (
"bufio" "bufio"
"context"
"fmt" "fmt"
"io"
"log" "log"
"os" "os"
"path/filepath" "path/filepath"
"sync" "sync"
"io"
"context"
"github.com/urfave/cli/v3" "github.com/urfave/cli/v3"
"git.resf.org/infrastructure/process-fastly-logs/db" "git.resf.org/infrastructure/process-fastly-logs/db"
"git.resf.org/infrastructure/process-fastly-logs/parser"
"git.resf.org/infrastructure/process-fastly-logs/models" "git.resf.org/infrastructure/process-fastly-logs/models"
"git.resf.org/infrastructure/process-fastly-logs/parser"
) )
func init() { func init() {
@ -128,19 +128,20 @@ func processLogFile(filePath string) error {
defer file.Close() defer file.Close()
// Create channels // Create channels
linesChan := make(chan string, 100) linesChan := make(chan string, 1000)
entriesChan := make(chan *models.LogEntry, 100) entriesChan := make(chan *models.LogEntry, 1000)
errorsChan := make(chan error, 100) errorsChan := make(chan error, 1000)
var wg sync.WaitGroup var wgParsers sync.WaitGroup
// Start Goroutines to parse lines // Start Goroutines to parse lines
numParsers := 5 // Adjust as needed numParsers := 100 // Adjust as needed
for i := 0; i < numParsers; i++ { for i := 0; i < numParsers; i++ {
wg.Add(1) wgParsers.Add(1)
go func() { go func() {
defer wg.Done() defer wgParsers.Done()
for line := range linesChan { for line := range linesChan {
log.Println("parsing line")
entry, err := parser.ParseLogLine(line) entry, err := parser.ParseLogLine(line)
if err != nil { if err != nil {
errorsChan <- fmt.Errorf("Error parsing line: %v", err) errorsChan <- fmt.Errorf("Error parsing line: %v", err)
@ -152,14 +153,45 @@ func processLogFile(filePath string) error {
} }
// Start a Goroutine to save entries to the database // Start a Goroutine to save entries to the database
wg.Add(1) var wgSaver sync.WaitGroup
wgSaver.Add(1)
go func() { go func() {
defer wg.Done() defer wgSaver.Done()
const batchSize = 1000 // Adjust the batch size as needed
var batch []*models.LogEntry
batch = make([]*models.LogEntry, 0, batchSize)
for entry := range entriesChan { for entry := range entriesChan {
err := entry.Save() batch = append(batch, entry)
if err != nil { if len(batch) >= batchSize {
errorsChan <- fmt.Errorf("Error saving entry: %v", err) if err := saveBatch(batch); err != nil {
errorsChan <- fmt.Errorf("Error saving batch: %v", err)
} }
batch = batch[:0] // Reset the batch
}
}
// Save any remaining entries in the batch
if len(batch) > 0 {
if err := saveBatch(batch); err != nil {
errorsChan <- fmt.Errorf("Error saving batch: %v", err)
}
}
}()
// Error handling
var errorList []error
var errorMutex sync.Mutex
var wgErrors sync.WaitGroup
wgErrors.Add(1)
go func() {
defer wgErrors.Done()
for err := range errorsChan {
log.Println("handling error")
errorMutex.Lock()
errorList = append(errorList, err)
errorMutex.Unlock()
} }
}() }()
@ -168,16 +200,27 @@ func processLogFile(filePath string) error {
for scanner.Scan() { for scanner.Scan() {
linesChan <- scanner.Text() linesChan <- scanner.Text()
} }
close(linesChan) close(linesChan) // Close linesChan after all lines have been sent
// Wait for parsing and saving to finish // Wait for parsing Goroutines to finish
wg.Wait() wgParsers.Wait()
// Close entriesChan since no more entries will be sent
close(entriesChan) close(entriesChan)
// Wait for the saving Goroutine to finish
wgSaver.Wait()
// Close errorsChan after all senders have finished
close(errorsChan) close(errorsChan)
if len(errorsChan) > 0 { // Wait for the error handling Goroutine to finish
wgErrors.Wait()
// Log the errors
if len(errorList) > 0 {
log.Printf("Encountered errors while processing file %s:\n", filePath) log.Printf("Encountered errors while processing file %s:\n", filePath)
for err := range errorsChan { for _, err := range errorList {
log.Println(err) log.Println(err)
} }
} }
@ -187,3 +230,78 @@ func processLogFile(filePath string) error {
} }
return nil return nil
} }
// saveBatch inserts every entry in entries into the logs table inside a single
// database transaction, reusing one prepared INSERT statement for all rows.
//
// Either all rows are committed or none are: any failure while preparing,
// executing, or committing rolls the transaction back and the error is
// returned wrapped with the step that failed. An empty (or nil) batch is a
// no-op and returns nil without touching the database.
func saveBatch(entries []*models.LogEntry) error {
	// Nothing to insert: avoid opening and committing an empty transaction.
	if len(entries) == 0 {
		return nil
	}

	tx, err := db.DB.Begin()
	if err != nil {
		return fmt.Errorf("beginning transaction: %w", err)
	}

	// Single rollback point for every early return below. The flag is only
	// set after Commit succeeds, so a successful batch is never rolled back.
	committed := false
	defer func() {
		if !committed {
			// The rollback error is deliberately discarded: the caller needs
			// the original failure, and there is nothing further to undo.
			_ = tx.Rollback()
		}
	}()

	// The column list and the placeholder list must stay in the same order as
	// the arguments passed to stmt.Exec below (21 of each).
	stmt, err := tx.Prepare(`
INSERT INTO logs(
priority,
timestamp,
cache_server,
service_id,
client_ip,
request_method,
request_time,
request_url,
protocol,
response_status,
response_body_bytes,
host,
user_agent,
datacenter,
geo_city,
geo_continent_code,
geo_region,
start_time,
elapsed_time_usec,
is_hit,
cache_result
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`)
	if err != nil {
		return fmt.Errorf("preparing insert statement: %w", err)
	}
	defer stmt.Close()

	for i, entry := range entries {
		if _, err := stmt.Exec(
			entry.Priority,
			entry.Timestamp,
			entry.CacheServer,
			entry.ServiceID,
			entry.ClientIP,
			entry.RequestMethod,
			entry.RequestTime,
			entry.RequestURL,
			entry.Protocol,
			entry.ResponseStatus,
			entry.ResponseBodyBytes,
			entry.Host,
			entry.UserAgent,
			entry.Datacenter,
			entry.GeoCity,
			entry.GeoContinentCode,
			entry.GeoRegion,
			entry.StartTime,
			entry.ElapsedTimeUsec,
			entry.IsHit,
			entry.CacheResult,
		); err != nil {
			// Include the position in the batch so a bad row can be located.
			return fmt.Errorf("inserting entry %d of %d: %w", i+1, len(entries), err)
		}
	}

	if err := tx.Commit(); err != nil {
		return fmt.Errorf("committing transaction: %w", err)
	}
	committed = true
	return nil
}

View file

@ -37,6 +37,7 @@ func (entry *LogEntry) Save() error {
service_id, service_id,
client_ip, client_ip,
request_method, request_method,
request_time,
request_url, request_url,
protocol, protocol,
response_status, response_status,
@ -51,7 +52,7 @@ func (entry *LogEntry) Save() error {
elapsed_time_usec, elapsed_time_usec,
is_hit, is_hit,
cache_result cache_result
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
statement, err := db.DB.Prepare(insertSQL) statement, err := db.DB.Prepare(insertSQL)
if err != nil { if err != nil {
@ -64,6 +65,7 @@ func (entry *LogEntry) Save() error {
entry.ServiceID, entry.ServiceID,
entry.ClientIP, entry.ClientIP,
entry.RequestMethod, entry.RequestMethod,
entry.RequestTime,
entry.RequestURL, entry.RequestURL,
entry.Protocol, entry.Protocol,
entry.ResponseStatus, entry.ResponseStatus,