From cf859b9793f4661698f3f532307f27514560339c Mon Sep 17 00:00:00 2001
From: Mustafa Gezen
Date: Sun, 27 Aug 2023 06:04:51 +0200
Subject: [PATCH] Add mship_worker_server (Temporal worker)

---
Reviewer note: the hunks for main.go, process_rpm.go, worker.go and
workflows.go were amended during review; the blob ids on their "index" lines
still refer to the pre-review file contents.

 tools/build_rules/ui_library/BUILD            |  1 -
 .../mothership/cmd/mship_worker_server/BUILD  | 22 +++++++
 .../cmd/mship_worker_server/main.go           | 58 +++++++++++++++++
 tools/mothership/proto/admin/BUILD            |  1 -
 tools/mothership/proto/v1/process_rpm.proto   | 58 +++++++++++++++++
 tools/mothership/worker_server/BUILD          | 20 ++++++
 tools/mothership/worker_server/process_rpm.go | 64 +++++++++++++++++++
 tools/mothership/worker_server/worker.go      | 21 ++++++
 tools/mothership/worker_server/workflows.go   | 41 ++++++++++++
 9 files changed, 284 insertions(+), 2 deletions(-)
 create mode 100644 tools/mothership/cmd/mship_worker_server/BUILD
 create mode 100644 tools/mothership/cmd/mship_worker_server/main.go
 create mode 100644 tools/mothership/proto/v1/process_rpm.proto
 create mode 100644 tools/mothership/worker_server/BUILD
 create mode 100644 tools/mothership/worker_server/process_rpm.go
 create mode 100644 tools/mothership/worker_server/worker.go
 create mode 100644 tools/mothership/worker_server/workflows.go

diff --git a/tools/build_rules/ui_library/BUILD b/tools/build_rules/ui_library/BUILD
index ae65b6eb..a17d19be 100644
--- a/tools/build_rules/ui_library/BUILD
+++ b/tools/build_rules/ui_library/BUILD
@@ -11,4 +11,3 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
diff --git a/tools/mothership/cmd/mship_worker_server/BUILD b/tools/mothership/cmd/mship_worker_server/BUILD
new file mode 100644
index 00000000..217d0301
--- /dev/null
+++ b/tools/mothership/cmd/mship_worker_server/BUILD
@@ -0,0 +1,22 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library")
+
+go_library(
+    name = "mship_worker_server_lib",
+    srcs = ["main.go"],
+    importpath = "go.resf.org/peridot/tools/mothership/cmd/mship_worker_server",
+    visibility = ["//visibility:private"],
+    deps = [
+        "//base/go",
+        "//base/go/storage/detector",
+        "//tools/mothership/worker_server",
+        "//vendor/github.com/urfave/cli/v2:cli",
+        "//vendor/go.temporal.io/sdk/client",
+        "//vendor/go.temporal.io/sdk/worker",
+    ],
+)
+
+go_binary(
+    name = "mship_worker_server",
+    embed = [":mship_worker_server_lib"],
+    visibility = ["//visibility:public"],
+)
diff --git a/tools/mothership/cmd/mship_worker_server/main.go b/tools/mothership/cmd/mship_worker_server/main.go
new file mode 100644
index 00000000..c9639501
--- /dev/null
+++ b/tools/mothership/cmd/mship_worker_server/main.go
@@ -0,0 +1,58 @@
+package main
+
+import (
+	"github.com/urfave/cli/v2"
+	base "go.resf.org/peridot/base/go"
+	storage_detector "go.resf.org/peridot/base/go/storage/detector"
+	mothership_worker_server "go.resf.org/peridot/tools/mothership/worker_server"
+	"go.temporal.io/sdk/client"
+	"go.temporal.io/sdk/worker"
+	"os"
+)
+
+// run is the CLI action: it builds the Temporal client, database handle and
+// storage backend from the CLI flags, registers the mothership workflow and
+// activities on a worker for the configured task queue, and runs that worker
+// until worker.InterruptCh() fires.
+func run(ctx *cli.Context) error {
+	temporalClient, err := base.GetTemporalClientFromFlags(ctx, client.Options{})
+	if err != nil {
+		return err
+	}
+	// Release the Temporal connection once the worker has stopped.
+	defer temporalClient.Close()
+
+	db := base.GetDBFromFlags(ctx)
+	storage, err := storage_detector.FromFlags(ctx)
+	if err != nil {
+		return err
+	}
+
+	w := worker.New(temporalClient, ctx.String(string(base.EnvVarTemporalTaskQueue)), worker.Options{})
+	workerServer := mothership_worker_server.New(db, storage)
+
+	// Register workflows
+	w.RegisterWorkflow(mothership_worker_server.ProcessRPMWorkflow)
+
+	// Register activities
+	w.RegisterActivity(workerServer)
+
+	// Start worker
+	return w.Run(worker.InterruptCh())
+}
+
+// main changes the default Temporal task queue to "mship_worker_server" and
+// runs the CLI app, logging a fatal error if it fails.
+func main() {
+	base.ChangeDefaultForEnvVar(base.EnvVarTemporalTaskQueue, "mship_worker_server")
+
+	app := &cli.App{
+		Name:   "mship_worker_server",
+		Action: run,
+		Flags:  base.WithDefaultCliFlagsTemporal(base.WithStorageFlags()...),
+	}
+
+	if err := app.Run(os.Args); err != nil {
+		base.LogFatalf("failed to run mship_worker_server: %v", err)
+	}
+}
diff --git a/tools/mothership/proto/admin/BUILD b/tools/mothership/proto/admin/BUILD
index ae65b6eb..a17d19be 100644
--- a/tools/mothership/proto/admin/BUILD
+++ b/tools/mothership/proto/admin/BUILD
@@ -11,4 +11,3 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
diff --git a/tools/mothership/proto/v1/process_rpm.proto b/tools/mothership/proto/v1/process_rpm.proto
new file mode 100644
index 00000000..143ab45d
--- /dev/null
+++ b/tools/mothership/proto/v1/process_rpm.proto
@@ -0,0 +1,58 @@
+// Copyright 2023 Peridot Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+package peridot.tools.mothership.v1;
+
+import "google/api/field_behavior.proto";
+import "google/protobuf/timestamp.proto";
+import "google/protobuf/wrappers.proto";
+import "tools/mothership/proto/v1/entry.proto";
+
+option java_multiple_files = true;
+option java_outer_classname = "ProcessRpmProto";
+option java_package = "org.resf.peridot.tools.mothership.v1";
+option go_package = "go.resf.org/peridot/tools/mothership/pb;mothershippb";
+
+// ProcessRPMRequest is the request message for the ProcessRPM workflow
+message ProcessRPMRequest {
+  // URI of the RPM to process
+  // e.g. gs://bucket/path/to/rpm.rpm
+  // The server must have read access to the RPM and WILL error if it does not
+  string rpm_uri = 1 [(google.api.field_behavior) = REQUIRED];
+
+  // OS Release of the RPM
+  // e.g. Red Hat Enterprise Linux release 8.8 (Ootpa)
+  string os_release = 2 [(google.api.field_behavior) = REQUIRED];
+
+  // Self reported checksum of the RPM
+  // Must be a SHA256 checksum and match the RPM
+  string checksum = 3 [(google.api.field_behavior) = REQUIRED];
+}
+
+// ProcessRPMMetadata is the metadata for the ProcessRPM workflow
+message ProcessRPMMetadata {
+  // The time at which the workflow started
+  google.protobuf.Timestamp start_time = 1;
+
+  // The time at which the workflow finished
+  google.protobuf.Timestamp end_time = 2;
+}
+
+// ProcessRPMResponse is the response message for the ProcessRPM workflow
+message ProcessRPMResponse {
+  // The entry created for the RPM
+  Entry entry = 1;
+}
diff --git a/tools/mothership/worker_server/BUILD b/tools/mothership/worker_server/BUILD
new file mode 100644
index 00000000..f2c53536
--- /dev/null
+++ b/tools/mothership/worker_server/BUILD
@@ -0,0 +1,20 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+    name = "worker_server",
+    srcs = [
+        "process_rpm.go",
+        "worker.go",
+        "workflows.go",
+    ],
+    importpath = "go.resf.org/peridot/tools/mothership/worker_server",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//base/go",
+        "//base/go/storage",
+        "//tools/mothership/proto/v1:pb",
+        "//vendor/github.com/pkg/errors",
+        "//vendor/go.temporal.io/sdk/temporal",
+        "//vendor/go.temporal.io/sdk/workflow",
+    ],
+)
diff --git a/tools/mothership/worker_server/process_rpm.go b/tools/mothership/worker_server/process_rpm.go
new file mode 100644
index 00000000..b83f02b4
--- /dev/null
+++ b/tools/mothership/worker_server/process_rpm.go
@@ -0,0 +1,64 @@
+package mothership_worker_server
+
+import (
+	"github.com/pkg/errors"
+	"go.temporal.io/sdk/temporal"
+	"net/url"
+	"strings"
+)
+
+// VerifyResourceExists verifies that the resource exists.
+// This is a Temporal activity.
+func (w *Worker) VerifyResourceExists(uri string) error {
+	canRead, err := w.storage.CanReadURI(uri)
+	if err != nil {
+		return errors.Wrap(err, "failed to check if resource URI can be read")
+	}
+
+	if !canRead {
+		return temporal.NewNonRetryableApplicationError(
+			"cannot read resource URI",
+			"cannotReadResourceURI",
+			errors.New("client submitted a resource URI that cannot be read by server"),
+		)
+	}
+
+	// Get object name from URI.
+	// Check if object exists.
+	// If not, return error.
+	parsed, err := url.Parse(uri)
+	if err != nil {
+		return temporal.NewNonRetryableApplicationError(
+			"could not parse resource URI",
+			"couldNotParseResourceURI",
+			errors.Wrap(err, "failed to parse resource URI"),
+		)
+	}
+
+	split := strings.SplitN(parsed.Path, "/", 2)
+	// An empty object key (e.g. "gs://bucket/") can never exist, so reject it
+	// here instead of letting the caller retry a request that cannot succeed.
+	if len(split) < 2 || split[1] == "" {
+		return temporal.NewNonRetryableApplicationError(
+			"invalid resource URI",
+			"invalidResourceURI",
+			errors.New("client submitted an invalid resource URI"),
+		)
+	}
+
+	object := split[1]
+	exists, err := w.storage.Exists(object)
+	if err != nil {
+		return errors.Wrap(err, "failed to check if resource exists")
+	}
+
+	if !exists {
+		// Since the client can trigger this activity before uploading the resource,
+		// we should not return a non-retryable error.
+		// The parent workflow should handle the retry arrangements up to 2 hours
+		// per the spec.
+		return errors.New("resource does not exist")
+	}
+
+	return nil
+}
diff --git a/tools/mothership/worker_server/worker.go b/tools/mothership/worker_server/worker.go
new file mode 100644
index 00000000..36355f4a
--- /dev/null
+++ b/tools/mothership/worker_server/worker.go
@@ -0,0 +1,21 @@
+package mothership_worker_server
+
+import (
+	base "go.resf.org/peridot/base/go"
+	"go.resf.org/peridot/base/go/storage"
+)
+
+// Worker carries the database handle and storage backend used by the
+// mothership Temporal activities.
+type Worker struct {
+	db      *base.DB
+	storage storage.Storage
+}
+
+// New returns a Worker backed by the given database handle and storage backend.
+func New(db *base.DB, storage storage.Storage) *Worker {
+	return &Worker{
+		db:      db,
+		storage: storage,
+	}
+}
diff --git a/tools/mothership/worker_server/workflows.go b/tools/mothership/worker_server/workflows.go
new file mode 100644
index 00000000..e0e411e5
--- /dev/null
+++ b/tools/mothership/worker_server/workflows.go
@@ -0,0 +1,41 @@
+package mothership_worker_server
+
+import (
+	mothershippb "go.resf.org/peridot/tools/mothership/pb"
+	"go.temporal.io/sdk/temporal"
+	"go.temporal.io/sdk/workflow"
+	"time"
+)
+
+// w is a zero-value Worker that exists only so workflow code can reference
+// activity methods such as w.VerifyResourceExists when scheduling them.
+var w Worker
+
+// ProcessRPMWorkflow processes an SRPM.
+// Usually a client worker will first initiate an upload to the storage backend,
+// then send a request to the Server `SubmitEntry` method (or send a request
+// then upload the resource).
+func ProcessRPMWorkflow(ctx workflow.Context, req *mothershippb.ProcessRPMRequest) (*mothershippb.ProcessRPMResponse, error) {
+	// First verify that the resource exists.
+	// The resource can be uploaded after the request is sent.
+	// So we should wait up to 2 hours. The initial timeouts should be low
+	// since the worker is most likely to upload the resource immediately.
+	ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
+		StartToCloseTimeout: 25 * time.Second,
+		RetryPolicy: &temporal.RetryPolicy{
+			// We're waiting 25 seconds each time
+			InitialInterval:    25 * time.Second,
+			BackoffCoefficient: 1,
+			// Maximum attempts should be set, so it's approximately 2 hours
+			MaximumAttempts: (60 * 60 * 2) / 25,
+		},
+	})
+	err := workflow.ExecuteActivity(ctx, w.VerifyResourceExists, req.RpmUri).Get(ctx, nil)
+	if err != nil {
+		return nil, err
+	}
+
+	// TODO: the remaining processing steps are not implemented yet. Return an
+	// empty response so the function has the terminating statement Go requires.
+	return &mothershippb.ProcessRPMResponse{}, nil
+}