Allow putting import workflows on hold and rescuing them

This commit is contained in:
Mustafa Gezen 2023-08-31 10:20:38 +02:00
parent 9971185bd8
commit fab39c3021
Signed by: mustafa
GPG Key ID: DCDF010D946438C1
11 changed files with 550 additions and 28 deletions

View File

@ -0,0 +1,39 @@
package mothershipadmin_rpc
import (
"context"
base "go.resf.org/peridot/base/go"
mshipadminpb "go.resf.org/peridot/tools/mothership/admin/pb"
mothership_db "go.resf.org/peridot/tools/mothership/db"
mothershippb "go.resf.org/peridot/tools/mothership/pb"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
)
func (s *Server) RescueEntryImport(ctx context.Context, req *mshipadminpb.RescueEntryImportRequest) (*emptypb.Empty, error) {
// First make sure an entry with the given name exists.
entry, err := base.Q[mothership_db.Entry](s.db).F("name", req.Name).GetOrNil()
if err != nil {
base.LogErrorf("failed to get entry: %v", err)
return nil, status.Error(codes.Internal, "failed to get entry")
}
if entry == nil {
return nil, status.Error(codes.NotFound, "entry not found")
}
// Make sure the entry is on hold.
if entry.State != mothershippb.Entry_ON_HOLD {
return nil, status.Error(codes.FailedPrecondition, "entry is not on hold")
}
// If on hold, then signal the workflow to continue.
err = s.temporal.SignalWorkflow(ctx, "operations/"+entry.Sha256Sum, "", "rescue", true)
if err != nil {
base.LogErrorf("failed to signal workflow: %v", err)
return nil, status.Error(codes.Internal, "failed to signal workflow")
}
return &emptypb.Empty{}, nil
}
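For illustration, an admin-side caller could invoke this RPC as below. This is a minimal sketch: the dial target, credentials, and entry name are placeholders, and the NewMshipAdminClient constructor is assumed from standard gRPC code generation for the MshipAdmin service.

package main

import (
	"context"
	"log"

	mshipadminpb "go.resf.org/peridot/tools/mothership/admin/pb"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	// Hypothetical address; production use would go through the OIDC-protected endpoint.
	conn, err := grpc.Dial("localhost:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("failed to dial: %v", err)
	}
	defer conn.Close()

	admin := mshipadminpb.NewMshipAdminClient(conn)
	// Rescue a held entry; the name follows the "entries/*" pattern from the proto.
	if _, err := admin.RescueEntryImport(context.Background(), &mshipadminpb.RescueEntryImportRequest{
		Name: "entries/example", // hypothetical entry name
	}); err != nil {
		log.Fatalf("rescue failed: %v", err)
	}
}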

View File

@ -17,6 +17,7 @@ package mothershipadmin_rpc
import (
base "go.resf.org/peridot/base/go"
mshipadminpb "go.resf.org/peridot/tools/mothership/admin/pb"
"go.temporal.io/sdk/client"
"google.golang.org/grpc"
)
@ -24,10 +25,11 @@ type Server struct {
base.GRPCServer
mshipadminpb.UnimplementedMshipAdminServer
db *base.DB
temporal client.Client
}
func NewServer(db *base.DB, temporalClient client.Client, oidcInterceptorDetails *base.OidcInterceptorDetails, opts ...base.GRPCServerOption) (*Server, error) {
oidcInterceptor, err := base.OidcGrpcInterceptor(oidcInterceptorDetails)
if err != nil {
return nil, err
@ -42,6 +44,7 @@ func NewServer(db *base.DB, oidcInterceptorDetails *base.OidcInterceptorDetails,
return &Server{
GRPCServer: *grpcServer,
db: db,
temporal: temporalClient,
}, nil
}
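A minimal sketch of wiring the new temporalClient parameter at startup, assuming the Temporal Go SDK's client.Dial (which defaults to localhost:7233); the package import path, database handle, and OIDC details below are placeholders.

package main

import (
	"log"

	base "go.resf.org/peridot/base/go"
	mothershipadmin_rpc "go.resf.org/peridot/tools/mothership/admin/rpc" // hypothetical import path
	"go.temporal.io/sdk/client"
)

func main() {
	var db *base.DB                       // assume an initialized database handle
	var oidc *base.OidcInterceptorDetails // assume populated OIDC details

	// client.Dial defaults to localhost:7233 when no HostPort is set.
	temporalClient, err := client.Dial(client.Options{})
	if err != nil {
		log.Fatalf("failed to dial temporal: %v", err)
	}
	defer temporalClient.Close()

	srv, err := mothershipadmin_rpc.NewServer(db, temporalClient, oidc)
	if err != nil {
		log.Fatalf("failed to create server: %v", err)
	}
	_ = srv // the server would then be started as elsewhere in the package
}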

View File

@ -23,7 +23,8 @@ import (
)
type Worker struct {
PikaTableName string `pika:"workers"`
PikaDefaultOrderBy string `pika:"-create_time"`
Name string `db:"name"`
CreateTime time.Time `db:"create_time" pika:"omitempty"`

View File

@ -63,6 +63,16 @@ service MshipAdmin {
};
option (google.api.method_signature) = "name";
}
// Rescue an entry import attempt
// This should be called after fixing patches that caused the import to fail.
// This will re-run the import attempt.
rpc RescueEntryImport(RescueEntryImportRequest) returns (google.protobuf.Empty) {
option (google.api.http) = {
post: "/v1/{name=entries/*}:rescueImport"
};
option (google.api.method_signature) = "name";
}
}
// GetWorkerRequest is the request message for GetWorker.
@ -118,3 +128,9 @@ message DeleteWorkerRequest {
// Required. The name of the worker to delete.
string name = 1 [(google.api.field_behavior) = REQUIRED];
}
// RescueEntryImportRequest is the request message for RescueEntryImport.
message RescueEntryImportRequest {
// Required. The name of the entry to rescue.
string name = 1 [(google.api.field_behavior) = REQUIRED];
}

View File

@ -20,6 +20,7 @@ import "google/api/annotations.proto";
import "google/api/client.proto";
import "google/api/field_behavior.proto";
import "google/longrunning/operations.proto";
import "google/protobuf/empty.proto";
import "tools/mothership/proto/v1/batch.proto";
import "tools/mothership/proto/v1/entry.proto";
import "tools/mothership/proto/v1/process_rpm.proto";
@ -88,9 +89,12 @@ service SrpmArchiver {
// If after 2 hours the SRPM is not processed, the worker can assume that
the SRPM is lost and can be re-uploaded. In that case, the entry will be
// re-assigned to the worker.
// If a checksum can't be leased because it's already being processed, an
// AlreadyExists error will be returned.
// The worker MUST stop processing the SRPM in that case.
rpc SubmitEntry(SubmitEntryRequest) returns (google.longrunning.Operation) {
option (google.api.http) = {
post: "/v1/entries:submitEntry"
post: "/v1/actions:submitEntry"
body: "*"
};
option (google.longrunning.operation_info) = {
@ -101,14 +105,23 @@ service SrpmArchiver {
// WorkerUploadObject is used by workers to upload objects to the
// object storage service.
// Returns AlreadyExists if the SRPM already exists.
// This doesn't necessarily mean that the worker should stop processing,
// especially if it acquired a lease to process this particular SRPM.
rpc WorkerUploadObject(stream WorkerUploadObjectRequest) returns (WorkerUploadObjectResponse) {
option (google.api.http) = {
post: "/v1/srpms:workerUploadObject"
post: "/v1/actions:workerUploadObject"
body: "chunk"
};
}
// WorkerPing is used by workers to ping the server.
// This is used to check if the worker is still alive.
rpc WorkerPing(google.protobuf.Empty) returns (google.protobuf.Empty) {
option (google.api.http) = {
post: "/v1/actions:workerPing"
};
}
}
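To make the WorkerUploadObject contract concrete, here is a sketch of the worker-side stream. It assumes the generated request type exposes the body field above as a bytes Chunk; the chunk size and file handling are placeholders, and ctx must carry the worker secret metadata.

package example

import (
	"context"
	"io"
	"os"

	mothershippb "go.resf.org/peridot/tools/mothership/pb"
)

// uploadSRPM streams an SRPM to the archiver in 1 MiB chunks (an arbitrary size).
func uploadSRPM(ctx context.Context, c mothershippb.SrpmArchiverClient, path string) (*mothershippb.WorkerUploadObjectResponse, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	stream, err := c.WorkerUploadObject(ctx)
	if err != nil {
		return nil, err
	}
	buf := make([]byte, 1<<20)
	for {
		n, readErr := f.Read(buf)
		if n > 0 {
			// The HTTP binding above maps the request body to "chunk", so the
			// generated request type is assumed to carry a bytes Chunk field.
			if err := stream.Send(&mothershippb.WorkerUploadObjectRequest{Chunk: buf[:n]}); err != nil {
				return nil, err
			}
		}
		if readErr == io.EOF {
			break
		}
		if readErr != nil {
			return nil, readErr
		}
	}
	// An AlreadyExists status here means the SRPM is already archived; per the
	// comment above, that alone doesn't force a leased worker to stop.
	return stream.CloseAndRecv()
}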
// Request message for GetBatch method.
@ -223,10 +236,6 @@ message SubmitEntryRequest {
// Process request for RPM.
// This request is sent to the worker to process the RPM.
ProcessRPMRequest process_rpm_request = 1 [(google.api.field_behavior) = REQUIRED];
}
// Request message for WorkerUploadObject method.

View File

@ -17,17 +17,23 @@ package mothership_rpc
import (
base "go.resf.org/peridot/base/go"
mothershippb "go.resf.org/peridot/tools/mothership/pb"
"go.temporal.io/sdk/client"
"google.golang.org/genproto/googleapis/longrunning"
"google.golang.org/grpc"
)
type Server struct {
base.GRPCServer
mothershippb.UnimplementedSrpmArchiverServer
longrunning.UnimplementedOperationsServer
db *base.DB
temporal client.Client
}
func NewServer(db *base.DB, temporalClient client.Client, opts ...base.GRPCServerOption) (*Server, error) {
opts = append(opts, base.WithServeMuxAdditionalHeaders("x-mship-worker-secret"))
grpcServer, err := base.NewGRPCServer(opts...)
if err != nil {
return nil, err
@ -36,14 +42,17 @@ func NewServer(db *base.DB, opts ...base.GRPCServerOption) (*Server, error) {
return &Server{
GRPCServer: *grpcServer,
db: db,
temporal: temporalClient,
}, nil
}
func (s *Server) Start() error {
s.RegisterService(func(server *grpc.Server) {
longrunning.RegisterOperationsServer(server, s)
mothershippb.RegisterSrpmArchiverServer(server, s)
})
if err := s.GatewayEndpoints(
longrunning.RegisterOperationsHandler,
mothershippb.RegisterSrpmArchiverHandler,
); err != nil {
return err

View File

@ -0,0 +1,83 @@
package mothership_rpc
import (
"context"
base "go.resf.org/peridot/base/go"
mothership_db "go.resf.org/peridot/tools/mothership/db"
mothershippb "go.resf.org/peridot/tools/mothership/pb"
mothership_worker_server "go.resf.org/peridot/tools/mothership/worker_server"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/sdk/client"
"google.golang.org/genproto/googleapis/longrunning"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"strings"
)
// getWorkerIdentity returns the identity of the worker that the request is
// coming from. Returns an error if the worker is not found or unauthenticated.
func (s *Server) getWorkerIdentity(ctx context.Context) (*mothership_db.Worker, error) {
// Get x-mship-worker-secret
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, status.Error(codes.Unauthenticated, "missing metadata")
}
secrets := md["x-mship-worker-secret"]
if len(secrets) != 1 {
return nil, status.Error(codes.Unauthenticated, "missing worker secret")
}
secret := secrets[0]
worker, err := base.Q[mothership_db.Worker](s.db).F("api_secret", secret).GetOrNil()
if err != nil {
base.LogErrorf("failed to get worker: %v", err)
return nil, status.Error(codes.Internal, "failed to get worker")
}
if worker == nil {
return nil, status.Error(codes.Unauthenticated, "invalid worker secret")
}
return worker, nil
}
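The worker side of this handshake simply attaches the secret as gRPC metadata. A minimal sketch, pinging the server with the new WorkerPing RPC; the address and secret value are placeholders.

package main

import (
	"context"
	"log"

	mothershippb "go.resf.org/peridot/tools/mothership/pb"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/metadata"
	"google.golang.org/protobuf/types/known/emptypb"
)

func main() {
	conn, err := grpc.Dial("localhost:8080", grpc.WithTransportCredentials(insecure.NewCredentials())) // hypothetical address
	if err != nil {
		log.Fatalf("failed to dial: %v", err)
	}
	defer conn.Close()

	archiver := mothershippb.NewSrpmArchiverClient(conn)

	// getWorkerIdentity reads this exact metadata key on the server side.
	ctx := metadata.AppendToOutgoingContext(context.Background(), "x-mship-worker-secret", "my-secret")

	// WorkerPing is the liveness RPC added in this commit.
	if _, err := archiver.WorkerPing(ctx, &emptypb.Empty{}); err != nil {
		log.Fatalf("ping failed: %v", err)
	}
}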
// SubmitEntry handles the RPC request for submitting an entry. This is usually
// called by the worker. The worker must be authenticated. The checksum will "lease"
// the entry for the worker, so that other workers will not submit the same entry.
// This "lease" is enforced using Temporal
func (s *Server) SubmitEntry(ctx context.Context, req *mothershippb.SubmitEntryRequest) (*longrunning.Operation, error) {
worker, err := s.getWorkerIdentity(ctx)
if err != nil {
return nil, err
}
startWorkflowOpts := client.StartWorkflowOptions{
ID: "operations/" + req.ProcessRpmRequest.Checksum,
WorkflowExecutionErrorWhenAlreadyStarted: true,
WorkflowIDReusePolicy: enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
}
// Submit to Temporal
run, err := s.temporal.ExecuteWorkflow(
context.Background(),
startWorkflowOpts,
mothership_worker_server.ProcessRPMWorkflow,
&mothershippb.ProcessRPMArgs{
Request: req.ProcessRpmRequest,
InternalRequest: &mothershippb.ProcessRPMInternalRequest{
WorkerId: worker.WorkerID,
},
},
)
if err != nil {
if strings.Contains(err.Error(), "is already running") {
return nil, status.Error(codes.AlreadyExists, "entry is already running")
}
base.LogErrorf("failed to start workflow: %v", err)
return nil, status.Error(codes.Internal, "failed to start workflow")
}
return s.getOperation(ctx, run.GetID())
}
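Given these lease semantics, a worker can distinguish "stop processing" from other failures by the returned status code. A sketch, with the client and request assumed to be built elsewhere.

package example

import (
	"context"

	mothershippb "go.resf.org/peridot/tools/mothership/pb"
	"google.golang.org/genproto/googleapis/longrunning"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// submitLeased submits an entry and reports whether this worker holds the
// lease. A false result means a workflow already runs for this checksum and
// the worker must stop processing the SRPM.
func submitLeased(ctx context.Context, c mothershippb.SrpmArchiverClient, req *mothershippb.SubmitEntryRequest) (*longrunning.Operation, bool, error) {
	op, err := c.SubmitEntry(ctx, req)
	if err != nil {
		if status.Code(err) == codes.AlreadyExists {
			// Lease is held elsewhere; not an error for the caller.
			return nil, false, nil
		}
		return nil, false, err
	}
	return op, true, nil
}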

View File

@ -22,6 +22,8 @@ import (
"github.com/go-git/go-git/v5/storage/memory"
"github.com/pkg/errors"
"github.com/sassoftware/go-rpmutils"
base "go.resf.org/peridot/base/go"
mothership_db "go.resf.org/peridot/tools/mothership/db"
mothershippb "go.resf.org/peridot/tools/mothership/pb"
"go.resf.org/peridot/tools/mothership/worker_server/srpm_import"
"go.temporal.io/sdk/temporal"
@ -86,6 +88,81 @@ func (w *Worker) VerifyResourceExists(uri string) error {
return nil
}
// SetEntryIDFromRPM sets the entry ID from the RPM.
// This is a Temporal activity.
func (w *Worker) SetEntryIDFromRPM(entry string, uri string, checksumSha256 string) error {
ent, err := base.Q[mothership_db.Entry](w.db).F("name", entry).GetOrNil()
if err != nil {
return errors.Wrap(err, "failed to get entry")
}
if ent == nil {
return errors.New("entry does not exist")
}
tempDir, err := os.MkdirTemp("", "mothership-worker-server-import-rpm-*")
if err != nil {
return errors.Wrap(err, "failed to create temporary directory")
}
defer os.RemoveAll(tempDir)
// Parse uri
parsed, err := url.Parse(uri)
if err != nil {
return errors.Wrap(err, "failed to parse resource URI")
}
// Download the resource to the temporary directory
err = w.storage.Download(parsed.Path, filepath.Join(tempDir, "resource.rpm"))
if err != nil {
return errors.Wrap(err, "failed to download resource")
}
// Verify checksum
hash := sha256.New()
f, err := os.Open(filepath.Join(tempDir, "resource.rpm"))
if err != nil {
return errors.Wrap(err, "failed to open resource")
}
defer f.Close()
if _, err := io.Copy(hash, f); err != nil {
return errors.Wrap(err, "failed to hash resource")
}
if hex.EncodeToString(hash.Sum(nil)) != checksumSha256 {
return errors.New("checksum does not match")
}
// Read the RPM headers
_, err = f.Seek(0, io.SeekStart)
if err != nil {
return errors.Wrap(err, "failed to seek resource")
}
rpm, err := rpmutils.ReadRpm(f)
if err != nil {
return errors.Wrap(err, "failed to read RPM headers")
}
nevra, err := rpm.Header.GetNEVRA()
if err != nil {
return errors.Wrap(err, "failed to get RPM NEVRA")
}
// Set entry ID
ent.EntryID = nevra.String()
// Remove everything after the last dot in the EntryID, since we're going to
// replace the arch with "src". RPM currently sets the arch for SRPMs to the
// arch of the build machine.
ent.EntryID = ent.EntryID[:strings.LastIndex(ent.EntryID, ".")]
ent.EntryID = ent.EntryID + ".src"
ent.Sha256Sum = checksumSha256
// Update entry
if err := base.Q[mothership_db.Entry](w.db).U(ent); err != nil {
return errors.Wrap(err, "failed to update entry")
}
return nil
}
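The EntryID rewrite above is easiest to see on a concrete NEVRA string (the exact rendering of nevra.String() may differ; this is illustrative).

package main

import (
	"fmt"
	"strings"
)

// entryIDFromNEVRA mirrors the rewrite above: drop the trailing arch label
// and replace it with "src".
func entryIDFromNEVRA(nevra string) string {
	return nevra[:strings.LastIndex(nevra, ".")] + ".src"
}

func main() {
	// An SRPM built on an x86_64 machine reports that arch in its NEVRA.
	fmt.Println(entryIDFromNEVRA("bash-5.1.8-6.el9.x86_64"))
	// Output: bash-5.1.8-6.el9.src
}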
// ImportRPM imports an RPM into the database.
// This is a Temporal activity.
func (w *Worker) ImportRPM(uri string, checksumSha256 string, osRelease string) (*mothershippb.ImportRPMResponse, error) {

View File

@ -25,13 +25,19 @@ import (
"github.com/go-git/go-git/v5/plumbing/object"
storage2 "github.com/go-git/go-git/v5/storage"
"github.com/pkg/errors"
srpmprocpb "github.com/rocky-linux/srpmproc/pb"
"github.com/rocky-linux/srpmproc/pkg/data"
"github.com/rocky-linux/srpmproc/pkg/directives"
"github.com/sassoftware/go-rpmutils"
"go.resf.org/peridot/base/go/storage"
"golang.org/x/crypto/openpgp"
"google.golang.org/protobuf/encoding/prototext"
"io"
"log"
"os"
"path/filepath"
"regexp"
"strconv"
"strings"
"time"
)
@ -249,6 +255,10 @@ func (s *State) writeMetadataFile(targetFS billy.Filesystem) error {
}
metadataFile := fmt.Sprintf(".%s.metadata", name[0])
// Delete the file if it exists
_ = targetFS.Remove(metadataFile)
f, err := targetFS.Create(metadataFile)
if err != nil {
return errors.Wrap(err, "failed to open metadata file")
@ -430,6 +440,11 @@ func (s *State) cleanTargetRepo(wt *git.Worktree, root string) error {
}
for _, f := range ls {
// Don't delete the PATCHES directory
if f.Name() == "PATCHES" && f.IsDir() {
continue
}
// If it's a directory, then recurse into it.
if f.IsDir() {
err := s.cleanTargetRepo(wt, filepath.Join(root, f.Name()))
@ -491,6 +506,12 @@ func (s *State) populateTargetRepo(repo *git.Repository, targetFS billy.Filesyst
return errors.Wrap(err, "failed to expand layout")
}
// If the target FS has patches, apply the directives
err = s.patchTargetRepo(repo, lookaside)
if err != nil {
return errors.Wrap(err, "failed to patch target repo")
}
// Commit the changes to the target repository.
_, err = wt.Add(".")
if err != nil {
@ -544,6 +565,120 @@ func (s *State) pushTargetRepo(repo *git.Repository, opts *git.PushOptions) erro
return nil
}
func (s *State) patchTargetRepo(repo *git.Repository, lookaside storage.Storage) error {
// We can re-use srpmproc, since we want to stay compatible with it.
// Instead of OpenPatch, we look for patches in the targetFS.
// todo(mustafa): RESF still uses OpenPatch, so we'll need to change that
wt, err := repo.Worktree()
if err != nil {
return errors.Wrap(err, "failed to get worktree")
}
nevra, err := s.rpm.Header.GetNEVRA()
if err != nil {
return errors.Wrap(err, "failed to get NEVRA")
}
dist := elDistRegex.FindString(nevra.Release)
if dist == "" {
return errors.Wrap(err, "failed to determine dist tag")
}
distNum, err := strconv.Atoi(dist[2:])
if err != nil {
return errors.Wrap(err, "failed to parse dist tag")
}
pd := &data.ProcessData{
ImportBranchPrefix: "el",
Version: distNum,
BlobStorage: &srpmprocBlobCompat{lookaside},
Importer: &srpmprocImportModeCompat{},
Log: log.New(os.Stderr, "", 0),
}
md := &data.ModeData{
SourcesToIgnore: []*data.IgnoredSource{},
}
// Look in the PATCHES/ directory for any .cfg files
patchesLs, err := wt.Filesystem.ReadDir("PATCHES")
if err != nil {
return errors.Wrap(err, "failed to read PATCHES directory")
}
for _, f := range patchesLs {
// Skip directories
if f.IsDir() {
continue
}
// Skip non-cfg files
if !strings.HasSuffix(f.Name(), ".cfg") {
continue
}
// Open the file
file, err := wt.Filesystem.Open(filepath.Join("PATCHES", f.Name()))
if err != nil {
return errors.Wrap(err, "failed to open file")
}
// Process the file
directivesBytes, err := io.ReadAll(file)
if err != nil {
return errors.Wrap(err, "failed to read file")
}
var cfg srpmprocpb.Cfg
err = prototext.Unmarshal(directivesBytes, &cfg)
if err != nil {
return errors.Wrap(err, "failed to unmarshal directives")
}
errs := directives.Apply(&cfg, pd, md, wt, wt)
// If there are errors, then we should return a reduced error
if len(errs) > 0 {
// errors.Wrap returns nil when given a nil error, so seed the chain with
// the first error and wrap the remaining ones onto it.
retErr := errs[0]
for _, err := range errs[1:] {
retErr = errors.Wrap(retErr, err.Error())
}
return retErr
}
}
// Add sources to ignore to lookasideBlobs
for _, source := range md.SourcesToIgnore {
// Get the hash of the source
hash, err := func() (string, error) {
hash := sha256.New()
file, err := wt.Filesystem.Open(source.Name)
if err != nil {
return "", errors.Wrap(err, "failed to open file")
}
defer file.Close()
_, err = io.Copy(hash, file)
if err != nil {
return "", errors.Wrap(err, "failed to copy file")
}
return hex.EncodeToString(hash.Sum(nil)), nil
}()
if err != nil {
return err
}
s.lookasideBlobs[source.Name] = hash
}
// Re-write the metadata file
err = s.writeMetadataFile(wt.Filesystem)
if err != nil {
return errors.Wrap(err, "failed to write metadata file")
}
return nil
}
// Import imports the SRPM into the target repository.
func (s *State) Import(opts *git.CloneOptions, storer storage2.Storer, targetFS billy.Filesystem, lookaside storage.Storage, osRelease string) (*object.Commit, error) {
// Get the target repository.

View File

@ -0,0 +1,34 @@
package srpm_import
import (
"github.com/rocky-linux/srpmproc/pkg/data"
"github.com/rocky-linux/srpmproc/pkg/misc"
"go.resf.org/peridot/base/go/storage"
"strings"
)
type srpmprocBlobCompat struct {
storage.Storage
}
func (s *srpmprocBlobCompat) Write(path string, content []byte) error {
_, err := s.PutBytes(path, content)
return err
}
func (s *srpmprocBlobCompat) Read(path string) ([]byte, error) {
return s.Get(path)
}
type srpmprocImportModeCompat struct {
data.ImportMode
}
func (s *srpmprocImportModeCompat) ImportName(pd *data.ProcessData, md *data.ModeData) string {
if misc.GetTagImportRegex(pd).MatchString(md.TagBranch) {
match := misc.GetTagImportRegex(pd).FindStringSubmatch(md.TagBranch)
return match[3]
}
return strings.Replace(strings.TrimPrefix(md.TagBranch, "refs/heads/"), "%", "_", -1)
}

View File

@ -23,6 +23,99 @@ import (
var w Worker
// processRPMPostHold is a part of the ProcessRPM workflow.
// This part executes the import step and retries it if it fails.
// After the first failure, the workflow is put on hold.
// While on hold, the workflow can be rescued by an admin.
func processRPMPostHold(ctx workflow.Context, entry *mothershippb.Entry, args *mothershippb.ProcessRPMArgs) (*mothershippb.ProcessRPMResponse, error) {
// If resource exists, then we can start the import.
ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
// We'll wait up to 5 minutes for the import to finish.
// Most imports are fast, but some packages are very large.
StartToCloseTimeout: 5 * time.Minute,
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 1,
},
})
var importRpmRes mothershippb.ImportRPMResponse
err := workflow.ExecuteActivity(ctx, w.ImportRPM, args.Request.RpmUri, args.Request.Checksum, args.Request.OsRelease).Get(ctx, &importRpmRes)
if err != nil {
// If the import fails, we'll put the workflow on hold.
// If the workflow is put on hold, an admin can rescue the workflow.
var err error
signalChan := workflow.GetSignalChannel(ctx, "rescue")
workflow.GetLogger(ctx).Info("Import failed, putting workflow on hold")
selector := workflow.NewSelector(ctx)
selector.AddReceive(ctx.Done(), func(c workflow.ReceiveChannel, more bool) {
err = ctx.Err()
})
selector.AddReceive(signalChan, func(c workflow.ReceiveChannel, more bool) {
c.Receive(ctx, nil)
})
// Set state to on hold
ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 25 * time.Second,
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 0,
},
})
err = workflow.ExecuteActivity(ctx, w.SetEntryState, entry.Name, mothershippb.Entry_ON_HOLD, nil).Get(ctx, nil)
if err != nil {
return nil, err
}
// Wait until a rescue signal is received. Otherwise, an admin can also
// cancel the workflow.
selector.Select(ctx)
// Check if workflow was cancelled.
if err != nil {
ctx, cancel := workflow.NewDisconnectedContext(ctx)
defer cancel()
ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 25 * time.Second,
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 0,
},
})
_ = workflow.ExecuteActivity(ctx, w.SetEntryState, entry.Name, mothershippb.Entry_CANCELLED, nil).Get(ctx, nil)
return nil, err
}
// Set the entry state to archiving
ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 25 * time.Second,
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 0,
},
})
err = workflow.ExecuteActivity(ctx, w.SetEntryState, entry.Name, mothershippb.Entry_ARCHIVING, nil).Get(ctx, nil)
if err != nil {
return nil, err
}
// If the workflow was not cancelled, then we can retry the import.
return processRPMPostHold(ctx, entry, args)
}
// If the import succeeds, then we can update the entry state.
ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 25 * time.Second,
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 0,
},
})
err = workflow.ExecuteActivity(ctx, w.SetEntryState, entry.Name, mothershippb.Entry_ARCHIVED, &importRpmRes).Get(ctx, entry)
if err != nil {
return nil, err
}
return &mothershippb.ProcessRPMResponse{
Entry: entry,
}, nil
}
// ProcessRPMWorkflow processes an SRPM.
// Usually a client worker will first initiate an upload to the storage backend,
// then send a request to the Server `SubmitEntry` method (or send a request
@ -47,32 +140,55 @@ func ProcessRPMWorkflow(ctx workflow.Context, args *mothershippb.ProcessRPMArgs)
return nil, err
}
// Set worker last check in time
ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 25 * time.Second,
})
err = workflow.ExecuteActivity(ctx, w.SetWorkerLastCheckinTime, args.InternalRequest.WorkerId).Get(ctx, nil)
if err != nil {
return nil, err
}
// Create an entry, if the import fails, we'll still have an entry.
// If it succeeds, we'll update the entry state.
// If it fails we can set the workflow on hold and if the patches are updated
// an admin can signal and "rescue" the workflow.
ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 25 * time.Second,
})
var entry mothershippb.Entry
err = workflow.ExecuteActivity(ctx, w.CreateEntry, args).Get(ctx, &entry)
if err != nil {
return nil, err
}
// On return, if the entry never reached the ARCHIVED state, set the entry
// state to CANCELLED.
defer func() {
if entry.State == mothershippb.Entry_ARCHIVED {
return
}
ctx, cancel := workflow.NewDisconnectedContext(ctx)
defer cancel()
ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 25 * time.Second,
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 0,
},
})
_ = workflow.ExecuteActivity(ctx, w.SetEntryState, entry.Name, mothershippb.Entry_CANCELLED, nil).Get(ctx, nil)
}()
// Set the entry ID from the RPM NEVRA
ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 45 * time.Second,
})
err = workflow.ExecuteActivity(ctx, w.SetEntryIDFromRPM, entry.Name, args.Request.RpmUri, args.Request.Checksum).Get(ctx, nil)
if err != nil {
return nil, err
}
// Process the RPM.
return processRPMPostHold(ctx, &entry, args)
}
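For completeness, a sketch of hosting this workflow on a Temporal worker; the task queue name is a placeholder and activity registration for the package's Worker type is elided.

package main

import (
	"log"

	mothership_worker_server "go.resf.org/peridot/tools/mothership/worker_server"
	"go.temporal.io/sdk/client"
	"go.temporal.io/sdk/worker"
)

func main() {
	// client.Dial defaults to localhost:7233 when no HostPort is set.
	c, err := client.Dial(client.Options{})
	if err != nil {
		log.Fatalf("failed to dial temporal: %v", err)
	}
	defer c.Close()

	w := worker.New(c, "mothership-worker", worker.Options{}) // hypothetical task queue
	w.RegisterWorkflow(mothership_worker_server.ProcessRPMWorkflow)
	// The package's Worker activities (VerifyResourceExists, CreateEntry,
	// SetEntryIDFromRPM, ImportRPM, SetEntryState, ...) would be registered
	// here as well, e.g. w.RegisterActivity(wrk.ImportRPM).

	if err := w.Run(worker.InterruptCh()); err != nil {
		log.Fatalf("worker exited: %v", err)
	}
}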