Move SubmitEntry to entry.go

This commit is contained in:
Mustafa Gezen 2023-09-06 23:28:28 +02:00
parent 987ef366ab
commit 41e39e07b1
Signed by: mustafa
GPG Key ID: DCDF010D946438C1

View File

@ -20,9 +20,13 @@ import (
base "go.resf.org/peridot/base/go"
mothership_db "go.resf.org/peridot/tools/mothership/db"
mothershippb "go.resf.org/peridot/tools/mothership/pb"
mothership_worker_server "go.resf.org/peridot/tools/mothership/worker_server"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/sdk/client"
"google.golang.org/genproto/googleapis/longrunning"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"strings"
)
func (s *Server) GetEntry(ctx context.Context, req *mothershippb.GetEntryRequest) (*mothershippb.Entry, error) {
@ -77,3 +81,56 @@ func (s *Server) ListEntries(_ context.Context, req *mothershippb.ListEntriesReq
NextPageToken: nt,
}, nil
}
// SubmitEntry handles the RPC request for submitting an entry. This is usually
// called by the worker. The worker must be authenticated. The checksum will "lease"
// the entry for the worker, so that other workers will not submit the same entry.
// This "lease" is enforced using Temporal
func (s *Server) SubmitEntry(ctx context.Context, req *mothershippb.SubmitEntryRequest) (*longrunning.Operation, error) {
worker, err := s.getWorkerIdentity(ctx)
if err != nil {
return nil, err
}
// Now make sure the entry doesn't already exist in the ARCHIVED state.
// If it does, return an error. It should be retracted first.
entry, err := base.Q[mothership_db.Entry](s.db).F(
"sha256_sum", req.ProcessRpmRequest.Checksum,
"state", mothershippb.Entry_ARCHIVED,
).GetOrNil()
if err != nil {
base.LogErrorf("failed to get entry: %v", err)
return nil, status.Error(codes.Internal, "failed to get entry")
}
if entry != nil {
return nil, status.Error(codes.AlreadyExists, "entry already exists, you must retract the entry before submitting again")
}
startWorkflowOpts := client.StartWorkflowOptions{
ID: "operations/" + req.ProcessRpmRequest.Checksum,
WorkflowExecutionErrorWhenAlreadyStarted: true,
WorkflowIDReusePolicy: enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
}
// Submit to Temporal
run, err := s.temporal.ExecuteWorkflow(
context.Background(),
startWorkflowOpts,
mothership_worker_server.ProcessRPMWorkflow,
&mothershippb.ProcessRPMArgs{
Request: req.ProcessRpmRequest,
InternalRequest: &mothershippb.ProcessRPMInternalRequest{
WorkerId: worker.WorkerID,
},
},
)
if err != nil {
if strings.Contains(err.Error(), "is already running") {
return nil, status.Error(codes.AlreadyExists, "entry is already running")
}
base.LogErrorf("failed to start workflow: %v", err)
return nil, status.Error(codes.Internal, "failed to start workflow")
}
return s.getOperation(ctx, run.GetID())
}