From 5f01fc83f1c588cf1c110b906439223f2a6efcbe Mon Sep 17 00:00:00 2001 From: Mustafa Gezen Date: Sun, 3 Sep 2023 07:47:29 +0200 Subject: [PATCH] Start working on entry retraction --- tools/mothership/admin/rpc/BUILD | 12 ++ tools/mothership/admin/rpc/main_test.go | 2 +- tools/mothership/admin/rpc/operation.go | 144 +++++++++++++ tools/mothership/admin/rpc/retract.go | 39 ++++ tools/mothership/admin/rpc/rpc.go | 5 + tools/mothership/admin/ui/GetEntry.tsx | 25 +++ .../cmd/mship_worker_server/main.go | 1 + tools/mothership/migrations/migrations.go | 2 +- tools/mothership/proto/admin/v1/BUILD | 2 + .../proto/admin/v1/mship_admin.proto | 37 ++++ tools/mothership/rpc/worker.go | 11 + tools/mothership/worker_server/BUILD | 45 +++- .../mothership/worker_server/retract_entry.go | 200 ++++++++++++++++++ tools/mothership/worker_server/workflows.go | 40 +++- 14 files changed, 557 insertions(+), 8 deletions(-) create mode 100644 tools/mothership/admin/rpc/operation.go create mode 100644 tools/mothership/admin/rpc/retract.go create mode 100644 tools/mothership/worker_server/retract_entry.go diff --git a/tools/mothership/admin/rpc/BUILD b/tools/mothership/admin/rpc/BUILD index 85663c6a..a6c0a42a 100644 --- a/tools/mothership/admin/rpc/BUILD +++ b/tools/mothership/admin/rpc/BUILD @@ -17,7 +17,9 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "rpc", srcs = [ + "operation.go", "rescue.go", + "retract.go", "rpc.go", "worker.go", ], @@ -25,16 +27,25 @@ go_library( visibility = ["//visibility:public"], deps = [ "//base/go", + "//third_party/googleapis/google/longrunning:longrunning_go_proto", "//tools/mothership/db", "//tools/mothership/proto/admin/v1:pb", "//tools/mothership/proto/v1:pb", + "//tools/mothership/worker_server", "//vendor/go.ciq.dev/pika", + "//vendor/go.temporal.io/api/enums/v1:enums", + "//vendor/go.temporal.io/api/serviceerror", + "//vendor/go.temporal.io/api/workflowservice/v1:workflowservice", "//vendor/go.temporal.io/sdk/client", + "@go_googleapis//google/rpc:code_go_proto", "@go_googleapis//google/rpc:errdetails_go_proto", + "@go_googleapis//google/rpc:status_go_proto", "@org_golang_google_grpc//:go_default_library", "@org_golang_google_grpc//codes", "@org_golang_google_grpc//status", + "@org_golang_google_protobuf//types/known/anypb", "@org_golang_google_protobuf//types/known/emptypb", + "@org_golang_google_protobuf//types/known/timestamppb", ], ) @@ -55,6 +66,7 @@ go_test( "//vendor/github.com/testcontainers/testcontainers-go", "//vendor/github.com/testcontainers/testcontainers-go/modules/postgres", "//vendor/github.com/testcontainers/testcontainers-go/wait", + "//vendor/go.temporal.io/sdk/client", "@org_golang_google_grpc//codes", "@org_golang_google_grpc//metadata", "@org_golang_google_grpc//status", diff --git a/tools/mothership/admin/rpc/main_test.go b/tools/mothership/admin/rpc/main_test.go index 56540f22..cfb61206 100644 --- a/tools/mothership/admin/rpc/main_test.go +++ b/tools/mothership/admin/rpc/main_test.go @@ -45,7 +45,7 @@ func TestMain(m *testing.M) { } defer os.RemoveAll(dir) - scripts, err := base.EmbedFSToOSFS(dir, migrations.UpSQLs, ".") + scripts, err := base.EmbedFSToOSFS(dir, mothership_migrations.UpSQLs, ".") if err != nil { panic(err) } diff --git a/tools/mothership/admin/rpc/operation.go b/tools/mothership/admin/rpc/operation.go new file mode 100644 index 00000000..18b165d6 --- /dev/null +++ b/tools/mothership/admin/rpc/operation.go @@ -0,0 +1,144 @@ +// Copyright 2023 Peridot Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mothershipadmin_rpc + +import ( + "context" + base "go.resf.org/peridot/base/go" + mshipadminpb "go.resf.org/peridot/tools/mothership/admin/pb" + mothershippb "go.resf.org/peridot/tools/mothership/pb" + v11 "go.temporal.io/api/enums/v1" + "go.temporal.io/api/serviceerror" + "go.temporal.io/api/workflowservice/v1" + "google.golang.org/genproto/googleapis/longrunning" + rpccode "google.golang.org/genproto/googleapis/rpc/code" + rpcstatus "google.golang.org/genproto/googleapis/rpc/status" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/timestamppb" +) + +func (s *Server) describeWorkflowToOperation(ctx context.Context, res *workflowservice.DescribeWorkflowExecutionResponse) (*longrunning.Operation, error) { + if res.WorkflowExecutionInfo == nil { + return nil, status.Error(codes.NotFound, "workflow not found") + } + if res.WorkflowExecutionInfo.Execution == nil { + return nil, status.Error(codes.NotFound, "workflow not found") + } + + op := &longrunning.Operation{ + Name: res.WorkflowExecutionInfo.Execution.WorkflowId, + } + + // If the workflow is not running, we can mark the operation as done + if res.WorkflowExecutionInfo.Status != v11.WORKFLOW_EXECUTION_STATUS_RUNNING { + op.Done = true + } + + // Add metadata + rpmMetadata := &mshipadminpb.RetractEntryMetadata{ + StartTime: nil, + EndTime: nil, + } + st := res.WorkflowExecutionInfo.GetStartTime() + if st != nil { + rpmMetadata.StartTime = timestamppb.New(*st) + } + + et := res.WorkflowExecutionInfo.GetCloseTime() + if et != nil { + rpmMetadata.EndTime = timestamppb.New(*et) + } + + rpmMetadataAny, err := anypb.New(rpmMetadata) + if err != nil { + return op, nil + } + op.Metadata = rpmMetadataAny + + // If completed, add result + // If failed, add error + if res.WorkflowExecutionInfo.Status == v11.WORKFLOW_EXECUTION_STATUS_COMPLETED { + // Complete, we need to get the result using GetWorkflow + run := s.temporal.GetWorkflow(ctx, op.Name, "") + + var res mothershippb.ProcessRPMResponse + if err := run.Get(ctx, &res); err != nil { + return nil, err + } + + resAny, err := anypb.New(&res) + if err != nil { + return nil, err + } + op.Result = &longrunning.Operation_Response{Response: resAny} + } else if res.WorkflowExecutionInfo.Status == v11.WORKFLOW_EXECUTION_STATUS_FAILED { + // Failed, we need to get the error using GetWorkflow + run := s.temporal.GetWorkflow(ctx, op.Name, "") + err := run.Get(ctx, nil) + // No error so return with a generic error + if err == nil { + op.Result = &longrunning.Operation_Error{ + Error: &rpcstatus.Status{ + Code: int32(rpccode.Code_INTERNAL), + Message: "workflow failed", + }, + } + return op, nil + } + + // Error, so return with the error + op.Result = &longrunning.Operation_Error{ + Error: &rpcstatus.Status{ + Code: int32(rpccode.Code_FAILED_PRECONDITION), + Message: err.Error(), + }, + } + } else if res.WorkflowExecutionInfo.Status == v11.WORKFLOW_EXECUTION_STATUS_CANCELED { + // Error, so return with the error + op.Result = &longrunning.Operation_Error{ + Error: &rpcstatus.Status{ + Code: int32(rpccode.Code_CANCELLED), + Message: "workflow canceled", + }, + } + } + + return op, nil +} + +func (s *Server) getOperation(ctx context.Context, name string) (*longrunning.Operation, error) { + res, err := s.temporal.DescribeWorkflowExecution(ctx, name, "") + if err != nil { + if _, ok := err.(*serviceerror.NotFound); ok { + return nil, status.Error(codes.NotFound, "workflow not found") + } + + // Log error, but user doesn't need to know about it + base.LogErrorf("failed to describe workflow: %v", err) + return &longrunning.Operation{ + Name: name, + }, nil + } + + return s.describeWorkflowToOperation(ctx, res) +} + +func (s *Server) GetOperation(ctx context.Context, req *longrunning.GetOperationRequest) (*longrunning.Operation, error) { + // Get from Temporal. We don't care about long term storage, so we don't + // need to store the operation in the database. + return s.getOperation(ctx, req.Name) +} diff --git a/tools/mothership/admin/rpc/retract.go b/tools/mothership/admin/rpc/retract.go new file mode 100644 index 00000000..91c4fb09 --- /dev/null +++ b/tools/mothership/admin/rpc/retract.go @@ -0,0 +1,39 @@ +package mothershipadmin_rpc + +import ( + "context" + base "go.resf.org/peridot/base/go" + mshipadminpb "go.resf.org/peridot/tools/mothership/admin/pb" + mothership_worker_server "go.resf.org/peridot/tools/mothership/worker_server" + enumspb "go.temporal.io/api/enums/v1" + "go.temporal.io/sdk/client" + "google.golang.org/genproto/googleapis/longrunning" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "strings" +) + +func (s *Server) RetractEntry(ctx context.Context, req *mshipadminpb.RetractEntryRequest) (*longrunning.Operation, error) { + startWorkflowOpts := client.StartWorkflowOptions{ + ID: "operations/retract/" + req.Name, + WorkflowExecutionErrorWhenAlreadyStarted: true, + WorkflowIDReusePolicy: enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY, + } + + // Submit to Temporal + run, err := s.temporal.ExecuteWorkflow( + context.Background(), + startWorkflowOpts, + mothership_worker_server.RetractEntryWorkflow, + req.Name, + ) + if err != nil { + if strings.Contains(err.Error(), "is already running") { + return nil, status.Error(codes.AlreadyExists, "entry is already running") + } + base.LogErrorf("failed to start workflow: %v", err) + return nil, status.Error(codes.Internal, "failed to start workflow") + } + + return s.getOperation(ctx, run.GetID()) +} diff --git a/tools/mothership/admin/rpc/rpc.go b/tools/mothership/admin/rpc/rpc.go index e8fc6b5f..a557fa1f 100644 --- a/tools/mothership/admin/rpc/rpc.go +++ b/tools/mothership/admin/rpc/rpc.go @@ -18,12 +18,15 @@ import ( base "go.resf.org/peridot/base/go" mshipadminpb "go.resf.org/peridot/tools/mothership/admin/pb" "go.temporal.io/sdk/client" + "google.golang.org/genproto/googleapis/longrunning" "google.golang.org/grpc" ) type Server struct { base.GRPCServer + mshipadminpb.UnimplementedMshipAdminServer + longrunning.UnimplementedOperationsServer db *base.DB temporal client.Client @@ -50,9 +53,11 @@ func NewServer(db *base.DB, temporalClient client.Client, oidcInterceptorDetails func (s *Server) Start() error { s.RegisterService(func(server *grpc.Server) { + longrunning.RegisterOperationsServer(server, s) mshipadminpb.RegisterMshipAdminServer(server, s) }) if err := s.GatewayEndpoints( + longrunning.RegisterOperationsHandler, mshipadminpb.RegisterMshipAdminHandler, ); err != nil { return err diff --git a/tools/mothership/admin/ui/GetEntry.tsx b/tools/mothership/admin/ui/GetEntry.tsx index f9cc18e0..2015068c 100644 --- a/tools/mothership/admin/ui/GetEntry.tsx +++ b/tools/mothership/admin/ui/GetEntry.tsx @@ -68,6 +68,21 @@ export const GetEntry = () => { window.location.reload(); }; + // Retract the entry (call API) + const retractEntry = async () => { + const [res, err] = await reqap( + mshipAdminApi.retractEntry({ + name: `entries/${params.name}`, + }), + ); + + if (err) { + return; + } + + window.location.reload(); + }; + return ( { Rescue )} + {resource && resource.state == EntryState.Archived && ( + + )} diff --git a/tools/mothership/cmd/mship_worker_server/main.go b/tools/mothership/cmd/mship_worker_server/main.go index 262b69e7..697f9441 100644 --- a/tools/mothership/cmd/mship_worker_server/main.go +++ b/tools/mothership/cmd/mship_worker_server/main.go @@ -99,6 +99,7 @@ func run(ctx *cli.Context) error { // Register workflows w.RegisterWorkflow(mothership_worker_server.ProcessRPMWorkflow) + w.RegisterWorkflow(mothership_worker_server.RetractEntryWorkflow) // Register activities w.RegisterActivity(workerServer) diff --git a/tools/mothership/migrations/migrations.go b/tools/mothership/migrations/migrations.go index 507c3ab9..0b6da8ee 100644 --- a/tools/mothership/migrations/migrations.go +++ b/tools/mothership/migrations/migrations.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package migrations +package mothership_migrations import "embed" diff --git a/tools/mothership/proto/admin/v1/BUILD b/tools/mothership/proto/admin/v1/BUILD index a1b8714b..e3445409 100644 --- a/tools/mothership/proto/admin/v1/BUILD +++ b/tools/mothership/proto/admin/v1/BUILD @@ -29,6 +29,7 @@ proto_library( "@com_google_protobuf//:empty_proto", "@com_google_protobuf//:timestamp_proto", "@go_googleapis//google/api:annotations_proto", + "@go_googleapis//google/longrunning:longrunning_proto", "@googleapis//google/api:annotations_proto", ], ) @@ -49,6 +50,7 @@ go_proto_library( proto = ":mshipadminpb_proto", visibility = ["//visibility:public"], deps = [ + "//third_party/googleapis/google/longrunning:longrunning_go_proto", "@go_googleapis//google/api:annotations_go_proto", "@org_golang_google_genproto//googleapis/api/annotations", ], diff --git a/tools/mothership/proto/admin/v1/mship_admin.proto b/tools/mothership/proto/admin/v1/mship_admin.proto index 9d0f417e..b81d907a 100644 --- a/tools/mothership/proto/admin/v1/mship_admin.proto +++ b/tools/mothership/proto/admin/v1/mship_admin.proto @@ -19,7 +19,9 @@ package peridot.tools.mothership.admin.v1; import "google/api/annotations.proto"; import "google/api/client.proto"; import "google/api/field_behavior.proto"; +import "google/longrunning/operations.proto"; import "google/protobuf/empty.proto"; +import "google/protobuf/timestamp.proto"; import "tools/mothership/proto/admin/v1/worker.proto"; option java_multiple_files = true; @@ -73,6 +75,20 @@ service MshipAdmin { }; option (google.api.method_signature) = "name"; } + + // Retract the entry + // To be able to retract an entry, the entry must be in the `ARCHIVED` state. + // This will allow an NVR to be re-imported. + rpc RetractEntry(RetractEntryRequest) returns (google.longrunning.Operation) { + option (google.api.http) = { + post: "/v1/{name=entries/*}:retract" + }; + option (google.api.method_signature) = "name"; + option (google.longrunning.operation_info) = { + response_type: "RetractEntryResponse" + metadata_type: "RetractEntryMetadata" + }; + } } // GetWorkerRequest is the request message for GetWorker. @@ -134,3 +150,24 @@ message RescueEntryImportRequest { // Required. The name of the entry to rescue. string name = 1 [(google.api.field_behavior) = REQUIRED]; } + +// RetractEntryRequest is the request message for RetractEntry. +message RetractEntryRequest { + // Required. The name of the entry to retract. + string name = 1 [(google.api.field_behavior) = REQUIRED]; +} + +// RetractEntryResponse is the response message for RetractEntry. +message RetractEntryResponse { + // The name of the entry that was retracted. + string name = 1; +} + +// RetractEntryMetadata is the metadata message for RetractEntry. +message RetractEntryMetadata { + // The time at which the workflow started + google.protobuf.Timestamp start_time = 1; + + // The time at which the workflow finished + google.protobuf.Timestamp end_time = 2; +} diff --git a/tools/mothership/rpc/worker.go b/tools/mothership/rpc/worker.go index 78f5cb1f..9cff3875 100644 --- a/tools/mothership/rpc/worker.go +++ b/tools/mothership/rpc/worker.go @@ -67,6 +67,17 @@ func (s *Server) SubmitEntry(ctx context.Context, req *mothershippb.SubmitEntryR return nil, err } + // Now make sure the entry doesn't already exist in the ARCHIVED state. + // If it does, return an error. It should be retracted first. + entry, err := base.Q[mothership_db.Entry](s.db).F("sha256_sum", req.ProcessRpmRequest.Checksum).GetOrNil() + if err != nil { + base.LogErrorf("failed to get entry: %v", err) + return nil, status.Error(codes.Internal, "failed to get entry") + } + if entry != nil && entry.State == mothershippb.Entry_ARCHIVED { + return nil, status.Error(codes.AlreadyExists, "entry already exists, you must retract the entry before submitting again") + } + startWorkflowOpts := client.StartWorkflowOptions{ ID: "operations/" + req.ProcessRpmRequest.Checksum, WorkflowExecutionErrorWhenAlreadyStarted: true, diff --git a/tools/mothership/worker_server/BUILD b/tools/mothership/worker_server/BUILD index 42ac310f..7d5de840 100644 --- a/tools/mothership/worker_server/BUILD +++ b/tools/mothership/worker_server/BUILD @@ -12,13 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "worker_server", srcs = [ "entry.go", "process_rpm.go", + "retract_entry.go", "worker.go", "workflows.go", ], @@ -28,16 +29,58 @@ go_library( "//base/go", "//base/go/storage", "//tools/mothership/db", + "//tools/mothership/proto/admin/v1:pb", "//tools/mothership/proto/v1:pb", "//tools/mothership/worker_server/forge", "//tools/mothership/worker_server/srpm_import", + "//vendor/github.com/go-git/go-billy/v5:go-billy", "//vendor/github.com/go-git/go-billy/v5/memfs", "//vendor/github.com/go-git/go-git/v5:go-git", + "//vendor/github.com/go-git/go-git/v5/config", + "//vendor/github.com/go-git/go-git/v5/plumbing", "//vendor/github.com/go-git/go-git/v5/storage/memory", "//vendor/github.com/pkg/errors", "//vendor/github.com/sassoftware/go-rpmutils", "//vendor/go.temporal.io/sdk/temporal", "//vendor/go.temporal.io/sdk/workflow", "//vendor/golang.org/x/crypto/openpgp", + "@org_golang_google_grpc//codes", + "@org_golang_google_grpc//status", + ], +) + +go_test( + name = "worker_server_test", + size = "small", + srcs = [ + "entry_test.go", + "forge_test.go", + "main_test.go", + "process_rpm_test.go", + "retract_entry_test.go", + "worker_test.go", + "workflows_test.go", + ], + data = glob(["testdata/**"]), + embed = [":worker_server"], + embedsrcs = ["testdata/RPM-GPG-KEY-Rocky-8"], + deps = [ + "//base/go", + "//base/go/storage/memory", + "//tools/mothership/db", + "//tools/mothership/migrations", + "//tools/mothership/proto/v1:pb", + "//tools/mothership/worker_server/forge", + "//vendor/github.com/go-git/go-billy/v5/osfs", + "//vendor/github.com/go-git/go-git/v5/plumbing/transport/http", + "//vendor/github.com/stretchr/testify/mock", + "//vendor/github.com/stretchr/testify/require", + "//vendor/github.com/stretchr/testify/suite", + "//vendor/github.com/testcontainers/testcontainers-go", + "//vendor/github.com/testcontainers/testcontainers-go/modules/postgres", + "//vendor/github.com/testcontainers/testcontainers-go/wait", + "//vendor/go.temporal.io/sdk/log", + "//vendor/go.temporal.io/sdk/testsuite", + "//vendor/golang.org/x/crypto/openpgp", ], ) diff --git a/tools/mothership/worker_server/retract_entry.go b/tools/mothership/worker_server/retract_entry.go new file mode 100644 index 00000000..baf663d1 --- /dev/null +++ b/tools/mothership/worker_server/retract_entry.go @@ -0,0 +1,200 @@ +package mothership_worker_server + +import ( + "github.com/go-git/go-billy/v5" + "github.com/go-git/go-billy/v5/memfs" + "github.com/go-git/go-git/v5" + "github.com/go-git/go-git/v5/config" + "github.com/go-git/go-git/v5/plumbing" + "github.com/go-git/go-git/v5/storage/memory" + base "go.resf.org/peridot/base/go" + mshipadminpb "go.resf.org/peridot/tools/mothership/admin/pb" + mothership_db "go.resf.org/peridot/tools/mothership/db" + "go.temporal.io/sdk/temporal" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "io" + "os" + "regexp" +) + +var pkgNameRegexp = regexp.MustCompile(`^([a-zA-Z0-9\-]+)-\d+.*$`) + +// getRepo gets a git repository from a remote +// It clones into an in-memory filesystem +func getRepo(remote string) (*git.Repository, error) { + // Just use in memory storage for all repos + storer := memory.NewStorage() + fs := memfs.New() + repo, err := git.Init(storer, fs) + if err != nil { + return nil, err + } + + // Add a new remote + refspec := config.RefSpec("refs/heads/*:refs/remotes/origin/*") + _, err = repo.CreateRemote(&config.RemoteConfig{ + Name: "origin", + URLs: []string{remote}, + Fetch: []config.RefSpec{refspec}, + }) + if err != nil { + return nil, err + } + + // Fetch all the refs from the remote + err = repo.Fetch(&git.FetchOptions{ + RemoteName: "origin", + }) + if err != nil { + return nil, err + } + + return repo, nil +} + +// clonePathToFS clones a path from one filesystem to another +func clonePathToFS(fromFS billy.Filesystem, toFS billy.Filesystem, rootPath string) error { + // check if root directory exists + _, err := fromFS.Stat(rootPath) + if err != nil { + // we don't care if the directory doesn't exist + if os.IsNotExist(err) { + return nil + } + + return err + } + + // read the root directory + rootDir, err := fromFS.ReadDir(rootPath) + if err != nil { + return err + } + + // iterate over the files + for _, file := range rootDir { + // get the file path + filePath := rootPath + "/" + file.Name() + + // check if the file is a directory + if file.IsDir() { + // create the directory in the toFS + err = toFS.MkdirAll(filePath, 0755) + if err != nil { + return err + } + + // recursively call this function + err = clonePathToFS(fromFS, toFS, filePath) + if err != nil { + return err + } + } else { + // open the file + f, err := fromFS.OpenFile(filePath, os.O_TRUNC|os.O_CREATE|os.O_RDWR, 0644) + if err != nil { + return err + } + defer f.Close() + + // create the file in the toFS + toFile, err := toFS.Create(filePath) + if err != nil { + return err + } + defer toFile.Close() + + // copy the file contents + _, err = io.Copy(toFile, f) + if err != nil { + return err + } + } + } + + return nil +} + +// clonePatchesToTemporaryFS clones the PATCHES directory to a temporary filesystem +// PATCHES directory is the only directory that should survive a retraction +func clonePatchesToTemporaryFS(currentFS billy.Filesystem) (billy.Filesystem, error) { + // create a new in-memory filesystem + fs := memfs.New() + + // clone the current filesystem to the new filesystem + err := clonePathToFS(currentFS, fs, "PATCHES") + if err != nil { + return nil, err + } + + return fs, nil +} + +func resetRepoToPoint(repo *git.Repository, commit string) error { + wt, err := repo.Worktree() + if err != nil { + return err + } + + // reset the repo + err = wt.Reset(&git.ResetOptions{ + Commit: plumbing.NewHash(commit), + Mode: git.HardReset, + }) + if err != nil { + return err + } + + // clean the repo + return wt.Clean(&git.CleanOptions{ + Dir: true, + }) +} + +func (w *Worker) RetractEntry(name string) (*mshipadminpb.RetractEntryResponse, error) { + entry, err := base.Q[mothership_db.Entry](w.db).F("name", name).GetOrNil() + if err != nil { + base.LogErrorf("failed to get entry: %v", err) + return nil, status.Error(codes.Internal, "failed to get entry") + } + + if entry == nil { + return nil, temporal.NewNonRetryableApplicationError( + "entry not found", + "entryNotFound", + nil, + ) + } + + // Get the repo + remote := w.forge.GetRemote(entry.PackageName) + repo, err := getRepo(remote) + if err != nil { + base.LogErrorf("failed to get repo: %v", err) + return nil, status.Error(codes.Internal, "failed to get repo") + } + + err = repo.DeleteObject(plumbing.NewHash(entry.CommitHash)) + if err != nil { + base.LogErrorf("failed to eject commit: %v", err) + return nil, status.Error(codes.Internal, "failed to eject commit") + } + + // If there's a tag, delete it + _ = repo.DeleteTag(entry.CommitTag) + + // Push the changes + err = repo.Push(&git.PushOptions{ + RemoteName: "origin", + Force: true, + }) + if err != nil { + base.LogErrorf("failed to push changes: %v", err) + return nil, status.Error(codes.Internal, "failed to push changes") + } + + return &mshipadminpb.RetractEntryResponse{ + Name: entry.Name, + }, nil +} diff --git a/tools/mothership/worker_server/workflows.go b/tools/mothership/worker_server/workflows.go index 61464399..fa1402e8 100644 --- a/tools/mothership/worker_server/workflows.go +++ b/tools/mothership/worker_server/workflows.go @@ -15,6 +15,7 @@ package mothership_worker_server import ( + mshipadminpb "go.resf.org/peridot/tools/mothership/admin/pb" mothershippb "go.resf.org/peridot/tools/mothership/pb" "go.temporal.io/sdk/temporal" "go.temporal.io/sdk/workflow" @@ -51,6 +52,7 @@ func processRPMPostHold(ctx workflow.Context, entry *mothershippb.Entry, args *m }) selector.AddReceive(signalChan, func(c workflow.ReceiveChannel, more bool) { c.Receive(ctx, nil) + err = nil }) // Set state to on hold @@ -60,7 +62,7 @@ func processRPMPostHold(ctx workflow.Context, entry *mothershippb.Entry, args *m MaximumAttempts: 0, }, }) - err = workflow.ExecuteActivity(ctx, w.SetEntryState, entry.Name, mothershippb.Entry_ON_HOLD, nil).Get(ctx, nil) + err = workflow.ExecuteActivity(ctx, w.SetEntryState, entry.Name, mothershippb.Entry_ON_HOLD, nil).Get(ctx, entry) if err != nil { return nil, err } @@ -79,7 +81,7 @@ func processRPMPostHold(ctx workflow.Context, entry *mothershippb.Entry, args *m MaximumAttempts: 0, }, }) - _ = workflow.ExecuteActivity(ctx, w.SetEntryState, entry.Name, mothershippb.Entry_CANCELLED, nil).Get(ctx, nil) + _ = workflow.ExecuteActivity(ctx, w.SetEntryState, entry.Name, mothershippb.Entry_CANCELLED, nil).Get(ctx, entry) return nil, err } @@ -90,7 +92,7 @@ func processRPMPostHold(ctx workflow.Context, entry *mothershippb.Entry, args *m MaximumAttempts: 0, }, }) - err = workflow.ExecuteActivity(ctx, w.SetEntryState, entry.Name, mothershippb.Entry_ARCHIVING, nil).Get(ctx, nil) + err = workflow.ExecuteActivity(ctx, w.SetEntryState, entry.Name, mothershippb.Entry_ARCHIVING, nil).Get(ctx, entry) if err != nil { return nil, err } @@ -165,7 +167,7 @@ func ProcessRPMWorkflow(ctx workflow.Context, args *mothershippb.ProcessRPMArgs) // On defer, if the workflow is not completed, then we'll set the entry state // to failed. defer func() { - if entry.State == mothershippb.Entry_ARCHIVED { + if entry.State == mothershippb.Entry_ARCHIVED || entry.State == mothershippb.Entry_CANCELLED { return } @@ -184,7 +186,7 @@ func ProcessRPMWorkflow(ctx workflow.Context, args *mothershippb.ProcessRPMArgs) ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 45 * time.Second, }) - err = workflow.ExecuteActivity(ctx, w.SetEntryIDFromRPM, entry.Name, args.Request.RpmUri, args.Request.Checksum).Get(ctx, nil) + err = workflow.ExecuteActivity(ctx, w.SetEntryIDFromRPM, entry.Name, args.Request.RpmUri, args.Request.Checksum).Get(ctx, &entry) if err != nil { return nil, err } @@ -192,3 +194,31 @@ func ProcessRPMWorkflow(ctx workflow.Context, args *mothershippb.ProcessRPMArgs) // Process the RPM. return processRPMPostHold(ctx, &entry, args) } + +// RetractEntryWorkflow retracts an entry. +// Should be used when an entry debranding is not considered fully complete. (Contains upstream trademarks for example) +// This will forcefully remove the commit from the git repository and set the entry state to RETRACTED. +// The same source (for the specific entry) can be re-imported by the client, either by calling DuplicateEntry or +// calling SubmitEntry with the same SRPM URI. +func RetractEntryWorkflow(ctx workflow.Context, name string) (*mshipadminpb.RetractEntryResponse, error) { + ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 60 * time.Second, + }) + + var res mshipadminpb.RetractEntryResponse + err := workflow.ExecuteActivity(ctx, w.RetractEntry, name).Get(ctx, &res) + if err != nil { + return nil, err + } + + // Set the entry state to retracted + ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 25 * time.Second, + }) + err = workflow.ExecuteActivity(ctx, w.SetEntryState, name, mothershippb.Entry_RETRACTED, nil).Get(ctx, nil) + if err != nil { + return nil, err + } + + return &res, nil +}