From 18b15013ebf7af0c01fb1ddc0adae8ed9336f521 Mon Sep 17 00:00:00 2001 From: Mustafa Gezen Date: Mon, 28 Aug 2023 16:57:40 +0200 Subject: [PATCH] ProcessRPMWorkflow first full prototype --- base/go/storage/detector/BUILD | 1 - .../cmd/mship_worker_server/main.go | 1 + tools/mothership/proto/v1/process_rpm.proto | 30 ++++++++++++++ tools/mothership/worker_server/BUILD | 2 + tools/mothership/worker_server/entry.go | 40 +++++++++++++++++++ tools/mothership/worker_server/process_rpm.go | 4 ++ tools/mothership/worker_server/workflows.go | 22 +++++++--- 7 files changed, 94 insertions(+), 6 deletions(-) create mode 100644 tools/mothership/worker_server/entry.go diff --git a/base/go/storage/detector/BUILD b/base/go/storage/detector/BUILD index 28959af2..4a431603 100644 --- a/base/go/storage/detector/BUILD +++ b/base/go/storage/detector/BUILD @@ -20,7 +20,6 @@ go_library( importpath = "go.resf.org/peridot/base/go/storage/detector", visibility = ["//visibility:public"], deps = [ - "//base/go", "//base/go/storage", "//base/go/storage/memory", "//base/go/storage/s3", diff --git a/tools/mothership/cmd/mship_worker_server/main.go b/tools/mothership/cmd/mship_worker_server/main.go index 59e91474..6a305d82 100644 --- a/tools/mothership/cmd/mship_worker_server/main.go +++ b/tools/mothership/cmd/mship_worker_server/main.go @@ -102,6 +102,7 @@ func run(ctx *cli.Context) error { } func main() { + base.ChangeDefaultDatabaseURL("mothership") base.ChangeDefaultForEnvVar(base.EnvVarTemporalTaskQueue, "mship_worker_server") flags := base.WithDefaultCliFlagsTemporal(base.WithStorageFlags()...) diff --git a/tools/mothership/proto/v1/process_rpm.proto b/tools/mothership/proto/v1/process_rpm.proto index 37a33f58..742c06f4 100644 --- a/tools/mothership/proto/v1/process_rpm.proto +++ b/tools/mothership/proto/v1/process_rpm.proto @@ -40,6 +40,29 @@ message ProcessRPMRequest { // Self reported checksum of the RPM // Must be a SHA256 checksum and match the RPM string checksum = 3 [(google.api.field_behavior) = REQUIRED]; + + // Self reported repository of the RPM + // e.g. BaseOS + string repository = 4 [(google.api.field_behavior) = REQUIRED]; + + // Batch to associate the RPM with + string batch = 5; +} + +// ProcessRPMInternalRequest is the request message that the Server +// uses in its call to the ProcessRPM workflow +message ProcessRPMInternalRequest { + // Worker ID of the worker processing the RPM + string worker_id = 1 [(google.api.field_behavior) = REQUIRED]; +} + +// ProcessRPMArgs is the arguments for the ProcessRPM workflow +message ProcessRPMArgs { + // Public request + ProcessRPMRequest request = 1 [(google.api.field_behavior) = REQUIRED]; + + // Internal request + ProcessRPMInternalRequest internal_request = 2 [(google.api.field_behavior) = REQUIRED]; } // ProcessRPMMetadata is the metadata for the ProcessRPM workflow @@ -62,4 +85,11 @@ message ImportRPMResponse { // Commit hash of the imported RPM // e.g. 1234567890abcdef1234567890abcdef12345678 string commit_hash = 1 [(google.api.field_behavior) = REQUIRED]; + + // Commit URI of the imported RPM + string commit_uri = 2 [(google.api.field_behavior) = REQUIRED]; + + // NEVRA of the imported RPM + // e.g. rpm-1.0.0-1.el8.x86_64 + string nevra = 3 [(google.api.field_behavior) = REQUIRED]; } diff --git a/tools/mothership/worker_server/BUILD b/tools/mothership/worker_server/BUILD index d86cf63d..42ac310f 100644 --- a/tools/mothership/worker_server/BUILD +++ b/tools/mothership/worker_server/BUILD @@ -17,6 +17,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "worker_server", srcs = [ + "entry.go", "process_rpm.go", "worker.go", "workflows.go", @@ -26,6 +27,7 @@ go_library( deps = [ "//base/go", "//base/go/storage", + "//tools/mothership/db", "//tools/mothership/proto/v1:pb", "//tools/mothership/worker_server/forge", "//tools/mothership/worker_server/srpm_import", diff --git a/tools/mothership/worker_server/entry.go b/tools/mothership/worker_server/entry.go new file mode 100644 index 00000000..fff0e1fc --- /dev/null +++ b/tools/mothership/worker_server/entry.go @@ -0,0 +1,40 @@ +package mothership_worker_server + +import ( + "database/sql" + "github.com/pkg/errors" + base "go.resf.org/peridot/base/go" + mothership_db "go.resf.org/peridot/tools/mothership/db" + mothershippb "go.resf.org/peridot/tools/mothership/pb" +) + +func (w *Worker) CreateEntry(args *mothershippb.ProcessRPMArgs, importRpmRes *mothershippb.ImportRPMResponse) (*mothershippb.Entry, error) { + req := args.Request + internalReq := args.InternalRequest + entry := mothership_db.Entry{ + Name: base.NameGen("entries"), + EntryID: importRpmRes.Nevra, + OSRelease: req.OsRelease, + Sha256Sum: req.Checksum, + RepositoryName: req.Repository, + WorkerID: sql.NullString{ + String: internalReq.WorkerId, + Valid: true, + }, + CommitURI: importRpmRes.CommitUri, + CommitHash: importRpmRes.CommitHash, + } + if req.Batch != "" { + entry.BatchName = sql.NullString{ + String: req.Batch, + Valid: true, + } + } + + err := base.Q[mothership_db.Entry](w.db).Create(&entry) + if err != nil { + return nil, errors.Wrap(err, "failed to create entry") + } + + return entry.ToPB(), nil +} diff --git a/tools/mothership/worker_server/process_rpm.go b/tools/mothership/worker_server/process_rpm.go index 915ef3bf..fb47d1a2 100644 --- a/tools/mothership/worker_server/process_rpm.go +++ b/tools/mothership/worker_server/process_rpm.go @@ -176,7 +176,11 @@ func (w *Worker) ImportRPM(uri string, checksumSha256 string) (*mothershippb.Imp return nil, errors.Wrap(err, "failed to import SRPM") } + commitURI := w.forge.GetCommitViewerURL(repoName, commit.Hash.String()) + return &mothershippb.ImportRPMResponse{ CommitHash: commit.Hash.String(), + CommitUri: commitURI, + Nevra: nevra.String(), }, nil } diff --git a/tools/mothership/worker_server/workflows.go b/tools/mothership/worker_server/workflows.go index cb53cbcc..fcf8d2bf 100644 --- a/tools/mothership/worker_server/workflows.go +++ b/tools/mothership/worker_server/workflows.go @@ -15,7 +15,6 @@ package mothership_worker_server import ( - "errors" mothershippb "go.resf.org/peridot/tools/mothership/pb" "go.temporal.io/sdk/temporal" "go.temporal.io/sdk/workflow" @@ -28,7 +27,7 @@ var w Worker // Usually a client worker will first initiate an upload to the storage backend, // then send a request to the Server `SubmitEntry` method (or send a request // then upload the resource). -func ProcessRPMWorkflow(ctx workflow.Context, req *mothershippb.ProcessRPMRequest) (*mothershippb.ProcessRPMResponse, error) { +func ProcessRPMWorkflow(ctx workflow.Context, args *mothershippb.ProcessRPMArgs) (*mothershippb.ProcessRPMResponse, error) { // First verify that the resource exists. // The resource can be uploaded after the request is sent. // So we should wait up to 2 hours. The initial timeouts should be low @@ -43,7 +42,7 @@ func ProcessRPMWorkflow(ctx workflow.Context, req *mothershippb.ProcessRPMReques MaximumAttempts: (60 * 60 * 2) / 25, }, }) - err := workflow.ExecuteActivity(ctx, w.VerifyResourceExists, req.RpmUri).Get(ctx, nil) + err := workflow.ExecuteActivity(ctx, w.VerifyResourceExists, args.Request.RpmUri).Get(ctx, nil) if err != nil { return nil, err } @@ -57,10 +56,23 @@ func ProcessRPMWorkflow(ctx workflow.Context, req *mothershippb.ProcessRPMReques MaximumAttempts: 0, }, }) - err = workflow.ExecuteActivity(ctx, w.ImportRPM, req.RpmUri, req.Checksum).Get(ctx, nil) + var importRpmRes mothershippb.ImportRPMResponse + err = workflow.ExecuteActivity(ctx, w.ImportRPM, args.Request.RpmUri, args.Request.Checksum).Get(ctx, &importRpmRes) if err != nil { return nil, err } - return nil, errors.New("unimplemented") + // Now the import has reached the Git forge. Let's create an entry. + ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 25 * time.Second, + }) + var entry mothershippb.Entry + err = workflow.ExecuteActivity(ctx, w.CreateEntry, args, &importRpmRes).Get(ctx, &entry) + if err != nil { + return nil, err + } + + return &mothershippb.ProcessRPMResponse{ + Entry: &entry, + }, nil }