// process-fastly-logs/main.go
// (pasted listing header: 357 lines, 8.2 KiB, Go)
package main
import (
"bufio"
"context"
"fmt"
"io"
"log"
"os"
"path/filepath"
"sync"
"time"
"github.com/urfave/cli/v3"
"git.resf.org/infrastructure/process-fastly-logs/db"
"git.resf.org/infrastructure/process-fastly-logs/models"
"git.resf.org/infrastructure/process-fastly-logs/parser"
)
// init redirects the standard logger to write to both stdout and a
// persistent append-only error-log file so failures survive the process.
// NOTE(review): the file is opened in the current working directory, not
// the outDir flag — confirm this is intentional.
func init() {
	logFile, err := os.OpenFile("process_fastly_logs_errors.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o666)
	if err != nil {
		// Fatalln prints and exits with status 1, matching the previous
		// Println + os.Exit(1) pair in a single idiomatic call.
		log.Fatalln("Failed to open error log file:", err)
	}
	log.SetOutput(io.MultiWriter(os.Stdout, logFile))
	log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)
}
// main wires up the CLI: two string flags select where log files are read
// from and where the SQLite database and program logs are written, both
// defaulting to the current working directory. The actual work happens in
// fastlyLogProcessor.
func main() {
	cwd, err := os.Getwd()
	if err != nil {
		// Non-fatal: the flags below simply default to "" and the user
		// can still pass --logDir/--outDir explicitly.
		log.Println(err)
	}
	cmd := &cli.Command{
		Flags: []cli.Flag{
			&cli.StringFlag{
				Name:  "logDir",
				Value: cwd,
				Usage: "directory containing logs to process",
			},
			&cli.StringFlag{
				Name:  "outDir",
				Value: cwd,
				// Fixed typo: "ouput" -> "output".
				Usage: "directory to output database and program logs",
			},
		},
		Action: fastlyLogProcessor,
	}
	if err := cmd.Run(context.Background(), os.Args); err != nil {
		log.Fatal(err)
	}
}
// TODO:
// - Anonymize requests
// - Spit out in format Brian can use
// - Profile? lol
// - More tests
// fastlyLogProcessor is the CLI action: it opens/creates the SQLite
// database in outDir, then walks <logDir>/<month> and processes every
// *.log file concurrently, bounded by a semaphore.
//
// The first positional argument names the month subdirectory to walk.
// Per-file processing errors are logged, not returned; only setup and
// directory-walk errors abort the run.
func fastlyLogProcessor(ctx context.Context, cmd *cli.Command) error {
	if cmd.NArg() < 1 {
		return cli.Exit("Must pass at least one directory to process logs of", 1)
	}
	month := cmd.Args().Get(0)
	logDir := cmd.String("logDir")
	outDir := cmd.String("outDir")

	// Initialize the database. filepath.Join is used directly: the old
	// fmt.Sprintf wrapper was a no-op and would have mangled any path
	// containing a '%' verb. Returning (instead of log.Fatal) lets the
	// deferred Close below actually run on later failures.
	if err := db.InitDB(filepath.Join(outDir, "logs.db")); err != nil {
		return fmt.Errorf("initializing database: %w", err)
	}
	defer db.DB.Close()

	// Create logs table if not exists.
	if err := db.CreateLogTable(); err != nil {
		return fmt.Errorf("creating log table: %w", err)
	}

	walkLogDir := filepath.Join(logDir, month)

	// Bound concurrency: at most maxGoroutines files in flight at once.
	// Adjust based on system capabilities.
	const maxGoroutines = 10
	semaphore := make(chan struct{}, maxGoroutines)
	var wg sync.WaitGroup

	// Walk the directory tree and process each .log file concurrently.
	err := filepath.Walk(walkLogDir, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if info.IsDir() || filepath.Ext(path) != ".log" {
			return nil
		}
		wg.Add(1)
		semaphore <- struct{}{} // acquire a slot
		go func(p string) {
			defer wg.Done()
			defer func() { <-semaphore }() // release the slot
			if err := processLogFile(p); err != nil {
				log.Printf("Error processing file %s: %v\n", p, err)
			}
		}(path)
		return nil
	})
	// Wait for in-flight workers even when the walk failed partway,
	// so no goroutine is abandoned mid-write.
	wg.Wait()
	if err != nil {
		return err
	}
	return nil
}
// processLogFile streams one log file through a small pipeline:
//
//	reader -> linesChan -> parser workers -> entriesChan -> batch saver
//
// Parse/save errors are funneled through errorsChan, collected, and
// logged; only a file-open or scanner (read) error is returned.
func processLogFile(filePath string) error {
	file, err := os.Open(filePath)
	if err != nil {
		return err
	}
	defer file.Close()

	// Buffered channels decouple the pipeline stages.
	linesChan := make(chan string, 1000)
	entriesChan := make(chan *models.LogEntry, 1000)
	errorsChan := make(chan error, 1000)

	// Stage 2: parser workers turn raw lines into LogEntry values.
	var wgParsers sync.WaitGroup
	numParsers := 100 // Adjust as needed
	for i := 0; i < numParsers; i++ {
		wgParsers.Add(1)
		go func() {
			defer wgParsers.Done()
			for line := range linesChan {
				entry, err := parser.ParseLogLine(line)
				if err != nil {
					errorsChan <- fmt.Errorf("Error parsing line: `%v` => %v", line, err)
					continue
				}
				entriesChan <- entry
			}
		}()
	}

	// Stage 3: a single goroutine batches entries and saves them.
	var wgSaver sync.WaitGroup
	wgSaver.Add(1)
	go func() {
		defer wgSaver.Done()
		const batchSize = 1000
		batch := make([]*models.LogEntry, 0, batchSize)
		// flush saves the current batch (if any) and resets it,
		// reusing the backing array; saveBatch is synchronous so the
		// reuse is safe.
		flush := func() {
			if len(batch) == 0 {
				return
			}
			if err := saveBatch(batch); err != nil {
				errorsChan <- fmt.Errorf("Error saving batch: %v", err)
			}
			batch = batch[:0]
		}
		for entry := range entriesChan {
			batch = append(batch, entry)
			if len(batch) >= batchSize {
				flush()
			}
		}
		flush() // save any remaining entries
	}()

	// Error collector: the only goroutine that touches errorList, so no
	// mutex is needed — wgErrors.Wait() below orders the final read.
	var errorList []error
	var wgErrors sync.WaitGroup
	wgErrors.Add(1)
	go func() {
		defer wgErrors.Done()
		for err := range errorsChan {
			errorList = append(errorList, err)
		}
	}()

	// Stage 1: read lines and feed the parsers. Raise the scanner's max
	// token size (default 64KiB) so an unusually long log line errors
	// gracefully at 1MiB instead of aborting typical files.
	scanner := bufio.NewScanner(file)
	scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
	for scanner.Scan() {
		linesChan <- scanner.Text()
	}
	close(linesChan) // no more lines will be sent

	// Orderly shutdown: each channel is closed only after all of its
	// senders have finished.
	wgParsers.Wait()
	close(entriesChan)
	wgSaver.Wait()
	close(errorsChan)
	wgErrors.Wait()

	// Log collected parse/save errors; they do not fail the file.
	if len(errorList) > 0 {
		log.Printf("Encountered errors while processing file %s:\n", filePath)
		for _, err := range errorList {
			log.Println(err)
		}
	}
	return scanner.Err()
}
// saveBatch persists a batch of log entries, retrying transient failures
// with exponential backoff (500ms doubling up to 5s, at most 15
// attempts). Non-retryable errors are returned immediately.
func saveBatch(entries []*models.LogEntry) error {
	const (
		maxRetries     = 15
		initialBackoff = 500 * time.Millisecond
		maxBackoff     = 5 * time.Second
		backoffFactor  = 2
	)
	backoff := initialBackoff
	var err error
	for attempt := 1; attempt <= maxRetries; attempt++ {
		err = saveBatchOnce(entries)
		if err == nil {
			return nil
		}
		log.Printf("Attempt %d: Error saving batch: %v", attempt, err)
		if !isRetryableError(err) {
			// Permanent failure: retrying cannot help.
			return err
		}
		if attempt == maxRetries {
			// Fix: don't sleep after the final attempt — there is no
			// retry left to wait for.
			break
		}
		time.Sleep(backoff)
		// Increase the backoff for the next attempt, capped at maxBackoff.
		if backoff < maxBackoff {
			backoff = time.Duration(float64(backoff) * backoffFactor)
			if backoff > maxBackoff {
				backoff = maxBackoff
			}
		}
	}
	// After max retries, return the last error.
	return fmt.Errorf("failed to save batch after %d attempts: last error: %w", maxRetries, err)
}
// saveBatchOnce inserts every entry in a single transaction using one
// prepared INSERT statement: either the whole batch commits or the
// transaction is rolled back.
func saveBatchOnce(entries []*models.LogEntry) error {
	tx, err := db.DB.Begin()
	if err != nil {
		return err
	}
	// A Rollback after a successful Commit is a harmless no-op
	// (database/sql returns ErrTxDone), so one deferred call covers
	// every error path — including the old bug of rolling back *after*
	// a failed Commit on its own line.
	defer tx.Rollback()

	stmt, err := tx.Prepare(`
	INSERT INTO logs(
		priority,
		timestamp,
		cache_server,
		service_id,
		client_ip,
		request_method,
		request_time,
		request_url,
		protocol,
		response_status,
		response_body_bytes,
		host,
		user_agent,
		datacenter,
		geo_city,
		geo_continent_code,
		geo_region,
		start_time,
		elapsed_time_usec,
		is_hit,
		cache_result
	) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
	`)
	if err != nil {
		return err
	}
	defer stmt.Close()

	for _, entry := range entries {
		if _, err := stmt.Exec(
			entry.Priority,
			entry.Timestamp,
			entry.CacheServer,
			entry.ServiceID,
			entry.ClientIP,
			entry.RequestMethod,
			entry.RequestTime,
			entry.RequestURL,
			entry.Protocol,
			entry.ResponseStatus,
			entry.ResponseBodyBytes,
			entry.Host,
			entry.UserAgent,
			entry.Datacenter,
			entry.GeoCity,
			entry.GeoContinentCode,
			entry.GeoRegion,
			entry.StartTime,
			entry.ElapsedTimeUsec,
			entry.IsHit,
			entry.CacheResult,
		); err != nil {
			return err
		}
	}
	return tx.Commit()
}
// isRetryableError determines if an error is retryable
func isRetryableError(err error) bool {
// Implement logic to determine if the error is transient
// For example, check for network errors, timeouts, deadlocks, etc.
// This is a placeholder implementation
return true // Assume all errors are retryable in this example
}