From 77c266c43e4146da05ba2dcba9f3f027fc52ca60 Mon Sep 17 00:00:00 2001 From: Neil Hanlon Date: Sat, 4 Jan 2025 17:56:51 -0500 Subject: [PATCH] logs go brrrrr - fix deadlock in waitgroups - batch inserts - fix request_time mapping --- .gitignore | 2 +- db/database.go | 1 + main.go | 358 +++++++++++++++++++++++++++++--------------- models/log_entry.go | 4 +- 4 files changed, 243 insertions(+), 122 deletions(-) diff --git a/.gitignore b/.gitignore index a1d7e30..382db01 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,3 @@ -fastly_log_processor_errors.log +process_fastly_logs_errors.log logs.db logs.db-journal diff --git a/db/database.go b/db/database.go index 23664e3..f9e93ca 100644 --- a/db/database.go +++ b/db/database.go @@ -29,6 +29,7 @@ func CreateLogTable() error { service_id TEXT, client_ip TEXT, request_method TEXT, + request_time DATETIME, request_url TEXT, protocol TEXT, response_status INTEGER, diff --git a/main.go b/main.go index ce57b95..8300488 100644 --- a/main.go +++ b/main.go @@ -2,187 +2,305 @@ package main import ( "bufio" + "context" "fmt" + "io" "log" "os" "path/filepath" "sync" - "io" - "context" - "github.com/urfave/cli/v3" + "github.com/urfave/cli/v3" "git.resf.org/infrastructure/process-fastly-logs/db" - "git.resf.org/infrastructure/process-fastly-logs/parser" "git.resf.org/infrastructure/process-fastly-logs/models" + "git.resf.org/infrastructure/process-fastly-logs/parser" ) func init() { - logFile, err := os.OpenFile("process_fastly_logs_errors.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) - if err != nil { - log.Println("Failed to open error log file:", err) - os.Exit(1) - } - log.SetOutput(io.MultiWriter(os.Stdout, logFile)) - log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile) + logFile, err := os.OpenFile("process_fastly_logs_errors.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) + if err != nil { + log.Println("Failed to open error log file:", err) + os.Exit(1) + } + log.SetOutput(io.MultiWriter(os.Stdout, logFile)) + log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile) } func main() { cwd, err := os.Getwd() if err != nil { - log.Println(err) + log.Println(err) } - cmd := &cli.Command{ - Flags: []cli.Flag{ - &cli.StringFlag{ - Name: "logDir", - Value: cwd, - Usage: "directory containing logs to process", - }, - &cli.StringFlag{ - Name: "outDir", - Value: cwd, - Usage: "directory to ouput database and program logs", - }, - }, - Action: fastlyLogProcessor, - } + cmd := &cli.Command{ + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "logDir", + Value: cwd, + Usage: "directory containing logs to process", + }, + &cli.StringFlag{ + Name: "outDir", + Value: cwd, + Usage: "directory to ouput database and program logs", + }, + }, + Action: fastlyLogProcessor, + } - if err := cmd.Run(context.Background(), os.Args); err != nil { - log.Fatal(err) - } + if err := cmd.Run(context.Background(), os.Args); err != nil { + log.Fatal(err) + } } // TODO: -// - Anonymize requests -// - Spit out in format Brian can use -// - Profile? lol -// - More tests +// - Anonymize requests +// - Spit out in format Brian can use +// - Profile? lol +// - More tests func fastlyLogProcessor(ctx context.Context, cmd *cli.Command) error { if cmd.NArg() < 1 { - return cli.Exit("Must pass at least one directory to process logs of", 1) + return cli.Exit("Must pass at least one directory to process logs of", 1) } month := cmd.Args().Get(0) logDir := cmd.String("logDir") outDir := cmd.String("outDir") - // Initialize the database + // Initialize the database err := db.InitDB(fmt.Sprintf(filepath.Join(outDir, "logs.db"))) - if err != nil { - log.Fatal(err) - } - defer db.DB.Close() + if err != nil { + log.Fatal(err) + } + defer db.DB.Close() - // Create logs table if not exists - err = db.CreateLogTable() - if err != nil { - log.Fatal(err) - } + // Create logs table if not exists + err = db.CreateLogTable() + if err != nil { + log.Fatal(err) + } walkLogDir := fmt.Sprintf(filepath.Join(logDir, month)) - // Create a WaitGroup - var wg sync.WaitGroup + // Create a WaitGroup + var wg sync.WaitGroup - // Define a semaphore channel to limit concurrency - maxGoroutines := 10 // Adjust based on your system capabilities - semaphore := make(chan struct{}, maxGoroutines) + // Define a semaphore channel to limit concurrency + maxGoroutines := 10 // Adjust based on your system capabilities + semaphore := make(chan struct{}, maxGoroutines) - // Walk through the directory and process each log file concurrently - err = filepath.Walk(walkLogDir, func(path string, info os.FileInfo, err error) error { - if err != nil { - return err - } - if !info.IsDir() && filepath.Ext(path) == ".log" { - wg.Add(1) - // Acquire a token - semaphore <- struct{}{} - go func(p string) { - defer wg.Done() - defer func() { <-semaphore }() // Release the token - err := processLogFile(p) - if err != nil { - log.Printf("Error processing file %s: %v\n", p, err) - } - }(path) - } - return nil - }) - if err != nil { - return err - } + // Walk through the directory and process each log file concurrently + err = filepath.Walk(walkLogDir, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if !info.IsDir() && filepath.Ext(path) == ".log" { + wg.Add(1) + // Acquire a token + semaphore <- struct{}{} + go func(p string) { + defer wg.Done() + defer func() { <-semaphore }() // Release the token + err := processLogFile(p) + if err != nil { + log.Printf("Error processing file %s: %v\n", p, err) + } + }(path) + } + return nil + }) + if err != nil { + return err + } - // Wait for all Goroutines to finish - wg.Wait() + // Wait for all Goroutines to finish + wg.Wait() return nil } func processLogFile(filePath string) error { - file, err := os.Open(filePath) - if err != nil { - return err - } - defer file.Close() + file, err := os.Open(filePath) + if err != nil { + return err + } + defer file.Close() - // Create channels - linesChan := make(chan string, 100) - entriesChan := make(chan *models.LogEntry, 100) - errorsChan := make(chan error, 100) + // Create channels + linesChan := make(chan string, 1000) + entriesChan := make(chan *models.LogEntry, 1000) + errorsChan := make(chan error, 1000) - var wg sync.WaitGroup + var wgParsers sync.WaitGroup - // Start Goroutines to parse lines - numParsers := 5 // Adjust as needed - for i := 0; i < numParsers; i++ { - wg.Add(1) - go func() { - defer wg.Done() - for line := range linesChan { - entry, err := parser.ParseLogLine(line) - if err != nil { - errorsChan <- fmt.Errorf("Error parsing line: %v", err) - continue - } - entriesChan <- entry - } - }() - } + // Start Goroutines to parse lines + numParsers := 100 // Adjust as needed + for i := 0; i < numParsers; i++ { + wgParsers.Add(1) + go func() { + defer wgParsers.Done() + for line := range linesChan { + log.Println("parsing line") + entry, err := parser.ParseLogLine(line) + if err != nil { + errorsChan <- fmt.Errorf("Error parsing line: %v", err) + continue + } + entriesChan <- entry + } + }() + } // Start a Goroutine to save entries to the database - wg.Add(1) + var wgSaver sync.WaitGroup + wgSaver.Add(1) go func() { - defer wg.Done() + defer wgSaver.Done() + const batchSize = 1000 // Adjust the batch size as needed + var batch []*models.LogEntry + batch = make([]*models.LogEntry, 0, batchSize) + for entry := range entriesChan { - err := entry.Save() - if err != nil { - errorsChan <- fmt.Errorf("Error saving entry: %v", err) + batch = append(batch, entry) + if len(batch) >= batchSize { + if err := saveBatch(batch); err != nil { + errorsChan <- fmt.Errorf("Error saving batch: %v", err) + } + batch = batch[:0] // Reset the batch + } + } + + // Save any remaining entries in the batch + if len(batch) > 0 { + if err := saveBatch(batch); err != nil { + errorsChan <- fmt.Errorf("Error saving batch: %v", err) } } }() - // Read lines from the file and send to linesChan - scanner := bufio.NewScanner(file) - for scanner.Scan() { - linesChan <- scanner.Text() - } - close(linesChan) + // Error handling + var errorList []error + var errorMutex sync.Mutex + var wgErrors sync.WaitGroup - // Wait for parsing and saving to finish - wg.Wait() - close(entriesChan) - close(errorsChan) - - if len(errorsChan) > 0 { - log.Printf("Encountered errors while processing file %s:\n", filePath) + wgErrors.Add(1) + go func() { + defer wgErrors.Done() for err := range errorsChan { + log.Println("handling error") + errorMutex.Lock() + errorList = append(errorList, err) + errorMutex.Unlock() + } + }() + + // Read lines from the file and send to linesChan + scanner := bufio.NewScanner(file) + for scanner.Scan() { + linesChan <- scanner.Text() + } + close(linesChan) // Close linesChan after all lines have been sent + + // Wait for parsing Goroutines to finish + wgParsers.Wait() + + // Close entriesChan since no more entries will be sent + close(entriesChan) + + // Wait for the saving Goroutine to finish + wgSaver.Wait() + + // Close errorsChan after all senders have finished + close(errorsChan) + + // Wait for the error handling Goroutine to finish + wgErrors.Wait() + + // Log the errors + if len(errorList) > 0 { + log.Printf("Encountered errors while processing file %s:\n", filePath) + for _, err := range errorList { log.Println(err) } } - if err := scanner.Err(); err != nil { + if err := scanner.Err(); err != nil { + return err + } + return nil +} + +// saveBatch saves a batch of log entries to the database using a transaction +func saveBatch(entries []*models.LogEntry) error { + tx, err := db.DB.Begin() + if err != nil { + return err + } + + stmt, err := tx.Prepare(` + INSERT INTO logs( + priority, + timestamp, + cache_server, + service_id, + client_ip, + request_method, + request_time, + request_url, + protocol, + response_status, + response_body_bytes, + host, + user_agent, + datacenter, + geo_city, + geo_continent_code, + geo_region, + start_time, + elapsed_time_usec, + is_hit, + cache_result + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + `) + if err != nil { + tx.Rollback() + return err + } + defer stmt.Close() + + for _, entry := range entries { + _, err := stmt.Exec( + entry.Priority, + entry.Timestamp, + entry.CacheServer, + entry.ServiceID, + entry.ClientIP, + entry.RequestMethod, + entry.RequestTime, + entry.RequestURL, + entry.Protocol, + entry.ResponseStatus, + entry.ResponseBodyBytes, + entry.Host, + entry.UserAgent, + entry.Datacenter, + entry.GeoCity, + entry.GeoContinentCode, + entry.GeoRegion, + entry.StartTime, + entry.ElapsedTimeUsec, + entry.IsHit, + entry.CacheResult, + ) + if err != nil { + tx.Rollback() + return err + } + } + + if err := tx.Commit(); err != nil { + tx.Rollback() return err } return nil diff --git a/models/log_entry.go b/models/log_entry.go index ffccab3..55e3d0e 100644 --- a/models/log_entry.go +++ b/models/log_entry.go @@ -37,6 +37,7 @@ func (entry *LogEntry) Save() error { service_id, client_ip, request_method, + request_time, request_url, protocol, response_status, @@ -51,7 +52,7 @@ func (entry *LogEntry) Save() error { elapsed_time_usec, is_hit, cache_result - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` statement, err := db.DB.Prepare(insertSQL) if err != nil { @@ -64,6 +65,7 @@ func (entry *LogEntry) Save() error { entry.ServiceID, entry.ClientIP, entry.RequestMethod, + entry.RequestTime, entry.RequestURL, entry.Protocol, entry.ResponseStatus,