diff --git a/tools/mothership/worker_server/entry_test.go b/tools/mothership/worker_server/entry_test.go new file mode 100644 index 00000000..5b4acc4e --- /dev/null +++ b/tools/mothership/worker_server/entry_test.go @@ -0,0 +1,189 @@ +package mothership_worker_server + +import ( + "github.com/stretchr/testify/require" + mothership_db "go.resf.org/peridot/tools/mothership/db" + mothershippb "go.resf.org/peridot/tools/mothership/pb" + "testing" + "time" +) + +func TestWorker_CreateEntry(t *testing.T) { + require.Nil(t, q[mothership_db.Entry]().Delete()) + defer func() { + require.Nil(t, q[mothership_db.Entry]().Delete()) + }() + + args := &mothershippb.ProcessRPMArgs{ + Request: &mothershippb.ProcessRPMRequest{ + RpmUri: "memory://efi-rpm-macros-3-3.el8.src.rpm", + OsRelease: "Rocky Linux release 8.8 (Green Obsidian)", + Checksum: "518a9418fec1deaeb4c636615d8d81fb60146883c431ea15ab1127893d075d28", + Repository: "BaseOS", + }, + InternalRequest: &mothershippb.ProcessRPMInternalRequest{ + WorkerId: "test-worker", + }, + } + entry, err := testW.CreateEntry(args) + require.Nil(t, err) + require.NotNil(t, entry) + require.Equal(t, "Rocky Linux release 8.8 (Green Obsidian)", entry.OsRelease) + require.Equal(t, "518a9418fec1deaeb4c636615d8d81fb60146883c431ea15ab1127893d075d28", entry.Sha256Sum) + c, err := q[mothership_db.Entry]().F("name", entry.Name).Count() + require.Nil(t, err) + require.Equal(t, c, 1) +} + +func TestWorker_SetEntryIDFromRPM(t *testing.T) { + require.Nil(t, q[mothership_db.Entry]().Delete()) + defer func() { + require.Nil(t, q[mothership_db.Entry]().Delete()) + }() + + args := &mothershippb.ProcessRPMArgs{ + Request: &mothershippb.ProcessRPMRequest{ + RpmUri: "memory://efi-rpm-macros-3-3.el8.src.rpm", + OsRelease: "Rocky Linux release 8.8 (Green Obsidian)", + Checksum: "518a9418fec1deaeb4c636615d8d81fb60146883c431ea15ab1127893d075d28", + Repository: "BaseOS", + }, + InternalRequest: &mothershippb.ProcessRPMInternalRequest{ + WorkerId: "test-worker", + }, + } + entry, err := testW.CreateEntry(args) + require.Nil(t, err) + require.NotNil(t, entry) + + entry, err = testW.SetEntryIDFromRPM(entry.Name, "memory://efi-rpm-macros-3-3.el8.src.rpm", entry.Sha256Sum) + require.Nil(t, err) + require.NotNil(t, entry) + require.Equal(t, "efi-rpm-macros-3-3.el8.src", entry.EntryId) +} + +func TestWorker_SetEntryIDFromRPM_FailedToDownload(t *testing.T) { + require.Nil(t, q[mothership_db.Entry]().Delete()) + defer func() { + require.Nil(t, q[mothership_db.Entry]().Delete()) + }() + + args := &mothershippb.ProcessRPMArgs{ + Request: &mothershippb.ProcessRPMRequest{ + RpmUri: "memory://not-found.rpm", + OsRelease: "Rocky Linux release 8.8 (Green Obsidian)", + Checksum: "518a9418fec1deaeb4c636615d8d81fb60146883c431ea15ab1127893d075d28", + Repository: "BaseOS", + }, + InternalRequest: &mothershippb.ProcessRPMInternalRequest{ + WorkerId: "test-worker", + }, + } + entry, err := testW.CreateEntry(args) + require.Nil(t, err) + require.NotNil(t, entry) + + entry, err = testW.SetEntryIDFromRPM(entry.Name, "memory://not-found.rpm", entry.Sha256Sum) + require.NotNil(t, err) + require.Contains(t, err.Error(), "failed to download resource") +} + +func TestWorker_SetEntryState(t *testing.T) { + require.Nil(t, q[mothership_db.Entry]().Delete()) + defer func() { + require.Nil(t, q[mothership_db.Entry]().Delete()) + }() + + args := &mothershippb.ProcessRPMArgs{ + Request: &mothershippb.ProcessRPMRequest{ + RpmUri: "memory://efi-rpm-macros-3-3.el8.src.rpm", + OsRelease: "Rocky Linux release 8.8 (Green Obsidian)", + Checksum: "518a9418fec1deaeb4c636615d8d81fb60146883c431ea15ab1127893d075d28", + Repository: "BaseOS", + }, + InternalRequest: &mothershippb.ProcessRPMInternalRequest{ + WorkerId: "test-worker", + }, + } + entry, err := testW.CreateEntry(args) + require.Nil(t, err) + require.NotNil(t, entry) + + importRpmRes := &mothershippb.ImportRPMResponse{ + CommitHash: "123", + CommitUri: "https://forge.resf.org/peridot/efi-rpm-macros/commit/123", + CommitBranch: "el-8.8", + CommitTag: "imports/el-8.8/efi-rpm-macros-3-3.el8", + Nevra: "efi-rpm-macros-0:3-3.el8.aarch64", + Pkg: "efi-rpm-macros", + } + entry, err = testW.SetEntryState(entry.Name, mothershippb.Entry_ARCHIVED, importRpmRes) + require.Nil(t, err) + require.NotNil(t, entry) + require.Equal(t, mothershippb.Entry_ARCHIVED, entry.State) + require.Equal(t, "123", entry.CommitHash) + require.Equal(t, "https://forge.resf.org/peridot/efi-rpm-macros/commit/123", entry.CommitUri) + require.Equal(t, "el-8.8", entry.CommitBranch) + require.Equal(t, "imports/el-8.8/efi-rpm-macros-3-3.el8", entry.CommitTag) + require.Equal(t, "efi-rpm-macros", entry.Pkg) +} + +func TestWorker_SetEntryState_NoRes(t *testing.T) { + require.Nil(t, q[mothership_db.Entry]().Delete()) + defer func() { + require.Nil(t, q[mothership_db.Entry]().Delete()) + }() + + args := &mothershippb.ProcessRPMArgs{ + Request: &mothershippb.ProcessRPMRequest{ + RpmUri: "memory://efi-rpm-macros-3-3.el8.src.rpm", + OsRelease: "Rocky Linux release 8.8 (Green Obsidian)", + Checksum: "518a9418fec1deaeb4c636615d8d81fb60146883c431ea15ab1127893d075d28", + Repository: "BaseOS", + }, + InternalRequest: &mothershippb.ProcessRPMInternalRequest{ + WorkerId: "test-worker", + }, + } + entry, err := testW.CreateEntry(args) + require.Nil(t, err) + require.NotNil(t, entry) + + entry, err = testW.SetEntryState(entry.Name, mothershippb.Entry_ON_HOLD, nil) + require.Nil(t, err) + require.NotNil(t, entry) + require.Equal(t, mothershippb.Entry_ON_HOLD, entry.State) + require.Equal(t, "", entry.CommitHash) + require.Equal(t, "", entry.CommitUri) + require.Equal(t, "", entry.CommitBranch) + require.Equal(t, "", entry.CommitTag) + require.Equal(t, "", entry.Pkg) +} + +func TestWorker_SetEntryState_NoEntry(t *testing.T) { + require.Nil(t, q[mothership_db.Entry]().Delete()) + defer func() { + require.Nil(t, q[mothership_db.Entry]().Delete()) + }() + + entry, err := testW.SetEntryState("entries/123", mothershippb.Entry_ON_HOLD, nil) + require.Nil(t, entry) + require.NotNil(t, err) + require.Contains(t, err.Error(), "entry does not exist") +} + +func TestWorker_SetWorkerLastCheckinTime(t *testing.T) { + require.Nil(t, testW.SetWorkerLastCheckinTime("test-worker")) + // Verify that the worker last checkin time is at most 15 seconds ago. + w, err := q[mothership_db.Worker]().F("worker_id", "test-worker").GetOrNil() + require.Nil(t, err) + require.NotNil(t, w) + require.True(t, w.LastCheckinTime.Valid) + require.WithinDuration(t, w.LastCheckinTime.Time, time.Now(), 15*time.Second) +} + +func TestWorker_SetWorkerLastCheckinTime_NotFound(t *testing.T) { + err := testW.SetWorkerLastCheckinTime("not-found") + require.NotNil(t, err) + require.Contains(t, err.Error(), "worker does not exist") +} diff --git a/tools/mothership/worker_server/forge_test.go b/tools/mothership/worker_server/forge_test.go new file mode 100644 index 00000000..9879d1d4 --- /dev/null +++ b/tools/mothership/worker_server/forge_test.go @@ -0,0 +1,39 @@ +package mothership_worker_server + +import ( + "fmt" + transport_http "github.com/go-git/go-git/v5/plumbing/transport/http" + "go.resf.org/peridot/tools/mothership/worker_server/forge" + "time" +) + +type inMemoryForge struct { + localTempDir string + repos map[string]bool + remoteBaseURL string +} + +func (f *inMemoryForge) GetAuthenticator() (*forge.Authenticator, error) { + return &forge.Authenticator{ + AuthMethod: &transport_http.BasicAuth{ + Username: "user", + Password: "pass", + }, + AuthorName: "Test User", + AuthorEmail: "test@resf.org", + Expires: time.Now().Add(time.Hour), + }, nil +} + +func (f *inMemoryForge) GetRemote(repo string) string { + return fmt.Sprintf("file://%s/%s", f.localTempDir, repo) +} + +func (f *inMemoryForge) GetCommitViewerURL(repo string, commit string) string { + return f.remoteBaseURL + "/" + repo + "/commit/" + commit +} + +func (f *inMemoryForge) EnsureRepositoryExists(auth *forge.Authenticator, repo string) error { + f.repos[repo] = true + return nil +} diff --git a/tools/mothership/worker_server/main_test.go b/tools/mothership/worker_server/main_test.go new file mode 100644 index 00000000..05a309b3 --- /dev/null +++ b/tools/mothership/worker_server/main_test.go @@ -0,0 +1,149 @@ +package mothership_worker_server + +import ( + "bytes" + "context" + _ "embed" + "github.com/go-git/go-billy/v5/osfs" + "github.com/stretchr/testify/suite" + "github.com/testcontainers/testcontainers-go" + "github.com/testcontainers/testcontainers-go/modules/postgres" + "github.com/testcontainers/testcontainers-go/wait" + base "go.resf.org/peridot/base/go" + storage_memory "go.resf.org/peridot/base/go/storage/memory" + mothership_db "go.resf.org/peridot/tools/mothership/db" + mothership_migrations "go.resf.org/peridot/tools/mothership/migrations" + "go.temporal.io/sdk/log" + "go.temporal.io/sdk/testsuite" + "golang.org/x/crypto/openpgp" + "os" + "path/filepath" + "testing" + "time" +) + +var ( + testW *Worker + testWRolling *Worker + //go:embed testdata/RPM-GPG-KEY-Rocky-8 + rocky8GpgKey []byte + inmf *inMemoryForge +) + +type UnitTestSuite struct { + suite.Suite + testsuite.WorkflowTestSuite + + env *testsuite.TestWorkflowEnvironment +} + +type noopLogger struct{} + +func (n *noopLogger) Debug(string, ...any) {} +func (n *noopLogger) Info(string, ...any) {} +func (n *noopLogger) Warn(string, ...any) {} +func (n *noopLogger) Error(string, ...any) {} +func (n *noopLogger) With(...any) log.Logger { + return n +} + +func (s *UnitTestSuite) SetupTest() { + s.env = s.NewTestWorkflowEnvironment() +} + +func (s *UnitTestSuite) AfterTest(suiteName, testName string) { + s.env.AssertExpectations(s.T()) +} + +func TestUnitTestSuite(t *testing.T) { + ts := new(UnitTestSuite) + ts.SetLogger(&noopLogger{}) + suite.Run(t, ts) +} + +func TestMain(m *testing.M) { + // Create temporary file + dir, err := os.MkdirTemp("", "test-db-*") + if err != nil { + panic(err) + } + defer os.RemoveAll(dir) + + scripts, err := base.EmbedFSToOSFS(dir, mothership_migrations.UpSQLs, ".") + if err != nil { + panic(err) + } + + ctx := context.Background() + pgContainer, err := postgres.RunContainer( + ctx, + testcontainers.WithImage("postgres:15.3-alpine"), + postgres.WithInitScripts(scripts...), + postgres.WithDatabase("mshiptest"), + postgres.WithUsername("postgres"), + postgres.WithPassword("postgres"), + testcontainers.WithWaitStrategy( + wait. + ForLog("database system is ready to accept connections"). + WithOccurrence(2).WithStartupTimeout(5*time.Second), + ), + ) + if err != nil { + panic(err) + } + defer pgContainer.Terminate(ctx) + + connStr, err := pgContainer.ConnectionString(ctx, "sslmode=disable") + if err != nil { + panic(err) + } + + db, err := base.NewDB(connStr) + if err != nil { + panic(err) + } + + // Get current working directory + cwd, err := os.Getwd() + if err != nil { + panic(err) + } + lookasideFS := osfs.New("/") + inMemStorage := storage_memory.New(lookasideFS, filepath.Join(cwd, "testdata")) + + var gpgKeys openpgp.EntityList + keyRing, err := openpgp.ReadArmoredKeyRing(bytes.NewReader(rocky8GpgKey)) + if err != nil { + panic(err) + } + + gpgKeys = append(gpgKeys, keyRing...) + + tempDirForge, err := os.MkdirTemp("", "test-forge-*") + if err != nil { + panic(err) + } + defer os.RemoveAll(tempDirForge) + + inmf = &inMemoryForge{ + remoteBaseURL: "https://testforge.resf.org", + localTempDir: tempDirForge, + repos: map[string]bool{}, + } + testW = New(db, inMemStorage, gpgKeys, inmf, false) + testWRolling = New(db, inMemStorage, gpgKeys, inmf, true) + + if err := q[mothership_db.Worker]().Create(&mothership_db.Worker{ + Name: base.NameGen("workers"), + WorkerID: "test-worker", + ApiSecret: "test-secret", + }); err != nil { + panic(err) + } + + os.Exit(m.Run()) +} + +func q[T any]() base.Pika[T] { + return base.Q[T](testW.db) +} diff --git a/tools/mothership/worker_server/process_rpm_test.go b/tools/mothership/worker_server/process_rpm_test.go new file mode 100644 index 00000000..251d87de --- /dev/null +++ b/tools/mothership/worker_server/process_rpm_test.go @@ -0,0 +1,22 @@ +package mothership_worker_server + +import ( + "github.com/stretchr/testify/require" + "testing" +) + +func TestWorker_VerifyResourceExists(t *testing.T) { + require.Nil(t, testW.VerifyResourceExists("memory://efi-rpm-macros-3-3.el8.src.rpm")) +} + +func TestWorker_VerifyResourceExists_NotFound(t *testing.T) { + err := testW.VerifyResourceExists("memory://not-found.rpm") + require.NotNil(t, err) + require.Equal(t, err.Error(), "resource does not exist") +} + +func TestWorker_VerifyResourceExists_CannotRead(t *testing.T) { + err := testW.VerifyResourceExists("bad-protocol://not-found.rpm") + require.NotNil(t, err) + require.Contains(t, err.Error(), "client submitted a resource URI that cannot be read by server") +} diff --git a/tools/mothership/worker_server/retract_entry_test.go b/tools/mothership/worker_server/retract_entry_test.go new file mode 100644 index 00000000..7dc95663 --- /dev/null +++ b/tools/mothership/worker_server/retract_entry_test.go @@ -0,0 +1 @@ +package mothership_worker_server diff --git a/tools/mothership/worker_server/testdata/RPM-GPG-KEY-Rocky-8 b/tools/mothership/worker_server/testdata/RPM-GPG-KEY-Rocky-8 new file mode 100644 index 00000000..28ce769c --- /dev/null +++ b/tools/mothership/worker_server/testdata/RPM-GPG-KEY-Rocky-8 @@ -0,0 +1,29 @@ +-----BEGIN PGP PUBLIC KEY BLOCK----- + +mQINBGAofzYBEAC6yS1azw6f3wmaVd//3aSy6O2c9+jeetulRQvg2LvhRRS1eNqp +/x9tbBhfohu/tlDkGpYHV7diePgMml9SZDy1sKlI3tDhx6GZ3xwF0fd1vWBZpmNk +D9gRkUmYBeLotmcXQZ8ZpWLicosFtDpJEYpLUhuIgTKwt4gxJrHvkWsGQiBkJxKD +u3/RlL4IYA3Ot9iuCBflc91EyAw1Yj0gKcDzbOqjvlGtS3ASXgxPqSfU0uLC9USF +uKDnP2tcnlKKGfj0u6VkqISliSuRAzjlKho9Meond+mMIFOTT6qp4xyu+9Dj3IjZ +IC6rBXRU3xi8z0qYptoFZ6hx70NV5u+0XUzDMXdjQ5S859RYJKijiwmfMC7gZQAf +OkdOcicNzen/TwD/slhiCDssHBNEe86Wwu5kmDoCri7GJlYOlWU42Xi0o1JkVltN +D8ZId+EBDIms7ugSwGOVSxyZs43q2IAfFYCRtyKHFlgHBRe9/KTWPUrnsfKxGJgC +Do3Yb63/IYTvfTJptVfhQtL1AhEAeF1I+buVoJRmBEyYKD9BdU4xQN39VrZKziO3 +hDIGng/eK6PaPhUdq6XqvmnsZ2h+KVbyoj4cTo2gKCB2XA7O2HLQsuGduHzYKNjf +QR9j0djjwTrsvGvzfEzchP19723vYf7GdcLvqtPqzpxSX2FNARpCGXBw9wARAQAB +tDNSZWxlYXNlIEVuZ2luZWVyaW5nIDxpbmZyYXN0cnVjdHVyZUByb2NreWxpbnV4 +Lm9yZz6JAk4EEwEIADgWIQRwUcRwqSn0VM6+N7cVr12sbXRaYAUCYCh/NgIbDwUL +CQgHAgYVCgkICwIEFgIDAQIeAQIXgAAKCRAVr12sbXRaYLFmEACSMvoO1FDdyAbu +1m6xEzDhs7FgnZeQNzLZECv2j+ggFSJXezlNVOZ5I1I8umBan2ywfKQD8M+IjmrW +k9/7h9i54t8RS/RN7KNo7ECGnKXqXDPzBBTs1Gwo1WzltAoaDKUfXqQ4oJ4aCP/q +/XPVWEzgpJO1XEezvCq8VXisutyDiXEjjMIeBczxb1hbamQX+jLTIQ1MDJ4Zo1YP +zlUqrHW434XC2b1/WbSaylq8Wk9cksca5J+g3FqTlgiWozyy0uxygIRjb6iTzKXk +V7SYxeXp3hNTuoUgiFkjh5/0yKWCwx7aQqlHar9GjpxmBDAO0kzOlgtTw//EqTwR +KnYZLig9FW0PhwvZJUigr0cvs/XXTTb77z/i/dfHkrjVTTYenNyXogPtTtSyxqca +61fbPf0B/S3N43PW8URXBRS0sykpX4SxKu+PwKCqf+OJ7hMEVAapqzTt1q9T7zyB +QwvCVx8s7WWvXbs2d6ZUrArklgjHoHQcdxJKdhuRmD34AuXWCLW+gH8rJWZpuNl3 ++WsPZX4PvjKDgMw6YMcV7zhWX6c0SevKtzt7WP3XoKDuPhK1PMGJQqQ7spegGB+5 +DZvsJS48Ip0S45Qfmj82ibXaCBJHTNZE8Zs+rdTjQ9DS5qvzRA1sRA1dBb/7OLYE +JmeWf4VZyebm+gc50szsg6Ut2yT8hw== +=AiP8 +-----END PGP PUBLIC KEY BLOCK----- diff --git a/tools/mothership/worker_server/testdata/efi-rpm-macros-3-3.el8.src.rpm b/tools/mothership/worker_server/testdata/efi-rpm-macros-3-3.el8.src.rpm new file mode 100644 index 00000000..695313ab Binary files /dev/null and b/tools/mothership/worker_server/testdata/efi-rpm-macros-3-3.el8.src.rpm differ diff --git a/tools/mothership/worker_server/workflows_test.go b/tools/mothership/worker_server/workflows_test.go new file mode 100644 index 00000000..14e317a0 --- /dev/null +++ b/tools/mothership/worker_server/workflows_test.go @@ -0,0 +1,235 @@ +package mothership_worker_server + +import ( + "database/sql" + "errors" + "github.com/stretchr/testify/mock" + base "go.resf.org/peridot/base/go" + mothership_db "go.resf.org/peridot/tools/mothership/db" + mothershippb "go.resf.org/peridot/tools/mothership/pb" + "time" +) + +func (s *UnitTestSuite) TestProcessRPMWorkflow_FullSuccess1() { + s.env.OnActivity(testW.VerifyResourceExists, "memory://efi-rpm-macros-3-3.el8.src.rpm").Return(nil) + s.env.OnActivity(testW.SetWorkerLastCheckinTime, mock.Anything).Return(nil) + + entry := (&mothership_db.Entry{ + Name: base.NameGen("entries"), + CreateTime: time.Now(), + OSRelease: "Rocky Linux release 8.8 (Green Obsidian)", + Sha256Sum: "518a9418fec1deaeb4c636615d8d81fb60146883c431ea15ab1127893d075d28", + RepositoryName: "BaseOS", + WorkerID: sql.NullString{ + String: "test-worker", + Valid: true, + }, + State: mothershippb.Entry_ARCHIVING, + }).ToPB() + s.env.OnActivity(testW.CreateEntry, mock.Anything).Return(entry, nil) + + entry.EntryId = "efi-rpm-macros-3-3.el8.src" + entry.Sha256Sum = "518a9418fec1deaeb4c636615d8d81fb60146883c431ea15ab1127893d075d28" + s.env.OnActivity(testW.SetEntryIDFromRPM, entry.Name, "memory://efi-rpm-macros-3-3.el8.src.rpm", entry.Sha256Sum).Return(entry, nil) + + importRpmRes := &mothershippb.ImportRPMResponse{ + CommitHash: "4e1243bd22c66e76c2ba9eddc1f91394e57f9f83", + CommitUri: testW.forge.GetCommitViewerURL("efi-rpm-macros", "4e1243bd22c66e76c2ba9eddc1f91394e57f9f83"), + CommitBranch: "el-8.8", + CommitTag: "imports/el-8.8/efi-rpm-macros-3-3.el8", + Nevra: "efi-rpm-macros-0:3-3.el8.aarch64", + Pkg: "efi-rpm-macros", + } + s.env.OnActivity(testW.ImportRPM, "memory://efi-rpm-macros-3-3.el8.src.rpm", entry.Sha256Sum, entry.OsRelease).Return(importRpmRes, nil) + + s.env.OnActivity(testW.SetEntryState, entry.Name, mothershippb.Entry_ARCHIVED, importRpmRes).Return(entry, nil) + + args := &mothershippb.ProcessRPMArgs{ + Request: &mothershippb.ProcessRPMRequest{ + RpmUri: "memory://efi-rpm-macros-3-3.el8.src.rpm", + OsRelease: "Rocky Linux release 8.8 (Green Obsidian)", + Checksum: entry.Sha256Sum, + Repository: "BaseOS", + }, + InternalRequest: &mothershippb.ProcessRPMInternalRequest{ + WorkerId: "test-worker", + }, + } + s.env.ExecuteWorkflow(ProcessRPMWorkflow, args) + s.True(s.env.IsWorkflowCompleted()) + s.NoError(s.env.GetWorkflowError()) + + var res mothershippb.ProcessRPMResponse + s.NoError(s.env.GetWorkflowResult(&res)) + s.Equal(entry.Name, res.Entry.Name) + s.Equal(entry.EntryId, res.Entry.EntryId) +} + +func (s *UnitTestSuite) TestProcessRPMWorkflow_OnHold_Cancel() { + s.env.OnActivity(testW.VerifyResourceExists, "memory://efi-rpm-macros-3-3.el8.src.rpm").Return(nil) + s.env.OnActivity(testW.SetWorkerLastCheckinTime, mock.Anything).Return(nil) + + entry := (&mothership_db.Entry{ + Name: base.NameGen("entries"), + CreateTime: time.Now(), + OSRelease: "Rocky Linux release 8.8 (Green Obsidian)", + Sha256Sum: "518a9418fec1deaeb4c636615d8d81fb60146883c431ea15ab1127893d075d28", + RepositoryName: "BaseOS", + WorkerID: sql.NullString{ + String: "test-worker", + Valid: true, + }, + State: mothershippb.Entry_ARCHIVING, + }).ToPB() + s.env.OnActivity(testW.CreateEntry, mock.Anything).Return(entry, nil) + + entry.EntryId = "efi-rpm-macros-3-3.el8.src" + entry.Sha256Sum = "518a9418fec1deaeb4c636615d8d81fb60146883c431ea15ab1127893d075d28" + s.env.OnActivity(testW.SetEntryIDFromRPM, entry.Name, "memory://efi-rpm-macros-3-3.el8.src.rpm", entry.Sha256Sum).Return(entry, nil) + + importErr := errors.New("import error") + s.env.OnActivity(testW.ImportRPM, "memory://efi-rpm-macros-3-3.el8.src.rpm", entry.Sha256Sum, entry.OsRelease).Return(nil, importErr) + + s.env.OnActivity(testW.SetEntryState, entry.Name, mothershippb.Entry_ON_HOLD, mock.Anything).Return(entry, nil) + s.env.OnActivity(testW.SetEntryState, entry.Name, mothershippb.Entry_CANCELLED, mock.Anything).Return(entry, nil) + + s.env.RegisterDelayedCallback(func() { + s.env.CancelWorkflow() + }, 500*time.Millisecond) + + args := &mothershippb.ProcessRPMArgs{ + Request: &mothershippb.ProcessRPMRequest{ + RpmUri: "memory://efi-rpm-macros-3-3.el8.src.rpm", + OsRelease: "Rocky Linux release 8.8 (Green Obsidian)", + Checksum: entry.Sha256Sum, + Repository: "BaseOS", + }, + InternalRequest: &mothershippb.ProcessRPMInternalRequest{ + WorkerId: "test-worker", + }, + } + s.env.ExecuteWorkflow(ProcessRPMWorkflow, args) + + s.True(s.env.IsWorkflowCompleted()) + s.ErrorContains(s.env.GetWorkflowError(), "canceled") +} + +func (s *UnitTestSuite) TestProcessRPMWorkflow_OnHold_Success() { + s.env.OnActivity(testW.VerifyResourceExists, "memory://efi-rpm-macros-3-3.el8.src.rpm").Return(nil) + s.env.OnActivity(testW.SetWorkerLastCheckinTime, mock.Anything).Return(nil) + + entry := (&mothership_db.Entry{ + Name: base.NameGen("entries"), + CreateTime: time.Now(), + OSRelease: "Rocky Linux release 8.8 (Green Obsidian)", + Sha256Sum: "518a9418fec1deaeb4c636615d8d81fb60146883c431ea15ab1127893d075d28", + RepositoryName: "BaseOS", + WorkerID: sql.NullString{ + String: "test-worker", + Valid: true, + }, + State: mothershippb.Entry_ARCHIVING, + }).ToPB() + s.env.OnActivity(testW.CreateEntry, mock.Anything).Return(entry, nil) + + entry.EntryId = "efi-rpm-macros-3-3.el8.src" + entry.Sha256Sum = "518a9418fec1deaeb4c636615d8d81fb60146883c431ea15ab1127893d075d28" + s.env.OnActivity(testW.SetEntryIDFromRPM, entry.Name, "memory://efi-rpm-macros-3-3.el8.src.rpm", entry.Sha256Sum).Return(&*entry, nil) + + importErr := errors.New("import error") + importRpmRes := &mothershippb.ImportRPMResponse{ + CommitHash: "4e1243bd22c66e76c2ba9eddc1f91394e57f9f83", + CommitUri: testW.forge.GetCommitViewerURL("efi-rpm-macros", "4e1243bd22c66e76c2ba9eddc1f91394e57f9f83"), + CommitBranch: "el-8.8", + CommitTag: "imports/el-8.8/efi-rpm-macros-3-3.el8", + Nevra: "efi-rpm-macros-0:3-3.el8.aarch64", + Pkg: "efi-rpm-macros", + } + shouldErrImport := true + s.env.OnActivity(testW.ImportRPM, "memory://efi-rpm-macros-3-3.el8.src.rpm", entry.Sha256Sum, entry.OsRelease). + Return(func(uri string, checksum string, osRelease string) (*mothershippb.ImportRPMResponse, error) { + if shouldErrImport { + return nil, importErr + } + return importRpmRes, nil + }) + + entry.State = mothershippb.Entry_ON_HOLD + s.env.OnActivity(testW.SetEntryState, entry.Name, mothershippb.Entry_ON_HOLD, mock.Anything).Return(&*entry, nil) + + entry.State = mothershippb.Entry_ARCHIVED + s.env.OnActivity(testW.SetEntryState, entry.Name, mothershippb.Entry_ARCHIVING, mock.Anything).Return(&*entry, nil) + + entry.State = mothershippb.Entry_ARCHIVED + s.env.OnActivity(testW.SetEntryState, entry.Name, mothershippb.Entry_ARCHIVED, importRpmRes).Return(&*entry, nil) + + s.env.RegisterDelayedCallback(func() { + shouldErrImport = false + s.env.SignalWorkflow("rescue", true) + }, 500*time.Millisecond) + + args := &mothershippb.ProcessRPMArgs{ + Request: &mothershippb.ProcessRPMRequest{ + RpmUri: "memory://efi-rpm-macros-3-3.el8.src.rpm", + OsRelease: "Rocky Linux release 8.8 (Green Obsidian)", + Checksum: entry.Sha256Sum, + Repository: "BaseOS", + }, + InternalRequest: &mothershippb.ProcessRPMInternalRequest{ + WorkerId: "test-worker", + }, + } + s.env.ExecuteWorkflow(ProcessRPMWorkflow, args) + + s.True(s.env.IsWorkflowCompleted()) + s.NoError(s.env.GetWorkflowError()) + + var res mothershippb.ProcessRPMResponse + s.NoError(s.env.GetWorkflowResult(&res)) + s.Equal(entry.Name, res.Entry.Name) + s.Equal(entry.EntryId, res.Entry.EntryId) +} + +func (s *UnitTestSuite) TestProcessRPMWorkflow_OnHold_Error() { + s.env.OnActivity(testW.VerifyResourceExists, "memory://efi-rpm-macros-3-3.el8.src.rpm").Return(nil) + s.env.OnActivity(testW.SetWorkerLastCheckinTime, mock.Anything).Return(nil) + + entry := (&mothership_db.Entry{ + Name: base.NameGen("entries"), + CreateTime: time.Now(), + OSRelease: "Rocky Linux release 8.8 (Green Obsidian)", + Sha256Sum: "518a9418fec1deaeb4c636615d8d81fb60146883c431ea15ab1127893d075d28", + RepositoryName: "BaseOS", + WorkerID: sql.NullString{ + String: "test-worker", + Valid: true, + }, + State: mothershippb.Entry_ARCHIVING, + }).ToPB() + s.env.OnActivity(testW.CreateEntry, mock.Anything).Return(entry, nil) + + entry.EntryId = "efi-rpm-macros-3-3.el8.src" + entry.Sha256Sum = "518a9418fec1deaeb4c636615d8d81fb60146883c431ea15ab1127893d075d28" + s.env.OnActivity(testW.SetEntryIDFromRPM, entry.Name, "memory://efi-rpm-macros-3-3.el8.src.rpm", entry.Sha256Sum).Return(entry, nil) + + importErr := errors.New("import error") + s.env.OnActivity(testW.ImportRPM, "memory://efi-rpm-macros-3-3.el8.src.rpm", entry.Sha256Sum, entry.OsRelease).Return(nil, importErr) + + s.env.OnActivity(testW.SetEntryState, entry.Name, mothershippb.Entry_ON_HOLD, mock.Anything).Return(entry, nil) + + args := &mothershippb.ProcessRPMArgs{ + Request: &mothershippb.ProcessRPMRequest{ + RpmUri: "memory://efi-rpm-macros-3-3.el8.src.rpm", + OsRelease: "Rocky Linux release 8.8 (Green Obsidian)", + Checksum: entry.Sha256Sum, + Repository: "BaseOS", + }, + InternalRequest: &mothershippb.ProcessRPMInternalRequest{ + WorkerId: "test-worker", + }, + } + s.env.ExecuteWorkflow(ProcessRPMWorkflow, args) + + s.True(s.env.IsWorkflowCompleted()) + s.Error(s.env.GetWorkflowError()) +}