Add mship_worker_server (Temporal worker)

This commit is contained in:
Mustafa Gezen 2023-08-27 06:04:51 +02:00
parent e7efc38219
commit cf859b9793
Signed by: mustafa
GPG Key ID: DCDF010D946438C1
9 changed files with 265 additions and 2 deletions

View File

@ -11,4 +11,3 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View File

@ -0,0 +1,22 @@
load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library")
go_library(
name = "mship_worker_server_lib",
srcs = ["main.go"],
importpath = "go.resf.org/peridot/tools/mothership/cmd/mship_worker_server",
visibility = ["//visibility:private"],
deps = [
"//base/go",
"//base/go/storage/detector",
"//tools/mothership/worker_server",
"//vendor/github.com/urfave/cli/v2:cli",
"//vendor/go.temporal.io/sdk/client",
"//vendor/go.temporal.io/sdk/worker",
],
)
go_binary(
name = "mship_worker_server",
embed = [":mship_worker_server_lib"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,50 @@
package main
import (
"github.com/urfave/cli/v2"
base "go.resf.org/peridot/base/go"
storage_detector "go.resf.org/peridot/base/go/storage/detector"
mothership_worker_server "go.resf.org/peridot/tools/mothership/worker_server"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
"os"
)
func run(ctx *cli.Context) error {
temporalClient, err := base.GetTemporalClientFromFlags(ctx, client.Options{})
if err != nil {
return err
}
db := base.GetDBFromFlags(ctx)
storage, err := storage_detector.FromFlags(ctx)
if err != nil {
return err
}
w := worker.New(temporalClient, ctx.String(string(base.EnvVarTemporalTaskQueue)), worker.Options{})
workerServer := mothership_worker_server.New(db, storage)
// Register workflows
w.RegisterWorkflow(mothership_worker_server.ProcessRPMWorkflow)
// Register activities
w.RegisterActivity(workerServer)
// Start worker
return w.Run(worker.InterruptCh())
}
func main() {
base.ChangeDefaultForEnvVar(base.EnvVarTemporalTaskQueue, "mship_worker_server")
app := &cli.App{
Name: "mship_worker_server",
Action: run,
Flags: base.WithDefaultCliFlagsTemporal(base.WithStorageFlags()...),
}
if err := app.Run(os.Args); err != nil {
base.LogFatalf("failed to run mship_worker_server: %v", err)
}
}

View File

@ -11,4 +11,3 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View File

@ -0,0 +1,58 @@
// Copyright 2023 Peridot Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package peridot.tools.mothership.v1;
import "google/api/field_behavior.proto";
import "google/protobuf/timestamp.proto";
import "google/protobuf/wrappers.proto";
import "tools/mothership/proto/v1/entry.proto";
option java_multiple_files = true;
option java_outer_classname = "ProcessRpmProto";
option java_package = "org.resf.peridot.tools.mothership.v1";
option go_package = "go.resf.org/peridot/tools/mothership/pb;mothershippb";
// ProcessRPMRequest is the request message for the ProcessRPM workflow
message ProcessRPMRequest {
// URI of the RPM to process
// e.g. gs://bucket/path/to/rpm.rpm
// The server must have read access to the RPM and WILL error if it does not
string rpm_uri = 1 [(google.api.field_behavior) = REQUIRED];
// OS Release of the RPM
// e.g. Red Hat Enterprise Linux release 8.8 (Ootpa)
string os_release = 2 [(google.api.field_behavior) = REQUIRED];
// Self reported checksum of the RPM
// Must be a SHA256 checksum and match the RPM
string checksum = 3 [(google.api.field_behavior) = REQUIRED];
}
// ProcessRPMMetadata is the metadata for the ProcessRPM workflow
message ProcessRPMMetadata {
// The time at which the workflow started
google.protobuf.Timestamp start_time = 1;
// The time at which the workflow finished
google.protobuf.Timestamp end_time = 2;
}
// ProcessRPMResponse is the response message for the ProcessRPM workflow
message ProcessRPMResponse {
// The entry created for the RPM
Entry entry = 1;
}

View File

@ -0,0 +1,20 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "worker_server",
srcs = [
"process_rpm.go",
"worker.go",
"workflows.go",
],
importpath = "go.resf.org/peridot/tools/mothership/worker_server",
visibility = ["//visibility:public"],
deps = [
"//base/go",
"//base/go/storage",
"//tools/mothership/proto/v1:pb",
"//vendor/github.com/pkg/errors",
"//vendor/go.temporal.io/sdk/temporal",
"//vendor/go.temporal.io/sdk/workflow",
],
)

View File

@ -0,0 +1,62 @@
package mothership_worker_server
import (
"github.com/pkg/errors"
"go.temporal.io/sdk/temporal"
"net/url"
"strings"
)
// VerifyResourceExists verifies that the resource exists.
// This is a Temporal activity.
func (w *Worker) VerifyResourceExists(uri string) error {
canRead, err := w.storage.CanReadURI(uri)
if err != nil {
return errors.Wrap(err, "failed to check if resource URI can be read")
}
if !canRead {
return temporal.NewNonRetryableApplicationError(
"cannot read resource URI",
"cannotReadResourceURI",
errors.New("client submitted a resource URI that cannot be read by server"),
)
}
// Get object name from URI.
// Check if object exists.
// If not, return error.
parsed, err := url.Parse(uri)
if err != nil {
return temporal.NewNonRetryableApplicationError(
"could not parse resource URI",
"couldNotParseResourceURI",
errors.Wrap(err, "failed to parse resource URI"),
)
}
split := strings.SplitN(parsed.Path, "/", 2)
if len(split) < 2 {
return temporal.NewNonRetryableApplicationError(
"invalid resource URI",
"invalidResourceURI",
errors.New("client submitted an invalid resource URI"),
)
}
object := split[1]
exists, err := w.storage.Exists(object)
if err != nil {
return errors.Wrap(err, "failed to check if resource exists")
}
if !exists {
// Since the client can trigger this activity before uploading the resource,
// we should not return a non-retryable error.
// The parent workflow should handle the retry arrangements up to 2 hours
// per the spec.
return errors.New("resource does not exist")
}
return nil
}

View File

@ -0,0 +1,18 @@
package mothership_worker_server
import (
base "go.resf.org/peridot/base/go"
"go.resf.org/peridot/base/go/storage"
)
type Worker struct {
db *base.DB
storage storage.Storage
}
func New(db *base.DB, storage storage.Storage) *Worker {
return &Worker{
db: db,
storage: storage,
}
}

View File

@ -0,0 +1,35 @@
package mothership_worker_server
import (
mothershippb "go.resf.org/peridot/tools/mothership/pb"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
"time"
)
var w Worker
// ProcessRPMWorkflow processes an SRPM.
// Usually a client worker will first initiate an upload to the storage backend,
// then send a request to the Server `SubmitEntry` method (or send a request
// then upload the resource).
func ProcessRPMWorkflow(ctx workflow.Context, req *mothershippb.ProcessRPMRequest) (*mothershippb.ProcessRPMResponse, error) {
// First verify that the resource exists.
// The resource can be uploaded after the request is sent.
// So we should wait up to 2 hours. The initial timeouts should be low
// since the worker is most likely to upload the resource immediately.
ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 25 * time.Second,
RetryPolicy: &temporal.RetryPolicy{
// We're waiting 25 seconds each time
InitialInterval: 25 * time.Second,
BackoffCoefficient: 1,
// Maximum attempts should be set, so it's approximately 2 hours
MaximumAttempts: (60 * 60 * 2) / 25,
},
})
err := workflow.ExecuteActivity(ctx, w.VerifyResourceExists, req.RpmUri).Get(ctx, nil)
if err != nil {
return nil, err
}
}