Allow entries to fail

This commit is contained in:
Mustafa Gezen 2023-08-31 16:40:23 +02:00
parent a562752457
commit 44fc9a0d3a
Signed by: mustafa
GPG Key ID: DCDF010D946438C1
4 changed files with 21 additions and 8 deletions

View File

@ -23,6 +23,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
"strings"
)
func (s *Server) RescueEntryImport(ctx context.Context, req *mshipadminpb.RescueEntryImportRequest) (*emptypb.Empty, error) {
@ -45,6 +46,18 @@ func (s *Server) RescueEntryImport(ctx context.Context, req *mshipadminpb.Rescue
// If on hold, then signal the workflow to continue.
err = s.temporal.SignalWorkflow(ctx, "operations/"+entry.Sha256Sum, "", "rescue", true)
if err != nil {
if strings.Contains(err.Error(), "already completed") {
// For some reason the entry got stuck in a weird state.
// Let's just set the state to FAILED.
entry.State = mothershippb.Entry_FAILED
err = base.Q[mothership_db.Entry](s.db).U(entry)
if err != nil {
base.LogErrorf("failed to update entry: %v", err)
return nil, status.Error(codes.Internal, "failed to update entry")
}
return &emptypb.Empty{}, nil
}
base.LogErrorf("failed to signal workflow: %v", err)
return nil, status.Error(codes.Internal, "failed to signal workflow")
}

View File

@ -98,6 +98,10 @@ message Entry {
// cancelled the entry.
// This entry CAN'T be rescued.
CANCELLED = 4;
// Failed to archive the entry.
// This entry CAN'T be rescued.
FAILED = 5;
}
// State of the entry.
State state = 12 [(google.api.field_behavior) = OUTPUT_ONLY];

View File

@ -17,6 +17,7 @@ package mothership_worker_server
import (
"crypto/sha256"
"encoding/hex"
"fmt"
"github.com/go-git/go-billy/v5/memfs"
"github.com/go-git/go-git/v5"
"github.com/go-git/go-git/v5/storage/memory"
@ -147,12 +148,7 @@ func (w *Worker) SetEntryIDFromRPM(entry string, uri string, checksumSha256 stri
}
// Set entry ID
ent.EntryID = nevra.String()
// Remove everything after last dot for EntryID
// This is because we're going to replace arch with src
// RPM currently sets the Arch for SRPMs to the arch of the build machine
ent.EntryID = ent.EntryID[:strings.LastIndex(ent.EntryID, ".")]
ent.EntryID = ent.EntryID + ".src"
ent.EntryID = fmt.Sprintf("%s-%s-%s.src", nevra.Name, nevra.Version, nevra.Release)
ent.Sha256Sum = checksumSha256
// Update entry

View File

@ -163,7 +163,7 @@ func ProcessRPMWorkflow(ctx workflow.Context, args *mothershippb.ProcessRPMArgs)
}
// On defer, if the workflow is not completed, then we'll set the entry state
// to cancelled.
// to failed.
defer func() {
if entry.State == mothershippb.Entry_ARCHIVED {
return
@ -177,7 +177,7 @@ func ProcessRPMWorkflow(ctx workflow.Context, args *mothershippb.ProcessRPMArgs)
MaximumAttempts: 0,
},
})
_ = workflow.ExecuteActivity(ctx, w.SetEntryState, entry.Name, mothershippb.Entry_CANCELLED, nil).Get(ctx, nil)
_ = workflow.ExecuteActivity(ctx, w.SetEntryState, entry.Name, mothershippb.Entry_FAILED, nil).Get(ctx, nil)
}()
// Set the entry name to the RPM NVR