Neil Hanlon 2025-01-04 15:47:32 -05:00
commit 649b754be6
Signed by: neil
GPG key ID: 705BC21EC3C70F34
10 changed files with 497 additions and 0 deletions

3
.gitignore vendored Normal file

@@ -0,0 +1,3 @@
process_fastly_logs_errors.log
logs.db
logs.db-journal

53
db/database.go Normal file

@@ -0,0 +1,53 @@
package db

import (
    "database/sql"

    _ "github.com/mattn/go-sqlite3"
)

var DB *sql.DB

func InitDB(filepath string) error {
    var err error
    DB, err = sql.Open("sqlite3", filepath)
    if err != nil {
        return err
    }
    if err = DB.Ping(); err != nil {
        return err
    }
    return nil
}

func CreateLogTable() error {
    createTableSQL := `CREATE TABLE IF NOT EXISTS logs (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        priority INTEGER,
        timestamp DATETIME,
        cache_server TEXT,
        service_id TEXT,
        client_ip TEXT,
        request_method TEXT,
        request_url TEXT,
        protocol TEXT,
        response_status INTEGER,
        response_body_bytes INTEGER,
        host TEXT,
        user_agent TEXT,
        datacenter TEXT,
        geo_city TEXT,
        geo_continent_code TEXT,
        geo_region TEXT,
        start_time DATETIME,
        elapsed_time_usec INTEGER,
        is_hit BOOLEAN,
        cache_result TEXT
    );`

    statement, err := DB.Prepare(createTableSQL)
    if err != nil {
        return err
    }
    defer statement.Close()

    _, err = statement.Exec()
    return err
}
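
Once a month of logs has been ingested, the resulting logs.db can be analyzed with ordinary SQL through database/sql. A minimal, hypothetical sketch (the table and column names come from CreateLogTable above; the file path and the particular aggregation are only illustrative):

package main

import (
    "database/sql"
    "fmt"
    "log"

    _ "github.com/mattn/go-sqlite3"
)

func main() {
    // Open the database produced by the processor (path is illustrative).
    db, err := sql.Open("sqlite3", "logs.db")
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    // Count requests per datacenter, highest-traffic first.
    rows, err := db.Query(`SELECT datacenter, COUNT(*) AS hits
        FROM logs GROUP BY datacenter ORDER BY hits DESC`)
    if err != nil {
        log.Fatal(err)
    }
    defer rows.Close()

    for rows.Next() {
        var dc string
        var hits int
        if err := rows.Scan(&dc, &hits); err != nil {
            log.Fatal(err)
        }
        fmt.Printf("%s\t%d\n", dc, hits)
    }
    if err := rows.Err(); err != nil {
        log.Fatal(err)
    }
}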

8
go.mod Normal file

@@ -0,0 +1,8 @@
module git.resf.org/infrastructure/process-fastly-logs

go 1.23.4

require (
    github.com/mattn/go-sqlite3 v1.14.24
    github.com/urfave/cli/v3 v3.0.0-beta1
)

4
go.sum Normal file

@@ -0,0 +1,4 @@
github.com/mattn/go-sqlite3 v1.14.24 h1:tpSp2G2KyMnnQu99ngJ47EIkWVmliIizyZBfPrBWDRM=
github.com/mattn/go-sqlite3 v1.14.24/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/urfave/cli/v3 v3.0.0-beta1 h1:6DTaaUarcM0wX7qj5Hcvs+5Dm3dyUTBbEwIWAjcw9Zg=
github.com/urfave/cli/v3 v3.0.0-beta1/go.mod h1:FnIeEMYu+ko8zP1F9Ypr3xkZMIDqW3DR92yUtY39q1Y=

189
main.go Normal file

@@ -0,0 +1,189 @@
package main

import (
    "bufio"
    "context"
    "fmt"
    "io"
    "log"
    "os"
    "path/filepath"
    "sync"

    "git.resf.org/infrastructure/process-fastly-logs/db"
    "git.resf.org/infrastructure/process-fastly-logs/models"
    "git.resf.org/infrastructure/process-fastly-logs/parser"
    "github.com/urfave/cli/v3"
)

func init() {
    logFile, err := os.OpenFile("process_fastly_logs_errors.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
    if err != nil {
        log.Println("Failed to open error log file:", err)
        os.Exit(1)
    }
    log.SetOutput(io.MultiWriter(os.Stdout, logFile))
    log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)
}

func main() {
    cwd, err := os.Getwd()
    if err != nil {
        log.Println(err)
    }

    cmd := &cli.Command{
        Flags: []cli.Flag{
            &cli.StringFlag{
                Name:  "logDir",
                Value: cwd,
                Usage: "directory containing logs to process",
            },
            &cli.StringFlag{
                Name:  "outDir",
                Value: cwd,
                Usage: "directory to output database and program logs",
            },
        },
        Action: fastlyLogProcessor,
    }

    if err := cmd.Run(context.Background(), os.Args); err != nil {
        log.Fatal(err)
    }
}

// TODO:
// - Anonymize requests
// - Spit out in format Brian can use
// - Profile? lol
// - More tests
func fastlyLogProcessor(ctx context.Context, cmd *cli.Command) error {
    if cmd.NArg() < 1 {
        return cli.Exit("Must pass at least one month (log subdirectory) to process", 1)
    }
    month := cmd.Args().Get(0)
    logDir := cmd.String("logDir")
    outDir := cmd.String("outDir")

    // Initialize the database
    err := db.InitDB(filepath.Join(outDir, "logs.db"))
    if err != nil {
        log.Fatal(err)
    }
    defer db.DB.Close()

    // Create logs table if it does not exist
    err = db.CreateLogTable()
    if err != nil {
        log.Fatal(err)
    }

    walkLogDir := filepath.Join(logDir, month)

    // Create a WaitGroup
    var wg sync.WaitGroup

    // Define a semaphore channel to limit concurrency
    maxGoroutines := 10 // Adjust based on your system capabilities
    semaphore := make(chan struct{}, maxGoroutines)

    // Walk through the directory and process each log file concurrently
    err = filepath.Walk(walkLogDir, func(path string, info os.FileInfo, err error) error {
        if err != nil {
            return err
        }
        if !info.IsDir() && filepath.Ext(path) == ".log" {
            wg.Add(1)
            // Acquire a token
            semaphore <- struct{}{}
            go func(p string) {
                defer wg.Done()
                defer func() { <-semaphore }() // Release the token
                if err := processLogFile(p); err != nil {
                    log.Printf("Error processing file %s: %v\n", p, err)
                }
            }(path)
        }
        return nil
    })
    if err != nil {
        return err
    }

    // Wait for all goroutines to finish
    wg.Wait()
    return nil
}

func processLogFile(filePath string) error {
    file, err := os.Open(filePath)
    if err != nil {
        return err
    }
    defer file.Close()

    // Create channels
    linesChan := make(chan string, 100)
    entriesChan := make(chan *models.LogEntry, 100)
    errorsChan := make(chan error, 100)

    // Separate WaitGroups so entriesChan can be closed once every parser is
    // done, which lets the saver goroutine drain it and exit instead of
    // blocking forever.
    var parserWG, saverWG sync.WaitGroup

    // Start goroutines to parse lines
    numParsers := 5 // Adjust as needed
    for i := 0; i < numParsers; i++ {
        parserWG.Add(1)
        go func() {
            defer parserWG.Done()
            for line := range linesChan {
                entry, err := parser.ParseLogLine(line)
                if err != nil {
                    errorsChan <- fmt.Errorf("error parsing line: %w", err)
                    continue
                }
                entriesChan <- entry
            }
        }()
    }

    // Start a goroutine to save entries to the database
    saverWG.Add(1)
    go func() {
        defer saverWG.Done()
        for entry := range entriesChan {
            if err := entry.Save(); err != nil {
                errorsChan <- fmt.Errorf("error saving entry: %w", err)
            }
        }
    }()

    // Log errors as they arrive so a full errorsChan can never block the workers.
    errsDone := make(chan struct{})
    go func() {
        defer close(errsDone)
        logged := false
        for err := range errorsChan {
            if !logged {
                log.Printf("Encountered errors while processing file %s:", filePath)
                logged = true
            }
            log.Println(err)
        }
    }()

    // Read lines from the file and send them to linesChan
    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        linesChan <- scanner.Text()
    }
    close(linesChan)

    // Drain the pipeline in order: parsers, then the saver, then the error logger.
    parserWG.Wait()
    close(entriesChan)
    saverWG.Wait()
    close(errorsChan)
    <-errsDone

    if err := scanner.Err(); err != nil {
        return err
    }
    return nil
}

83
models/log_entry.go Normal file

@@ -0,0 +1,83 @@
package models

import (
    "time"

    "git.resf.org/infrastructure/process-fastly-logs/db"
)

type LogEntry struct {
    Priority          int
    Timestamp         time.Time
    RequestTime       time.Time
    CacheServer       string
    ServiceID         string
    ClientIP          string
    RequestMethod     string
    RequestURL        string
    Protocol          string
    ResponseStatus    int
    ResponseBodyBytes int
    Host              string
    UserAgent         string
    Datacenter        string
    GeoCity           string
    GeoContinentCode  string
    GeoRegion         string
    StartTime         time.Time
    ElapsedTimeUsec   int
    IsHit             bool
    CacheResult       string
}

// Save inserts the entry into the logs table.
// NOTE: RequestTime is parsed but not persisted; the logs table has no
// matching column.
func (entry *LogEntry) Save() error {
    insertSQL := `INSERT INTO logs(
        priority,
        timestamp,
        cache_server,
        service_id,
        client_ip,
        request_method,
        request_url,
        protocol,
        response_status,
        response_body_bytes,
        host,
        user_agent,
        datacenter,
        geo_city,
        geo_continent_code,
        geo_region,
        start_time,
        elapsed_time_usec,
        is_hit,
        cache_result
    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`

    statement, err := db.DB.Prepare(insertSQL)
    if err != nil {
        return err
    }
    defer statement.Close()

    _, err = statement.Exec(
        entry.Priority,
        entry.Timestamp,
        entry.CacheServer,
        entry.ServiceID,
        entry.ClientIP,
        entry.RequestMethod,
        entry.RequestURL,
        entry.Protocol,
        entry.ResponseStatus,
        entry.ResponseBodyBytes,
        entry.Host,
        entry.UserAgent,
        entry.Datacenter,
        entry.GeoCity,
        entry.GeoContinentCode,
        entry.GeoRegion,
        entry.StartTime,
        entry.ElapsedTimeUsec,
        entry.IsHit,
        entry.CacheResult,
    )
    return err
}
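
Save prepares and executes a new statement for every row, which is the slowest way to drive SQLite when ingesting millions of lines. A common alternative, not part of this commit, is to batch rows inside a single transaction; a minimal sketch using a hypothetical SaveBatch helper against the same table:

// SaveBatch is a hypothetical helper (not in this commit) that writes many
// entries inside one transaction to cut per-row commit overhead.
func SaveBatch(entries []*LogEntry) error {
    tx, err := db.DB.Begin()
    if err != nil {
        return err
    }
    stmt, err := tx.Prepare(`INSERT INTO logs(
        priority, timestamp, cache_server, service_id, client_ip,
        request_method, request_url, protocol, response_status,
        response_body_bytes, host, user_agent, datacenter, geo_city,
        geo_continent_code, geo_region, start_time, elapsed_time_usec,
        is_hit, cache_result
    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`)
    if err != nil {
        tx.Rollback()
        return err
    }
    defer stmt.Close()

    for _, e := range entries {
        if _, err := stmt.Exec(
            e.Priority, e.Timestamp, e.CacheServer, e.ServiceID, e.ClientIP,
            e.RequestMethod, e.RequestURL, e.Protocol, e.ResponseStatus,
            e.ResponseBodyBytes, e.Host, e.UserAgent, e.Datacenter, e.GeoCity,
            e.GeoContinentCode, e.GeoRegion, e.StartTime, e.ElapsedTimeUsec,
            e.IsHit, e.CacheResult,
        ); err != nil {
            tx.Rollback()
            return err
        }
    }
    return tx.Commit()
}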

79
parser/parser.go Normal file

@@ -0,0 +1,79 @@
package parser

import (
    "fmt"
    "regexp"
    "strconv"
    "strings"
    "time"

    "git.resf.org/infrastructure/process-fastly-logs/models"
)

func ParseLogLine(line string) (*models.LogEntry, error) {
    // Regular expression to match the log line components.
    // NOTE: the datacenter/geo fields are whitespace-delimited, so a
    // multi-word city name (e.g. "el segundo" in the test data) shifts
    // those columns by one token.
    pattern := `<(?P<priority>\d+)>(?P<timestamp>[^\s]+)\s+(?P<cacheserver>[^\s]+)\s+(?P<serviceid>[^\[]+)\[(?P<pid>\d+)\]:\s+(?P<clientip>[^\s]+)\s+"-" "-" \[(?P<requesttime>[^\]]+)\]\s+"(?P<request>[^"]+)"\s+(?P<status>\d+)\s+(?P<bytes>\d+)\s+(?P<host>[^\s]+)\s+(?P<useragent>[^"]+)\s+(?P<datacenter>[^\s]+)\s+(?P<geocity>[^\s]+)\s+(?P<geocontinent>[^\s]+)\s+(?P<georegion>[^\s]+)\s+(?P<starttimesec>\d+)\s+(?P<elapsedtime>\d+)\s+(?P<ishit>\w+)\s+(?P<cacheresult>\w+)`
    regex := regexp.MustCompile(pattern)

    match := regex.FindStringSubmatch(line)
    if match == nil {
        return nil, fmt.Errorf("line does not match expected format")
    }

    result := make(map[string]string)
    for i, name := range regex.SubexpNames() {
        if i != 0 && name != "" {
            result[name] = match[i]
        }
    }

    // Now parse each field into the appropriate type
    priority, _ := strconv.Atoi(result["priority"])
    timestamp, _ := time.Parse(time.RFC3339, result["timestamp"])
    requestTime, _ := time.Parse("02/Jan/2006:15:04:05 -0700", result["requesttime"])
    status, _ := strconv.Atoi(result["status"])
    bytes, _ := strconv.Atoi(result["bytes"])
    elapsedTimeUsec, _ := strconv.Atoi(result["elapsedtime"])
    isHit := result["ishit"] == "true"

    // Split the request into method, URL, and protocol
    requestParts := strings.SplitN(result["request"], " ", 3)
    if len(requestParts) != 3 {
        return nil, fmt.Errorf("invalid request format")
    }
    requestMethod := requestParts[0]
    requestURL := requestParts[1]
    protocol := requestParts[2]

    // Build the LogEntry struct
    entry := &models.LogEntry{
        Priority:          priority,
        Timestamp:         timestamp,
        RequestTime:       requestTime,
        CacheServer:       result["cacheserver"],
        ServiceID:         strings.TrimSpace(result["serviceid"]),
        ClientIP:          result["clientip"],
        RequestMethod:     requestMethod,
        RequestURL:        requestURL,
        Protocol:          protocol,
        ResponseStatus:    status,
        ResponseBodyBytes: bytes,
        Host:              result["host"],
        UserAgent:         result["useragent"],
        Datacenter:        result["datacenter"],
        GeoCity:           result["geocity"],
        GeoContinentCode:  result["geocontinent"],
        GeoRegion:         result["georegion"],
        StartTime:         time.Unix(int64(parseInt(result["starttimesec"])), 0),
        ElapsedTimeUsec:   elapsedTimeUsec,
        IsHit:             isHit,
        CacheResult:       result["cacheresult"],
    }

    return entry, nil
}

// parseInt converts s to an int, returning 0 on failure.
func parseInt(s string) int {
    i, _ := strconv.Atoi(s)
    return i
}
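
A quick way to see what ParseLogLine extracts is to run it against one of the lines from testdata/valid_logs.log. A minimal sketch (the sample line is copied verbatim from the test data):

package main

import (
    "fmt"
    "log"

    "git.resf.org/infrastructure/process-fastly-logs/parser"
)

func main() {
    // Sample line copied from testdata/valid_logs.log.
    line := `<134>2023-12-31T23:07:30Z cache-bos4678 s3-resf-fastly-logs[462441]: 255.255.255.255 "-" "-" [31/Dec/2023:23:07:29 +0000] "GET /mirrorlist?arch=x86_64&repo=AppStream-9 HTTP/2" 200 3015 mirrors.rockylinux.org libdnf (Rocky Linux 9.2; generic; Linux.x86_64) BOS stoughton NA MA 1704064050 36477 false PASS`

    entry, err := parser.ParseLogLine(line)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(entry.RequestMethod, entry.RequestURL, entry.ResponseStatus, entry.Datacenter, entry.IsHit)
    // Prints: GET /mirrorlist?arch=x86_64&repo=AppStream-9 200 BOS false
}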

71
parser/parser_test.go Normal file

@@ -0,0 +1,71 @@
package parser_test

import (
    "bufio"
    "os"
    "path/filepath"
    "testing"

    "git.resf.org/infrastructure/process-fastly-logs/parser"
)

func TestParseValidLogLines(t *testing.T) {
    filePath := filepath.Join("..", "testdata", "valid_logs.log")
    file, err := os.Open(filePath)
    if err != nil {
        t.Fatalf("Failed to open test data file: %v", err)
    }
    defer file.Close()

    scanner := bufio.NewScanner(file)
    lineNumber := 0
    for scanner.Scan() {
        line := scanner.Text()
        lineNumber++

        entry, err := parser.ParseLogLine(line)
        if err != nil {
            t.Errorf("Failed to parse valid log line at line %d: %v", lineNumber, err)
            continue
        }

        // Optionally, add assertions to check specific fields
        if entry == nil {
            t.Errorf("Parsed entry is nil at line %d", lineNumber)
        } else {
            // Example assertion: check if the ResponseStatus is as expected
            // For more comprehensive tests, consider checking all fields
            expectedStatus := 200
            if entry.ResponseStatus != expectedStatus {
                t.Errorf("Unexpected ResponseStatus at line %d: got %d, want %d", lineNumber, entry.ResponseStatus, expectedStatus)
            }
        }
    }

    if err := scanner.Err(); err != nil {
        t.Fatalf("Error reading test data file: %v", err)
    }
}

func TestParseInvalidLogLines(t *testing.T) {
    filePath := filepath.Join("..", "testdata", "invalid_logs.log")
    file, err := os.Open(filePath)
    if err != nil {
        t.Fatalf("Failed to open test data file: %v", err)
    }
    defer file.Close()

    scanner := bufio.NewScanner(file)
    lineNumber := 0
    for scanner.Scan() {
        line := scanner.Text()
        lineNumber++

        entry, err := parser.ParseLogLine(line)
        if err == nil {
            t.Errorf("Expected parsing error at line %d but got none", lineNumber)
        }
        if entry != nil {
            t.Errorf("Expected no entry at line %d but got one", lineNumber)
        }
    }

    if err := scanner.Err(); err != nil {
        t.Fatalf("Error reading test data file: %v", err)
    }
}

2
testdata/invalid_logs.log vendored Normal file

@@ -0,0 +1,2 @@
This is a malformed log line that doesn't match the expected format.
<134>Malformed log line with missing fields

5
testdata/valid_logs.log vendored Normal file

@@ -0,0 +1,5 @@
<134>2023-12-31T23:07:30Z cache-bos4678 s3-resf-fastly-logs[462441]: 255.255.255.255 "-" "-" [31/Dec/2023:23:07:29 +0000] "GET /mirrorlist?arch=x86_64&repo=AppStream-9 HTTP/2" 200 3015 mirrors.rockylinux.org libdnf (Rocky Linux 9.2; generic; Linux.x86_64) BOS stoughton NA MA 1704064050 36477 false PASS
<134>2023-12-31T23:07:30Z cache-chi-klot8100159 s3-resf-fastly-logs[462441]: 255.255.255.255 "-" "-" [31/Dec/2023:23:07:30 +0000] "GET /mirrorlist?arch=x86_64&repo=extras-9 HTTP/1.1" 200 2183 mirrors.rockylinux.org libdnf (Rocky Linux 9.2; generic; Linux.x86_64) CHI paris EU IDF 1704064050 11710 false PASS
<134>2023-12-31T23:07:30Z cache-chi-klot8100078 s3-resf-fastly-logs[462441]: 255.255.255.255 "-" "-" [31/Dec/2023:23:07:30 +0000] "GET /mirrorlist?arch=x86_64&repo=AppStream-8 HTTP/1.1" 200 3015 mirrors.rockylinux.org libdnf (Rocky Linux 8.5; generic; Linux.x86_64) CHI el segundo NA CA 1704064050 12411 false PASS
<134>2023-12-31T23:07:30Z cache-fra-etou8220042 s3-resf-fastly-logs[462441]: dead:beef:ffff:ffff::ffff "-" "-" [31/Dec/2023:23:07:30 +0000] "GET /mirrorlist?arch=x86_64&repo=extras-8 HTTP/1.1" 200 4727 mirrors.rockylinux.org libdnf (Rocky Linux 8.7; generic; Linux.x86_64) FRA dusseldorf EU NW 1704064050 116266 false PASS
<134>2023-12-31T23:07:30Z cache-fra-etou8220047 s3-resf-fastly-logs[462441]: 255.255.255.255 "-" "-" [31/Dec/2023:23:07:30 +0000] "GET /mirrorlist?arch=x86_64&repo=BaseOS-9 HTTP/2" 200 10365 mirrors.rockylinux.org libdnf (Rocky Linux 9.1; generic; Linux.x86_64) FRA nuremberg EU BY 1704064050 120257 false PASS