diff --git a/tools/mothership/admin/rpc/rescue.go b/tools/mothership/admin/rpc/rescue.go index 253dbe07..10ed1676 100644 --- a/tools/mothership/admin/rpc/rescue.go +++ b/tools/mothership/admin/rpc/rescue.go @@ -23,6 +23,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" + "strings" ) func (s *Server) RescueEntryImport(ctx context.Context, req *mshipadminpb.RescueEntryImportRequest) (*emptypb.Empty, error) { @@ -45,6 +46,18 @@ func (s *Server) RescueEntryImport(ctx context.Context, req *mshipadminpb.Rescue // If on hold, then signal the workflow to continue. err = s.temporal.SignalWorkflow(ctx, "operations/"+entry.Sha256Sum, "", "rescue", true) if err != nil { + if strings.Contains(err.Error(), "already completed") { + // For some reason the entry got stuck in a weird state. + // Let's just set the state to FAILED. + entry.State = mothershippb.Entry_FAILED + err = base.Q[mothership_db.Entry](s.db).U(entry) + if err != nil { + base.LogErrorf("failed to update entry: %v", err) + return nil, status.Error(codes.Internal, "failed to update entry") + } + + return &emptypb.Empty{}, nil + } base.LogErrorf("failed to signal workflow: %v", err) return nil, status.Error(codes.Internal, "failed to signal workflow") } diff --git a/tools/mothership/proto/v1/entry.proto b/tools/mothership/proto/v1/entry.proto index f194148a..57b1b195 100644 --- a/tools/mothership/proto/v1/entry.proto +++ b/tools/mothership/proto/v1/entry.proto @@ -98,6 +98,10 @@ message Entry { // cancelled the entry. // This entry CAN'T be rescued. CANCELLED = 4; + + // Failed to archive the entry. + // This entry CAN'T be rescued. + FAILED = 5; } // State of the entry. State state = 12 [(google.api.field_behavior) = OUTPUT_ONLY]; diff --git a/tools/mothership/worker_server/process_rpm.go b/tools/mothership/worker_server/process_rpm.go index 75f60bc1..386b3fdf 100644 --- a/tools/mothership/worker_server/process_rpm.go +++ b/tools/mothership/worker_server/process_rpm.go @@ -17,6 +17,7 @@ package mothership_worker_server import ( "crypto/sha256" "encoding/hex" + "fmt" "github.com/go-git/go-billy/v5/memfs" "github.com/go-git/go-git/v5" "github.com/go-git/go-git/v5/storage/memory" @@ -147,12 +148,7 @@ func (w *Worker) SetEntryIDFromRPM(entry string, uri string, checksumSha256 stri } // Set entry ID - ent.EntryID = nevra.String() - // Remove everything after last dot for EntryID - // This is because we're going to replace arch with src - // RPM currently sets the Arch for SRPMs to the arch of the build machine - ent.EntryID = ent.EntryID[:strings.LastIndex(ent.EntryID, ".")] - ent.EntryID = ent.EntryID + ".src" + ent.EntryID = fmt.Sprintf("%s-%s-%s.src", nevra.Name, nevra.Version, nevra.Release) ent.Sha256Sum = checksumSha256 // Update entry diff --git a/tools/mothership/worker_server/workflows.go b/tools/mothership/worker_server/workflows.go index 708a816f..61464399 100644 --- a/tools/mothership/worker_server/workflows.go +++ b/tools/mothership/worker_server/workflows.go @@ -163,7 +163,7 @@ func ProcessRPMWorkflow(ctx workflow.Context, args *mothershippb.ProcessRPMArgs) } // On defer, if the workflow is not completed, then we'll set the entry state - // to cancelled. + // to failed. defer func() { if entry.State == mothershippb.Entry_ARCHIVED { return @@ -177,7 +177,7 @@ func ProcessRPMWorkflow(ctx workflow.Context, args *mothershippb.ProcessRPMArgs) MaximumAttempts: 0, }, }) - _ = workflow.ExecuteActivity(ctx, w.SetEntryState, entry.Name, mothershippb.Entry_CANCELLED, nil).Get(ctx, nil) + _ = workflow.ExecuteActivity(ctx, w.SetEntryState, entry.Name, mothershippb.Entry_FAILED, nil).Get(ctx, nil) }() // Set the entry name to the RPM NVR