Start working on entry retraction

This commit is contained in:
Mustafa Gezen 2023-09-03 07:47:29 +02:00
parent 3bcc4223ff
commit 5f01fc83f1
Signed by: mustafa
GPG Key ID: DCDF010D946438C1
14 changed files with 557 additions and 8 deletions

View File

@ -17,7 +17,9 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "rpc",
srcs = [
"operation.go",
"rescue.go",
"retract.go",
"rpc.go",
"worker.go",
],
@ -25,16 +27,25 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//base/go",
"//third_party/googleapis/google/longrunning:longrunning_go_proto",
"//tools/mothership/db",
"//tools/mothership/proto/admin/v1:pb",
"//tools/mothership/proto/v1:pb",
"//tools/mothership/worker_server",
"//vendor/go.ciq.dev/pika",
"//vendor/go.temporal.io/api/enums/v1:enums",
"//vendor/go.temporal.io/api/serviceerror",
"//vendor/go.temporal.io/api/workflowservice/v1:workflowservice",
"//vendor/go.temporal.io/sdk/client",
"@go_googleapis//google/rpc:code_go_proto",
"@go_googleapis//google/rpc:errdetails_go_proto",
"@go_googleapis//google/rpc:status_go_proto",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
"@org_golang_google_protobuf//types/known/anypb",
"@org_golang_google_protobuf//types/known/emptypb",
"@org_golang_google_protobuf//types/known/timestamppb",
],
)
@ -55,6 +66,7 @@ go_test(
"//vendor/github.com/testcontainers/testcontainers-go",
"//vendor/github.com/testcontainers/testcontainers-go/modules/postgres",
"//vendor/github.com/testcontainers/testcontainers-go/wait",
"//vendor/go.temporal.io/sdk/client",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//metadata",
"@org_golang_google_grpc//status",

View File

@ -45,7 +45,7 @@ func TestMain(m *testing.M) {
}
defer os.RemoveAll(dir)
scripts, err := base.EmbedFSToOSFS(dir, migrations.UpSQLs, ".")
scripts, err := base.EmbedFSToOSFS(dir, mothership_migrations.UpSQLs, ".")
if err != nil {
panic(err)
}

View File

@ -0,0 +1,144 @@
// Copyright 2023 Peridot Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mothershipadmin_rpc
import (
"context"
base "go.resf.org/peridot/base/go"
mshipadminpb "go.resf.org/peridot/tools/mothership/admin/pb"
mothershippb "go.resf.org/peridot/tools/mothership/pb"
v11 "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/api/workflowservice/v1"
"google.golang.org/genproto/googleapis/longrunning"
rpccode "google.golang.org/genproto/googleapis/rpc/code"
rpcstatus "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/timestamppb"
)
func (s *Server) describeWorkflowToOperation(ctx context.Context, res *workflowservice.DescribeWorkflowExecutionResponse) (*longrunning.Operation, error) {
if res.WorkflowExecutionInfo == nil {
return nil, status.Error(codes.NotFound, "workflow not found")
}
if res.WorkflowExecutionInfo.Execution == nil {
return nil, status.Error(codes.NotFound, "workflow not found")
}
op := &longrunning.Operation{
Name: res.WorkflowExecutionInfo.Execution.WorkflowId,
}
// If the workflow is not running, we can mark the operation as done
if res.WorkflowExecutionInfo.Status != v11.WORKFLOW_EXECUTION_STATUS_RUNNING {
op.Done = true
}
// Add metadata
rpmMetadata := &mshipadminpb.RetractEntryMetadata{
StartTime: nil,
EndTime: nil,
}
st := res.WorkflowExecutionInfo.GetStartTime()
if st != nil {
rpmMetadata.StartTime = timestamppb.New(*st)
}
et := res.WorkflowExecutionInfo.GetCloseTime()
if et != nil {
rpmMetadata.EndTime = timestamppb.New(*et)
}
rpmMetadataAny, err := anypb.New(rpmMetadata)
if err != nil {
return op, nil
}
op.Metadata = rpmMetadataAny
// If completed, add result
// If failed, add error
if res.WorkflowExecutionInfo.Status == v11.WORKFLOW_EXECUTION_STATUS_COMPLETED {
// Complete, we need to get the result using GetWorkflow
run := s.temporal.GetWorkflow(ctx, op.Name, "")
var res mothershippb.ProcessRPMResponse
if err := run.Get(ctx, &res); err != nil {
return nil, err
}
resAny, err := anypb.New(&res)
if err != nil {
return nil, err
}
op.Result = &longrunning.Operation_Response{Response: resAny}
} else if res.WorkflowExecutionInfo.Status == v11.WORKFLOW_EXECUTION_STATUS_FAILED {
// Failed, we need to get the error using GetWorkflow
run := s.temporal.GetWorkflow(ctx, op.Name, "")
err := run.Get(ctx, nil)
// No error so return with a generic error
if err == nil {
op.Result = &longrunning.Operation_Error{
Error: &rpcstatus.Status{
Code: int32(rpccode.Code_INTERNAL),
Message: "workflow failed",
},
}
return op, nil
}
// Error, so return with the error
op.Result = &longrunning.Operation_Error{
Error: &rpcstatus.Status{
Code: int32(rpccode.Code_FAILED_PRECONDITION),
Message: err.Error(),
},
}
} else if res.WorkflowExecutionInfo.Status == v11.WORKFLOW_EXECUTION_STATUS_CANCELED {
// Error, so return with the error
op.Result = &longrunning.Operation_Error{
Error: &rpcstatus.Status{
Code: int32(rpccode.Code_CANCELLED),
Message: "workflow canceled",
},
}
}
return op, nil
}
func (s *Server) getOperation(ctx context.Context, name string) (*longrunning.Operation, error) {
res, err := s.temporal.DescribeWorkflowExecution(ctx, name, "")
if err != nil {
if _, ok := err.(*serviceerror.NotFound); ok {
return nil, status.Error(codes.NotFound, "workflow not found")
}
// Log error, but user doesn't need to know about it
base.LogErrorf("failed to describe workflow: %v", err)
return &longrunning.Operation{
Name: name,
}, nil
}
return s.describeWorkflowToOperation(ctx, res)
}
func (s *Server) GetOperation(ctx context.Context, req *longrunning.GetOperationRequest) (*longrunning.Operation, error) {
// Get from Temporal. We don't care about long term storage, so we don't
// need to store the operation in the database.
return s.getOperation(ctx, req.Name)
}

View File

@ -0,0 +1,39 @@
package mothershipadmin_rpc
import (
"context"
base "go.resf.org/peridot/base/go"
mshipadminpb "go.resf.org/peridot/tools/mothership/admin/pb"
mothership_worker_server "go.resf.org/peridot/tools/mothership/worker_server"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/sdk/client"
"google.golang.org/genproto/googleapis/longrunning"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"strings"
)
func (s *Server) RetractEntry(ctx context.Context, req *mshipadminpb.RetractEntryRequest) (*longrunning.Operation, error) {
startWorkflowOpts := client.StartWorkflowOptions{
ID: "operations/retract/" + req.Name,
WorkflowExecutionErrorWhenAlreadyStarted: true,
WorkflowIDReusePolicy: enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY,
}
// Submit to Temporal
run, err := s.temporal.ExecuteWorkflow(
context.Background(),
startWorkflowOpts,
mothership_worker_server.RetractEntryWorkflow,
req.Name,
)
if err != nil {
if strings.Contains(err.Error(), "is already running") {
return nil, status.Error(codes.AlreadyExists, "entry is already running")
}
base.LogErrorf("failed to start workflow: %v", err)
return nil, status.Error(codes.Internal, "failed to start workflow")
}
return s.getOperation(ctx, run.GetID())
}

View File

@ -18,12 +18,15 @@ import (
base "go.resf.org/peridot/base/go"
mshipadminpb "go.resf.org/peridot/tools/mothership/admin/pb"
"go.temporal.io/sdk/client"
"google.golang.org/genproto/googleapis/longrunning"
"google.golang.org/grpc"
)
type Server struct {
base.GRPCServer
mshipadminpb.UnimplementedMshipAdminServer
longrunning.UnimplementedOperationsServer
db *base.DB
temporal client.Client
@ -50,9 +53,11 @@ func NewServer(db *base.DB, temporalClient client.Client, oidcInterceptorDetails
func (s *Server) Start() error {
s.RegisterService(func(server *grpc.Server) {
longrunning.RegisterOperationsServer(server, s)
mshipadminpb.RegisterMshipAdminServer(server, s)
})
if err := s.GatewayEndpoints(
longrunning.RegisterOperationsHandler,
mshipadminpb.RegisterMshipAdminHandler,
); err != nil {
return err

View File

@ -68,6 +68,21 @@ export const GetEntry = () => {
window.location.reload();
};
// Retract the entry (call API)
const retractEntry = async () => {
const [res, err] = await reqap(
mshipAdminApi.retractEntry({
name: `entries/${params.name}`,
}),
);
if (err) {
return;
}
window.location.reload();
};
return (
<Box>
<Box
@ -89,6 +104,16 @@ export const GetEntry = () => {
Rescue
</Button>
)}
{resource && resource.state == EntryState.Archived && (
<Button
sx={{ ml: 'auto', textAlign: 'right' }}
variant="outlined"
color="error"
onClick={retractEntry}
>
Retract
</Button>
)}
</Box>
<Divider />
<Box sx={{ p: 1.5 }}>

View File

@ -99,6 +99,7 @@ func run(ctx *cli.Context) error {
// Register workflows
w.RegisterWorkflow(mothership_worker_server.ProcessRPMWorkflow)
w.RegisterWorkflow(mothership_worker_server.RetractEntryWorkflow)
// Register activities
w.RegisterActivity(workerServer)

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package migrations
package mothership_migrations
import "embed"

View File

@ -29,6 +29,7 @@ proto_library(
"@com_google_protobuf//:empty_proto",
"@com_google_protobuf//:timestamp_proto",
"@go_googleapis//google/api:annotations_proto",
"@go_googleapis//google/longrunning:longrunning_proto",
"@googleapis//google/api:annotations_proto",
],
)
@ -49,6 +50,7 @@ go_proto_library(
proto = ":mshipadminpb_proto",
visibility = ["//visibility:public"],
deps = [
"//third_party/googleapis/google/longrunning:longrunning_go_proto",
"@go_googleapis//google/api:annotations_go_proto",
"@org_golang_google_genproto//googleapis/api/annotations",
],

View File

@ -19,7 +19,9 @@ package peridot.tools.mothership.admin.v1;
import "google/api/annotations.proto";
import "google/api/client.proto";
import "google/api/field_behavior.proto";
import "google/longrunning/operations.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";
import "tools/mothership/proto/admin/v1/worker.proto";
option java_multiple_files = true;
@ -73,6 +75,20 @@ service MshipAdmin {
};
option (google.api.method_signature) = "name";
}
// Retract the entry
// To be able to retract an entry, the entry must be in the `ARCHIVED` state.
// This will allow an NVR to be re-imported.
rpc RetractEntry(RetractEntryRequest) returns (google.longrunning.Operation) {
option (google.api.http) = {
post: "/v1/{name=entries/*}:retract"
};
option (google.api.method_signature) = "name";
option (google.longrunning.operation_info) = {
response_type: "RetractEntryResponse"
metadata_type: "RetractEntryMetadata"
};
}
}
// GetWorkerRequest is the request message for GetWorker.
@ -134,3 +150,24 @@ message RescueEntryImportRequest {
// Required. The name of the entry to rescue.
string name = 1 [(google.api.field_behavior) = REQUIRED];
}
// RetractEntryRequest is the request message for RetractEntry.
message RetractEntryRequest {
// Required. The name of the entry to retract.
string name = 1 [(google.api.field_behavior) = REQUIRED];
}
// RetractEntryResponse is the response message for RetractEntry.
message RetractEntryResponse {
// The name of the entry that was retracted.
string name = 1;
}
// RetractEntryMetadata is the metadata message for RetractEntry.
message RetractEntryMetadata {
// The time at which the workflow started
google.protobuf.Timestamp start_time = 1;
// The time at which the workflow finished
google.protobuf.Timestamp end_time = 2;
}

View File

@ -67,6 +67,17 @@ func (s *Server) SubmitEntry(ctx context.Context, req *mothershippb.SubmitEntryR
return nil, err
}
// Now make sure the entry doesn't already exist in the ARCHIVED state.
// If it does, return an error. It should be retracted first.
entry, err := base.Q[mothership_db.Entry](s.db).F("sha256_sum", req.ProcessRpmRequest.Checksum).GetOrNil()
if err != nil {
base.LogErrorf("failed to get entry: %v", err)
return nil, status.Error(codes.Internal, "failed to get entry")
}
if entry != nil && entry.State == mothershippb.Entry_ARCHIVED {
return nil, status.Error(codes.AlreadyExists, "entry already exists, you must retract the entry before submitting again")
}
startWorkflowOpts := client.StartWorkflowOptions{
ID: "operations/" + req.ProcessRpmRequest.Checksum,
WorkflowExecutionErrorWhenAlreadyStarted: true,

View File

@ -12,13 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "worker_server",
srcs = [
"entry.go",
"process_rpm.go",
"retract_entry.go",
"worker.go",
"workflows.go",
],
@ -28,16 +29,58 @@ go_library(
"//base/go",
"//base/go/storage",
"//tools/mothership/db",
"//tools/mothership/proto/admin/v1:pb",
"//tools/mothership/proto/v1:pb",
"//tools/mothership/worker_server/forge",
"//tools/mothership/worker_server/srpm_import",
"//vendor/github.com/go-git/go-billy/v5:go-billy",
"//vendor/github.com/go-git/go-billy/v5/memfs",
"//vendor/github.com/go-git/go-git/v5:go-git",
"//vendor/github.com/go-git/go-git/v5/config",
"//vendor/github.com/go-git/go-git/v5/plumbing",
"//vendor/github.com/go-git/go-git/v5/storage/memory",
"//vendor/github.com/pkg/errors",
"//vendor/github.com/sassoftware/go-rpmutils",
"//vendor/go.temporal.io/sdk/temporal",
"//vendor/go.temporal.io/sdk/workflow",
"//vendor/golang.org/x/crypto/openpgp",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
],
)
go_test(
name = "worker_server_test",
size = "small",
srcs = [
"entry_test.go",
"forge_test.go",
"main_test.go",
"process_rpm_test.go",
"retract_entry_test.go",
"worker_test.go",
"workflows_test.go",
],
data = glob(["testdata/**"]),
embed = [":worker_server"],
embedsrcs = ["testdata/RPM-GPG-KEY-Rocky-8"],
deps = [
"//base/go",
"//base/go/storage/memory",
"//tools/mothership/db",
"//tools/mothership/migrations",
"//tools/mothership/proto/v1:pb",
"//tools/mothership/worker_server/forge",
"//vendor/github.com/go-git/go-billy/v5/osfs",
"//vendor/github.com/go-git/go-git/v5/plumbing/transport/http",
"//vendor/github.com/stretchr/testify/mock",
"//vendor/github.com/stretchr/testify/require",
"//vendor/github.com/stretchr/testify/suite",
"//vendor/github.com/testcontainers/testcontainers-go",
"//vendor/github.com/testcontainers/testcontainers-go/modules/postgres",
"//vendor/github.com/testcontainers/testcontainers-go/wait",
"//vendor/go.temporal.io/sdk/log",
"//vendor/go.temporal.io/sdk/testsuite",
"//vendor/golang.org/x/crypto/openpgp",
],
)

View File

@ -0,0 +1,200 @@
package mothership_worker_server
import (
"github.com/go-git/go-billy/v5"
"github.com/go-git/go-billy/v5/memfs"
"github.com/go-git/go-git/v5"
"github.com/go-git/go-git/v5/config"
"github.com/go-git/go-git/v5/plumbing"
"github.com/go-git/go-git/v5/storage/memory"
base "go.resf.org/peridot/base/go"
mshipadminpb "go.resf.org/peridot/tools/mothership/admin/pb"
mothership_db "go.resf.org/peridot/tools/mothership/db"
"go.temporal.io/sdk/temporal"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"io"
"os"
"regexp"
)
var pkgNameRegexp = regexp.MustCompile(`^([a-zA-Z0-9\-]+)-\d+.*$`)
// getRepo gets a git repository from a remote
// It clones into an in-memory filesystem
func getRepo(remote string) (*git.Repository, error) {
// Just use in memory storage for all repos
storer := memory.NewStorage()
fs := memfs.New()
repo, err := git.Init(storer, fs)
if err != nil {
return nil, err
}
// Add a new remote
refspec := config.RefSpec("refs/heads/*:refs/remotes/origin/*")
_, err = repo.CreateRemote(&config.RemoteConfig{
Name: "origin",
URLs: []string{remote},
Fetch: []config.RefSpec{refspec},
})
if err != nil {
return nil, err
}
// Fetch all the refs from the remote
err = repo.Fetch(&git.FetchOptions{
RemoteName: "origin",
})
if err != nil {
return nil, err
}
return repo, nil
}
// clonePathToFS clones a path from one filesystem to another
func clonePathToFS(fromFS billy.Filesystem, toFS billy.Filesystem, rootPath string) error {
// check if root directory exists
_, err := fromFS.Stat(rootPath)
if err != nil {
// we don't care if the directory doesn't exist
if os.IsNotExist(err) {
return nil
}
return err
}
// read the root directory
rootDir, err := fromFS.ReadDir(rootPath)
if err != nil {
return err
}
// iterate over the files
for _, file := range rootDir {
// get the file path
filePath := rootPath + "/" + file.Name()
// check if the file is a directory
if file.IsDir() {
// create the directory in the toFS
err = toFS.MkdirAll(filePath, 0755)
if err != nil {
return err
}
// recursively call this function
err = clonePathToFS(fromFS, toFS, filePath)
if err != nil {
return err
}
} else {
// open the file
f, err := fromFS.OpenFile(filePath, os.O_TRUNC|os.O_CREATE|os.O_RDWR, 0644)
if err != nil {
return err
}
defer f.Close()
// create the file in the toFS
toFile, err := toFS.Create(filePath)
if err != nil {
return err
}
defer toFile.Close()
// copy the file contents
_, err = io.Copy(toFile, f)
if err != nil {
return err
}
}
}
return nil
}
// clonePatchesToTemporaryFS clones the PATCHES directory to a temporary filesystem
// PATCHES directory is the only directory that should survive a retraction
func clonePatchesToTemporaryFS(currentFS billy.Filesystem) (billy.Filesystem, error) {
// create a new in-memory filesystem
fs := memfs.New()
// clone the current filesystem to the new filesystem
err := clonePathToFS(currentFS, fs, "PATCHES")
if err != nil {
return nil, err
}
return fs, nil
}
func resetRepoToPoint(repo *git.Repository, commit string) error {
wt, err := repo.Worktree()
if err != nil {
return err
}
// reset the repo
err = wt.Reset(&git.ResetOptions{
Commit: plumbing.NewHash(commit),
Mode: git.HardReset,
})
if err != nil {
return err
}
// clean the repo
return wt.Clean(&git.CleanOptions{
Dir: true,
})
}
func (w *Worker) RetractEntry(name string) (*mshipadminpb.RetractEntryResponse, error) {
entry, err := base.Q[mothership_db.Entry](w.db).F("name", name).GetOrNil()
if err != nil {
base.LogErrorf("failed to get entry: %v", err)
return nil, status.Error(codes.Internal, "failed to get entry")
}
if entry == nil {
return nil, temporal.NewNonRetryableApplicationError(
"entry not found",
"entryNotFound",
nil,
)
}
// Get the repo
remote := w.forge.GetRemote(entry.PackageName)
repo, err := getRepo(remote)
if err != nil {
base.LogErrorf("failed to get repo: %v", err)
return nil, status.Error(codes.Internal, "failed to get repo")
}
err = repo.DeleteObject(plumbing.NewHash(entry.CommitHash))
if err != nil {
base.LogErrorf("failed to eject commit: %v", err)
return nil, status.Error(codes.Internal, "failed to eject commit")
}
// If there's a tag, delete it
_ = repo.DeleteTag(entry.CommitTag)
// Push the changes
err = repo.Push(&git.PushOptions{
RemoteName: "origin",
Force: true,
})
if err != nil {
base.LogErrorf("failed to push changes: %v", err)
return nil, status.Error(codes.Internal, "failed to push changes")
}
return &mshipadminpb.RetractEntryResponse{
Name: entry.Name,
}, nil
}

View File

@ -15,6 +15,7 @@
package mothership_worker_server
import (
mshipadminpb "go.resf.org/peridot/tools/mothership/admin/pb"
mothershippb "go.resf.org/peridot/tools/mothership/pb"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
@ -51,6 +52,7 @@ func processRPMPostHold(ctx workflow.Context, entry *mothershippb.Entry, args *m
})
selector.AddReceive(signalChan, func(c workflow.ReceiveChannel, more bool) {
c.Receive(ctx, nil)
err = nil
})
// Set state to on hold
@ -60,7 +62,7 @@ func processRPMPostHold(ctx workflow.Context, entry *mothershippb.Entry, args *m
MaximumAttempts: 0,
},
})
err = workflow.ExecuteActivity(ctx, w.SetEntryState, entry.Name, mothershippb.Entry_ON_HOLD, nil).Get(ctx, nil)
err = workflow.ExecuteActivity(ctx, w.SetEntryState, entry.Name, mothershippb.Entry_ON_HOLD, nil).Get(ctx, entry)
if err != nil {
return nil, err
}
@ -79,7 +81,7 @@ func processRPMPostHold(ctx workflow.Context, entry *mothershippb.Entry, args *m
MaximumAttempts: 0,
},
})
_ = workflow.ExecuteActivity(ctx, w.SetEntryState, entry.Name, mothershippb.Entry_CANCELLED, nil).Get(ctx, nil)
_ = workflow.ExecuteActivity(ctx, w.SetEntryState, entry.Name, mothershippb.Entry_CANCELLED, nil).Get(ctx, entry)
return nil, err
}
@ -90,7 +92,7 @@ func processRPMPostHold(ctx workflow.Context, entry *mothershippb.Entry, args *m
MaximumAttempts: 0,
},
})
err = workflow.ExecuteActivity(ctx, w.SetEntryState, entry.Name, mothershippb.Entry_ARCHIVING, nil).Get(ctx, nil)
err = workflow.ExecuteActivity(ctx, w.SetEntryState, entry.Name, mothershippb.Entry_ARCHIVING, nil).Get(ctx, entry)
if err != nil {
return nil, err
}
@ -165,7 +167,7 @@ func ProcessRPMWorkflow(ctx workflow.Context, args *mothershippb.ProcessRPMArgs)
// On defer, if the workflow is not completed, then we'll set the entry state
// to failed.
defer func() {
if entry.State == mothershippb.Entry_ARCHIVED {
if entry.State == mothershippb.Entry_ARCHIVED || entry.State == mothershippb.Entry_CANCELLED {
return
}
@ -184,7 +186,7 @@ func ProcessRPMWorkflow(ctx workflow.Context, args *mothershippb.ProcessRPMArgs)
ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 45 * time.Second,
})
err = workflow.ExecuteActivity(ctx, w.SetEntryIDFromRPM, entry.Name, args.Request.RpmUri, args.Request.Checksum).Get(ctx, nil)
err = workflow.ExecuteActivity(ctx, w.SetEntryIDFromRPM, entry.Name, args.Request.RpmUri, args.Request.Checksum).Get(ctx, &entry)
if err != nil {
return nil, err
}
@ -192,3 +194,31 @@ func ProcessRPMWorkflow(ctx workflow.Context, args *mothershippb.ProcessRPMArgs)
// Process the RPM.
return processRPMPostHold(ctx, &entry, args)
}
// RetractEntryWorkflow retracts an entry.
// Should be used when an entry debranding is not considered fully complete. (Contains upstream trademarks for example)
// This will forcefully remove the commit from the git repository and set the entry state to RETRACTED.
// The same source (for the specific entry) can be re-imported by the client, either by calling DuplicateEntry or
// calling SubmitEntry with the same SRPM URI.
func RetractEntryWorkflow(ctx workflow.Context, name string) (*mshipadminpb.RetractEntryResponse, error) {
ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 60 * time.Second,
})
var res mshipadminpb.RetractEntryResponse
err := workflow.ExecuteActivity(ctx, w.RetractEntry, name).Get(ctx, &res)
if err != nil {
return nil, err
}
// Set the entry state to retracted
ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 25 * time.Second,
})
err = workflow.ExecuteActivity(ctx, w.SetEntryState, name, mothershippb.Entry_RETRACTED, nil).Get(ctx, nil)
if err != nil {
return nil, err
}
return &res, nil
}