From 41e39e07b13dc1d6e52de118e4ca58ac8f33b92a Mon Sep 17 00:00:00 2001 From: Mustafa Gezen Date: Wed, 6 Sep 2023 23:28:28 +0200 Subject: [PATCH] Move SubmitEntry to entry.go --- tools/mothership/rpc/entry.go | 57 +++++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/tools/mothership/rpc/entry.go b/tools/mothership/rpc/entry.go index 388ce26e..57e4d2be 100644 --- a/tools/mothership/rpc/entry.go +++ b/tools/mothership/rpc/entry.go @@ -20,9 +20,13 @@ import ( base "go.resf.org/peridot/base/go" mothership_db "go.resf.org/peridot/tools/mothership/db" mothershippb "go.resf.org/peridot/tools/mothership/pb" + mothership_worker_server "go.resf.org/peridot/tools/mothership/worker_server" enumspb "go.temporal.io/api/enums/v1" + "go.temporal.io/sdk/client" + "google.golang.org/genproto/googleapis/longrunning" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "strings" ) func (s *Server) GetEntry(ctx context.Context, req *mothershippb.GetEntryRequest) (*mothershippb.Entry, error) { @@ -77,3 +81,56 @@ func (s *Server) ListEntries(_ context.Context, req *mothershippb.ListEntriesReq NextPageToken: nt, }, nil } + +// SubmitEntry handles the RPC request for submitting an entry. This is usually +// called by the worker. The worker must be authenticated. The checksum will "lease" +// the entry for the worker, so that other workers will not submit the same entry. +// This "lease" is enforced using Temporal +func (s *Server) SubmitEntry(ctx context.Context, req *mothershippb.SubmitEntryRequest) (*longrunning.Operation, error) { + worker, err := s.getWorkerIdentity(ctx) + if err != nil { + return nil, err + } + + // Now make sure the entry doesn't already exist in the ARCHIVED state. + // If it does, return an error. It should be retracted first. + entry, err := base.Q[mothership_db.Entry](s.db).F( + "sha256_sum", req.ProcessRpmRequest.Checksum, + "state", mothershippb.Entry_ARCHIVED, + ).GetOrNil() + if err != nil { + base.LogErrorf("failed to get entry: %v", err) + return nil, status.Error(codes.Internal, "failed to get entry") + } + if entry != nil { + return nil, status.Error(codes.AlreadyExists, "entry already exists, you must retract the entry before submitting again") + } + + startWorkflowOpts := client.StartWorkflowOptions{ + ID: "operations/" + req.ProcessRpmRequest.Checksum, + WorkflowExecutionErrorWhenAlreadyStarted: true, + WorkflowIDReusePolicy: enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE, + } + + // Submit to Temporal + run, err := s.temporal.ExecuteWorkflow( + context.Background(), + startWorkflowOpts, + mothership_worker_server.ProcessRPMWorkflow, + &mothershippb.ProcessRPMArgs{ + Request: req.ProcessRpmRequest, + InternalRequest: &mothershippb.ProcessRPMInternalRequest{ + WorkerId: worker.WorkerID, + }, + }, + ) + if err != nil { + if strings.Contains(err.Error(), "is already running") { + return nil, status.Error(codes.AlreadyExists, "entry is already running") + } + base.LogErrorf("failed to start workflow: %v", err) + return nil, status.Error(codes.Internal, "failed to start workflow") + } + + return s.getOperation(ctx, run.GetID()) +}