This commit is contained in:
Mustafa Gezen 2023-09-06 23:29:32 +02:00
parent a5ea8b27c7
commit a7332a640b
Signed by: mustafa
GPG Key ID: DCDF010D946438C1

View File

@ -18,15 +18,9 @@ import (
"context"
base "go.resf.org/peridot/base/go"
mothership_db "go.resf.org/peridot/tools/mothership/db"
mothershippb "go.resf.org/peridot/tools/mothership/pb"
mothership_worker_server "go.resf.org/peridot/tools/mothership/worker_server"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/sdk/client"
"google.golang.org/genproto/googleapis/longrunning"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"strings"
)
// getWorkerIdentity returns the identity of the worker that the request is
@ -56,53 +50,3 @@ func (s *Server) getWorkerIdentity(ctx context.Context) (*mothership_db.Worker,
return worker, nil
}
// SubmitEntry handles the RPC request for submitting an entry. This is usually
// called by the worker. The worker must be authenticated. The checksum will "lease"
// the entry for the worker, so that other workers will not submit the same entry.
// This "lease" is enforced using Temporal
func (s *Server) SubmitEntry(ctx context.Context, req *mothershippb.SubmitEntryRequest) (*longrunning.Operation, error) {
worker, err := s.getWorkerIdentity(ctx)
if err != nil {
return nil, err
}
// Now make sure the entry doesn't already exist in the ARCHIVED state.
// If it does, return an error. It should be retracted first.
entry, err := base.Q[mothership_db.Entry](s.db).F("sha256_sum", req.ProcessRpmRequest.Checksum).GetOrNil()
if err != nil {
base.LogErrorf("failed to get entry: %v", err)
return nil, status.Error(codes.Internal, "failed to get entry")
}
if entry != nil && entry.State == mothershippb.Entry_ARCHIVED {
return nil, status.Error(codes.AlreadyExists, "entry already exists, you must retract the entry before submitting again")
}
startWorkflowOpts := client.StartWorkflowOptions{
ID: "operations/" + req.ProcessRpmRequest.Checksum,
WorkflowExecutionErrorWhenAlreadyStarted: true,
WorkflowIDReusePolicy: enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
}
// Submit to Temporal
run, err := s.temporal.ExecuteWorkflow(
context.Background(),
startWorkflowOpts,
mothership_worker_server.ProcessRPMWorkflow,
&mothershippb.ProcessRPMArgs{
Request: req.ProcessRpmRequest,
InternalRequest: &mothershippb.ProcessRPMInternalRequest{
WorkerId: worker.WorkerID,
},
},
)
if err != nil {
if strings.Contains(err.Error(), "is already running") {
return nil, status.Error(codes.AlreadyExists, "entry is already running")
}
base.LogErrorf("failed to start workflow: %v", err)
return nil, status.Error(codes.Internal, "failed to start workflow")
}
return s.getOperation(ctx, run.GetID())
}