ProcessRPMWorkflow first full prototype

This commit is contained in:
Mustafa Gezen 2023-08-28 16:57:40 +02:00
parent 13207458f0
commit 18b15013eb
Signed by: mustafa
GPG Key ID: DCDF010D946438C1
7 changed files with 94 additions and 6 deletions

View File

@ -20,7 +20,6 @@ go_library(
importpath = "go.resf.org/peridot/base/go/storage/detector",
visibility = ["//visibility:public"],
deps = [
"//base/go",
"//base/go/storage",
"//base/go/storage/memory",
"//base/go/storage/s3",

View File

@ -102,6 +102,7 @@ func run(ctx *cli.Context) error {
}
func main() {
base.ChangeDefaultDatabaseURL("mothership")
base.ChangeDefaultForEnvVar(base.EnvVarTemporalTaskQueue, "mship_worker_server")
flags := base.WithDefaultCliFlagsTemporal(base.WithStorageFlags()...)

View File

@ -40,6 +40,29 @@ message ProcessRPMRequest {
// Self reported checksum of the RPM
// Must be a SHA256 checksum and match the RPM
string checksum = 3 [(google.api.field_behavior) = REQUIRED];
// Self reported repository of the RPM
// e.g. BaseOS
string repository = 4 [(google.api.field_behavior) = REQUIRED];
// Batch to associate the RPM with
string batch = 5;
}
// ProcessRPMInternalRequest is the request message that the Server
// uses in its call to the ProcessRPM workflow
message ProcessRPMInternalRequest {
// Worker ID of the worker processing the RPM
string worker_id = 1 [(google.api.field_behavior) = REQUIRED];
}
// ProcessRPMArgs is the arguments for the ProcessRPM workflow
message ProcessRPMArgs {
// Public request
ProcessRPMRequest request = 1 [(google.api.field_behavior) = REQUIRED];
// Internal request
ProcessRPMInternalRequest internal_request = 2 [(google.api.field_behavior) = REQUIRED];
}
// ProcessRPMMetadata is the metadata for the ProcessRPM workflow
@ -62,4 +85,11 @@ message ImportRPMResponse {
// Commit hash of the imported RPM
// e.g. 1234567890abcdef1234567890abcdef12345678
string commit_hash = 1 [(google.api.field_behavior) = REQUIRED];
// Commit URI of the imported RPM
string commit_uri = 2 [(google.api.field_behavior) = REQUIRED];
// NEVRA of the imported RPM
// e.g. rpm-1.0.0-1.el8.x86_64
string nevra = 3 [(google.api.field_behavior) = REQUIRED];
}

View File

@ -17,6 +17,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "worker_server",
srcs = [
"entry.go",
"process_rpm.go",
"worker.go",
"workflows.go",
@ -26,6 +27,7 @@ go_library(
deps = [
"//base/go",
"//base/go/storage",
"//tools/mothership/db",
"//tools/mothership/proto/v1:pb",
"//tools/mothership/worker_server/forge",
"//tools/mothership/worker_server/srpm_import",

View File

@ -0,0 +1,40 @@
package mothership_worker_server
import (
"database/sql"
"github.com/pkg/errors"
base "go.resf.org/peridot/base/go"
mothership_db "go.resf.org/peridot/tools/mothership/db"
mothershippb "go.resf.org/peridot/tools/mothership/pb"
)
func (w *Worker) CreateEntry(args *mothershippb.ProcessRPMArgs, importRpmRes *mothershippb.ImportRPMResponse) (*mothershippb.Entry, error) {
req := args.Request
internalReq := args.InternalRequest
entry := mothership_db.Entry{
Name: base.NameGen("entries"),
EntryID: importRpmRes.Nevra,
OSRelease: req.OsRelease,
Sha256Sum: req.Checksum,
RepositoryName: req.Repository,
WorkerID: sql.NullString{
String: internalReq.WorkerId,
Valid: true,
},
CommitURI: importRpmRes.CommitUri,
CommitHash: importRpmRes.CommitHash,
}
if req.Batch != "" {
entry.BatchName = sql.NullString{
String: req.Batch,
Valid: true,
}
}
err := base.Q[mothership_db.Entry](w.db).Create(&entry)
if err != nil {
return nil, errors.Wrap(err, "failed to create entry")
}
return entry.ToPB(), nil
}

View File

@ -176,7 +176,11 @@ func (w *Worker) ImportRPM(uri string, checksumSha256 string) (*mothershippb.Imp
return nil, errors.Wrap(err, "failed to import SRPM")
}
commitURI := w.forge.GetCommitViewerURL(repoName, commit.Hash.String())
return &mothershippb.ImportRPMResponse{
CommitHash: commit.Hash.String(),
CommitUri: commitURI,
Nevra: nevra.String(),
}, nil
}

View File

@ -15,7 +15,6 @@
package mothership_worker_server
import (
"errors"
mothershippb "go.resf.org/peridot/tools/mothership/pb"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
@ -28,7 +27,7 @@ var w Worker
// Usually a client worker will first initiate an upload to the storage backend,
// then send a request to the Server `SubmitEntry` method (or send a request
// then upload the resource).
func ProcessRPMWorkflow(ctx workflow.Context, req *mothershippb.ProcessRPMRequest) (*mothershippb.ProcessRPMResponse, error) {
func ProcessRPMWorkflow(ctx workflow.Context, args *mothershippb.ProcessRPMArgs) (*mothershippb.ProcessRPMResponse, error) {
// First verify that the resource exists.
// The resource can be uploaded after the request is sent.
// So we should wait up to 2 hours. The initial timeouts should be low
@ -43,7 +42,7 @@ func ProcessRPMWorkflow(ctx workflow.Context, req *mothershippb.ProcessRPMReques
MaximumAttempts: (60 * 60 * 2) / 25,
},
})
err := workflow.ExecuteActivity(ctx, w.VerifyResourceExists, req.RpmUri).Get(ctx, nil)
err := workflow.ExecuteActivity(ctx, w.VerifyResourceExists, args.Request.RpmUri).Get(ctx, nil)
if err != nil {
return nil, err
}
@ -57,10 +56,23 @@ func ProcessRPMWorkflow(ctx workflow.Context, req *mothershippb.ProcessRPMReques
MaximumAttempts: 0,
},
})
err = workflow.ExecuteActivity(ctx, w.ImportRPM, req.RpmUri, req.Checksum).Get(ctx, nil)
var importRpmRes mothershippb.ImportRPMResponse
err = workflow.ExecuteActivity(ctx, w.ImportRPM, args.Request.RpmUri, args.Request.Checksum).Get(ctx, &importRpmRes)
if err != nil {
return nil, err
}
return nil, errors.New("unimplemented")
// Now the import has reached the Git forge. Let's create an entry.
ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 25 * time.Second,
})
var entry mothershippb.Entry
err = workflow.ExecuteActivity(ctx, w.CreateEntry, args, &importRpmRes).Get(ctx, &entry)
if err != nil {
return nil, err
}
return &mothershippb.ProcessRPMResponse{
Entry: &entry,
}, nil
}