Start working on tests for worker_server

This commit is contained in:
Mustafa Gezen 2023-09-03 07:48:10 +02:00
parent f4f960bfd5
commit d6016bba63
Signed by: mustafa
GPG Key ID: DCDF010D946438C1
8 changed files with 664 additions and 0 deletions

View File

@ -0,0 +1,189 @@
package mothership_worker_server
import (
"github.com/stretchr/testify/require"
mothership_db "go.resf.org/peridot/tools/mothership/db"
mothershippb "go.resf.org/peridot/tools/mothership/pb"
"testing"
"time"
)
func TestWorker_CreateEntry(t *testing.T) {
require.Nil(t, q[mothership_db.Entry]().Delete())
defer func() {
require.Nil(t, q[mothership_db.Entry]().Delete())
}()
args := &mothershippb.ProcessRPMArgs{
Request: &mothershippb.ProcessRPMRequest{
RpmUri: "memory://efi-rpm-macros-3-3.el8.src.rpm",
OsRelease: "Rocky Linux release 8.8 (Green Obsidian)",
Checksum: "518a9418fec1deaeb4c636615d8d81fb60146883c431ea15ab1127893d075d28",
Repository: "BaseOS",
},
InternalRequest: &mothershippb.ProcessRPMInternalRequest{
WorkerId: "test-worker",
},
}
entry, err := testW.CreateEntry(args)
require.Nil(t, err)
require.NotNil(t, entry)
require.Equal(t, "Rocky Linux release 8.8 (Green Obsidian)", entry.OsRelease)
require.Equal(t, "518a9418fec1deaeb4c636615d8d81fb60146883c431ea15ab1127893d075d28", entry.Sha256Sum)
c, err := q[mothership_db.Entry]().F("name", entry.Name).Count()
require.Nil(t, err)
require.Equal(t, c, 1)
}
func TestWorker_SetEntryIDFromRPM(t *testing.T) {
require.Nil(t, q[mothership_db.Entry]().Delete())
defer func() {
require.Nil(t, q[mothership_db.Entry]().Delete())
}()
args := &mothershippb.ProcessRPMArgs{
Request: &mothershippb.ProcessRPMRequest{
RpmUri: "memory://efi-rpm-macros-3-3.el8.src.rpm",
OsRelease: "Rocky Linux release 8.8 (Green Obsidian)",
Checksum: "518a9418fec1deaeb4c636615d8d81fb60146883c431ea15ab1127893d075d28",
Repository: "BaseOS",
},
InternalRequest: &mothershippb.ProcessRPMInternalRequest{
WorkerId: "test-worker",
},
}
entry, err := testW.CreateEntry(args)
require.Nil(t, err)
require.NotNil(t, entry)
entry, err = testW.SetEntryIDFromRPM(entry.Name, "memory://efi-rpm-macros-3-3.el8.src.rpm", entry.Sha256Sum)
require.Nil(t, err)
require.NotNil(t, entry)
require.Equal(t, "efi-rpm-macros-3-3.el8.src", entry.EntryId)
}
func TestWorker_SetEntryIDFromRPM_FailedToDownload(t *testing.T) {
require.Nil(t, q[mothership_db.Entry]().Delete())
defer func() {
require.Nil(t, q[mothership_db.Entry]().Delete())
}()
args := &mothershippb.ProcessRPMArgs{
Request: &mothershippb.ProcessRPMRequest{
RpmUri: "memory://not-found.rpm",
OsRelease: "Rocky Linux release 8.8 (Green Obsidian)",
Checksum: "518a9418fec1deaeb4c636615d8d81fb60146883c431ea15ab1127893d075d28",
Repository: "BaseOS",
},
InternalRequest: &mothershippb.ProcessRPMInternalRequest{
WorkerId: "test-worker",
},
}
entry, err := testW.CreateEntry(args)
require.Nil(t, err)
require.NotNil(t, entry)
entry, err = testW.SetEntryIDFromRPM(entry.Name, "memory://not-found.rpm", entry.Sha256Sum)
require.NotNil(t, err)
require.Contains(t, err.Error(), "failed to download resource")
}
func TestWorker_SetEntryState(t *testing.T) {
require.Nil(t, q[mothership_db.Entry]().Delete())
defer func() {
require.Nil(t, q[mothership_db.Entry]().Delete())
}()
args := &mothershippb.ProcessRPMArgs{
Request: &mothershippb.ProcessRPMRequest{
RpmUri: "memory://efi-rpm-macros-3-3.el8.src.rpm",
OsRelease: "Rocky Linux release 8.8 (Green Obsidian)",
Checksum: "518a9418fec1deaeb4c636615d8d81fb60146883c431ea15ab1127893d075d28",
Repository: "BaseOS",
},
InternalRequest: &mothershippb.ProcessRPMInternalRequest{
WorkerId: "test-worker",
},
}
entry, err := testW.CreateEntry(args)
require.Nil(t, err)
require.NotNil(t, entry)
importRpmRes := &mothershippb.ImportRPMResponse{
CommitHash: "123",
CommitUri: "https://forge.resf.org/peridot/efi-rpm-macros/commit/123",
CommitBranch: "el-8.8",
CommitTag: "imports/el-8.8/efi-rpm-macros-3-3.el8",
Nevra: "efi-rpm-macros-0:3-3.el8.aarch64",
Pkg: "efi-rpm-macros",
}
entry, err = testW.SetEntryState(entry.Name, mothershippb.Entry_ARCHIVED, importRpmRes)
require.Nil(t, err)
require.NotNil(t, entry)
require.Equal(t, mothershippb.Entry_ARCHIVED, entry.State)
require.Equal(t, "123", entry.CommitHash)
require.Equal(t, "https://forge.resf.org/peridot/efi-rpm-macros/commit/123", entry.CommitUri)
require.Equal(t, "el-8.8", entry.CommitBranch)
require.Equal(t, "imports/el-8.8/efi-rpm-macros-3-3.el8", entry.CommitTag)
require.Equal(t, "efi-rpm-macros", entry.Pkg)
}
func TestWorker_SetEntryState_NoRes(t *testing.T) {
require.Nil(t, q[mothership_db.Entry]().Delete())
defer func() {
require.Nil(t, q[mothership_db.Entry]().Delete())
}()
args := &mothershippb.ProcessRPMArgs{
Request: &mothershippb.ProcessRPMRequest{
RpmUri: "memory://efi-rpm-macros-3-3.el8.src.rpm",
OsRelease: "Rocky Linux release 8.8 (Green Obsidian)",
Checksum: "518a9418fec1deaeb4c636615d8d81fb60146883c431ea15ab1127893d075d28",
Repository: "BaseOS",
},
InternalRequest: &mothershippb.ProcessRPMInternalRequest{
WorkerId: "test-worker",
},
}
entry, err := testW.CreateEntry(args)
require.Nil(t, err)
require.NotNil(t, entry)
entry, err = testW.SetEntryState(entry.Name, mothershippb.Entry_ON_HOLD, nil)
require.Nil(t, err)
require.NotNil(t, entry)
require.Equal(t, mothershippb.Entry_ON_HOLD, entry.State)
require.Equal(t, "", entry.CommitHash)
require.Equal(t, "", entry.CommitUri)
require.Equal(t, "", entry.CommitBranch)
require.Equal(t, "", entry.CommitTag)
require.Equal(t, "", entry.Pkg)
}
func TestWorker_SetEntryState_NoEntry(t *testing.T) {
require.Nil(t, q[mothership_db.Entry]().Delete())
defer func() {
require.Nil(t, q[mothership_db.Entry]().Delete())
}()
entry, err := testW.SetEntryState("entries/123", mothershippb.Entry_ON_HOLD, nil)
require.Nil(t, entry)
require.NotNil(t, err)
require.Contains(t, err.Error(), "entry does not exist")
}
func TestWorker_SetWorkerLastCheckinTime(t *testing.T) {
require.Nil(t, testW.SetWorkerLastCheckinTime("test-worker"))
// Verify that the worker last checkin time is at most 15 seconds ago.
w, err := q[mothership_db.Worker]().F("worker_id", "test-worker").GetOrNil()
require.Nil(t, err)
require.NotNil(t, w)
require.True(t, w.LastCheckinTime.Valid)
require.WithinDuration(t, w.LastCheckinTime.Time, time.Now(), 15*time.Second)
}
func TestWorker_SetWorkerLastCheckinTime_NotFound(t *testing.T) {
err := testW.SetWorkerLastCheckinTime("not-found")
require.NotNil(t, err)
require.Contains(t, err.Error(), "worker does not exist")
}

View File

@ -0,0 +1,39 @@
package mothership_worker_server
import (
"fmt"
transport_http "github.com/go-git/go-git/v5/plumbing/transport/http"
"go.resf.org/peridot/tools/mothership/worker_server/forge"
"time"
)
type inMemoryForge struct {
localTempDir string
repos map[string]bool
remoteBaseURL string
}
func (f *inMemoryForge) GetAuthenticator() (*forge.Authenticator, error) {
return &forge.Authenticator{
AuthMethod: &transport_http.BasicAuth{
Username: "user",
Password: "pass",
},
AuthorName: "Test User",
AuthorEmail: "test@resf.org",
Expires: time.Now().Add(time.Hour),
}, nil
}
func (f *inMemoryForge) GetRemote(repo string) string {
return fmt.Sprintf("file://%s/%s", f.localTempDir, repo)
}
func (f *inMemoryForge) GetCommitViewerURL(repo string, commit string) string {
return f.remoteBaseURL + "/" + repo + "/commit/" + commit
}
func (f *inMemoryForge) EnsureRepositoryExists(auth *forge.Authenticator, repo string) error {
f.repos[repo] = true
return nil
}

View File

@ -0,0 +1,149 @@
package mothership_worker_server
import (
"bytes"
"context"
_ "embed"
"github.com/go-git/go-billy/v5/osfs"
"github.com/stretchr/testify/suite"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/modules/postgres"
"github.com/testcontainers/testcontainers-go/wait"
base "go.resf.org/peridot/base/go"
storage_memory "go.resf.org/peridot/base/go/storage/memory"
mothership_db "go.resf.org/peridot/tools/mothership/db"
mothership_migrations "go.resf.org/peridot/tools/mothership/migrations"
"go.temporal.io/sdk/log"
"go.temporal.io/sdk/testsuite"
"golang.org/x/crypto/openpgp"
"os"
"path/filepath"
"testing"
"time"
)
var (
testW *Worker
testWRolling *Worker
//go:embed testdata/RPM-GPG-KEY-Rocky-8
rocky8GpgKey []byte
inmf *inMemoryForge
)
type UnitTestSuite struct {
suite.Suite
testsuite.WorkflowTestSuite
env *testsuite.TestWorkflowEnvironment
}
type noopLogger struct{}
func (n *noopLogger) Debug(string, ...any) {}
func (n *noopLogger) Info(string, ...any) {}
func (n *noopLogger) Warn(string, ...any) {}
func (n *noopLogger) Error(string, ...any) {}
func (n *noopLogger) With(...any) log.Logger {
return n
}
func (s *UnitTestSuite) SetupTest() {
s.env = s.NewTestWorkflowEnvironment()
}
func (s *UnitTestSuite) AfterTest(suiteName, testName string) {
s.env.AssertExpectations(s.T())
}
func TestUnitTestSuite(t *testing.T) {
ts := new(UnitTestSuite)
ts.SetLogger(&noopLogger{})
suite.Run(t, ts)
}
func TestMain(m *testing.M) {
// Create temporary file
dir, err := os.MkdirTemp("", "test-db-*")
if err != nil {
panic(err)
}
defer os.RemoveAll(dir)
scripts, err := base.EmbedFSToOSFS(dir, mothership_migrations.UpSQLs, ".")
if err != nil {
panic(err)
}
ctx := context.Background()
pgContainer, err := postgres.RunContainer(
ctx,
testcontainers.WithImage("postgres:15.3-alpine"),
postgres.WithInitScripts(scripts...),
postgres.WithDatabase("mshiptest"),
postgres.WithUsername("postgres"),
postgres.WithPassword("postgres"),
testcontainers.WithWaitStrategy(
wait.
ForLog("database system is ready to accept connections").
WithOccurrence(2).WithStartupTimeout(5*time.Second),
),
)
if err != nil {
panic(err)
}
defer pgContainer.Terminate(ctx)
connStr, err := pgContainer.ConnectionString(ctx, "sslmode=disable")
if err != nil {
panic(err)
}
db, err := base.NewDB(connStr)
if err != nil {
panic(err)
}
// Get current working directory
cwd, err := os.Getwd()
if err != nil {
panic(err)
}
lookasideFS := osfs.New("/")
inMemStorage := storage_memory.New(lookasideFS, filepath.Join(cwd, "testdata"))
var gpgKeys openpgp.EntityList
keyRing, err := openpgp.ReadArmoredKeyRing(bytes.NewReader(rocky8GpgKey))
if err != nil {
panic(err)
}
gpgKeys = append(gpgKeys, keyRing...)
tempDirForge, err := os.MkdirTemp("", "test-forge-*")
if err != nil {
panic(err)
}
defer os.RemoveAll(tempDirForge)
inmf = &inMemoryForge{
remoteBaseURL: "https://testforge.resf.org",
localTempDir: tempDirForge,
repos: map[string]bool{},
}
testW = New(db, inMemStorage, gpgKeys, inmf, false)
testWRolling = New(db, inMemStorage, gpgKeys, inmf, true)
if err := q[mothership_db.Worker]().Create(&mothership_db.Worker{
Name: base.NameGen("workers"),
WorkerID: "test-worker",
ApiSecret: "test-secret",
}); err != nil {
panic(err)
}
os.Exit(m.Run())
}
func q[T any]() base.Pika[T] {
return base.Q[T](testW.db)
}

View File

@ -0,0 +1,22 @@
package mothership_worker_server
import (
"github.com/stretchr/testify/require"
"testing"
)
func TestWorker_VerifyResourceExists(t *testing.T) {
require.Nil(t, testW.VerifyResourceExists("memory://efi-rpm-macros-3-3.el8.src.rpm"))
}
func TestWorker_VerifyResourceExists_NotFound(t *testing.T) {
err := testW.VerifyResourceExists("memory://not-found.rpm")
require.NotNil(t, err)
require.Equal(t, err.Error(), "resource does not exist")
}
func TestWorker_VerifyResourceExists_CannotRead(t *testing.T) {
err := testW.VerifyResourceExists("bad-protocol://not-found.rpm")
require.NotNil(t, err)
require.Contains(t, err.Error(), "client submitted a resource URI that cannot be read by server")
}

View File

@ -0,0 +1 @@
package mothership_worker_server

View File

@ -0,0 +1,29 @@
-----BEGIN PGP PUBLIC KEY BLOCK-----
mQINBGAofzYBEAC6yS1azw6f3wmaVd//3aSy6O2c9+jeetulRQvg2LvhRRS1eNqp
/x9tbBhfohu/tlDkGpYHV7diePgMml9SZDy1sKlI3tDhx6GZ3xwF0fd1vWBZpmNk
D9gRkUmYBeLotmcXQZ8ZpWLicosFtDpJEYpLUhuIgTKwt4gxJrHvkWsGQiBkJxKD
u3/RlL4IYA3Ot9iuCBflc91EyAw1Yj0gKcDzbOqjvlGtS3ASXgxPqSfU0uLC9USF
uKDnP2tcnlKKGfj0u6VkqISliSuRAzjlKho9Meond+mMIFOTT6qp4xyu+9Dj3IjZ
IC6rBXRU3xi8z0qYptoFZ6hx70NV5u+0XUzDMXdjQ5S859RYJKijiwmfMC7gZQAf
OkdOcicNzen/TwD/slhiCDssHBNEe86Wwu5kmDoCri7GJlYOlWU42Xi0o1JkVltN
D8ZId+EBDIms7ugSwGOVSxyZs43q2IAfFYCRtyKHFlgHBRe9/KTWPUrnsfKxGJgC
Do3Yb63/IYTvfTJptVfhQtL1AhEAeF1I+buVoJRmBEyYKD9BdU4xQN39VrZKziO3
hDIGng/eK6PaPhUdq6XqvmnsZ2h+KVbyoj4cTo2gKCB2XA7O2HLQsuGduHzYKNjf
QR9j0djjwTrsvGvzfEzchP19723vYf7GdcLvqtPqzpxSX2FNARpCGXBw9wARAQAB
tDNSZWxlYXNlIEVuZ2luZWVyaW5nIDxpbmZyYXN0cnVjdHVyZUByb2NreWxpbnV4
Lm9yZz6JAk4EEwEIADgWIQRwUcRwqSn0VM6+N7cVr12sbXRaYAUCYCh/NgIbDwUL
CQgHAgYVCgkICwIEFgIDAQIeAQIXgAAKCRAVr12sbXRaYLFmEACSMvoO1FDdyAbu
1m6xEzDhs7FgnZeQNzLZECv2j+ggFSJXezlNVOZ5I1I8umBan2ywfKQD8M+IjmrW
k9/7h9i54t8RS/RN7KNo7ECGnKXqXDPzBBTs1Gwo1WzltAoaDKUfXqQ4oJ4aCP/q
/XPVWEzgpJO1XEezvCq8VXisutyDiXEjjMIeBczxb1hbamQX+jLTIQ1MDJ4Zo1YP
zlUqrHW434XC2b1/WbSaylq8Wk9cksca5J+g3FqTlgiWozyy0uxygIRjb6iTzKXk
V7SYxeXp3hNTuoUgiFkjh5/0yKWCwx7aQqlHar9GjpxmBDAO0kzOlgtTw//EqTwR
KnYZLig9FW0PhwvZJUigr0cvs/XXTTb77z/i/dfHkrjVTTYenNyXogPtTtSyxqca
61fbPf0B/S3N43PW8URXBRS0sykpX4SxKu+PwKCqf+OJ7hMEVAapqzTt1q9T7zyB
QwvCVx8s7WWvXbs2d6ZUrArklgjHoHQcdxJKdhuRmD34AuXWCLW+gH8rJWZpuNl3
+WsPZX4PvjKDgMw6YMcV7zhWX6c0SevKtzt7WP3XoKDuPhK1PMGJQqQ7spegGB+5
DZvsJS48Ip0S45Qfmj82ibXaCBJHTNZE8Zs+rdTjQ9DS5qvzRA1sRA1dBb/7OLYE
JmeWf4VZyebm+gc50szsg6Ut2yT8hw==
=AiP8
-----END PGP PUBLIC KEY BLOCK-----

View File

@ -0,0 +1,235 @@
package mothership_worker_server
import (
"database/sql"
"errors"
"github.com/stretchr/testify/mock"
base "go.resf.org/peridot/base/go"
mothership_db "go.resf.org/peridot/tools/mothership/db"
mothershippb "go.resf.org/peridot/tools/mothership/pb"
"time"
)
func (s *UnitTestSuite) TestProcessRPMWorkflow_FullSuccess1() {
s.env.OnActivity(testW.VerifyResourceExists, "memory://efi-rpm-macros-3-3.el8.src.rpm").Return(nil)
s.env.OnActivity(testW.SetWorkerLastCheckinTime, mock.Anything).Return(nil)
entry := (&mothership_db.Entry{
Name: base.NameGen("entries"),
CreateTime: time.Now(),
OSRelease: "Rocky Linux release 8.8 (Green Obsidian)",
Sha256Sum: "518a9418fec1deaeb4c636615d8d81fb60146883c431ea15ab1127893d075d28",
RepositoryName: "BaseOS",
WorkerID: sql.NullString{
String: "test-worker",
Valid: true,
},
State: mothershippb.Entry_ARCHIVING,
}).ToPB()
s.env.OnActivity(testW.CreateEntry, mock.Anything).Return(entry, nil)
entry.EntryId = "efi-rpm-macros-3-3.el8.src"
entry.Sha256Sum = "518a9418fec1deaeb4c636615d8d81fb60146883c431ea15ab1127893d075d28"
s.env.OnActivity(testW.SetEntryIDFromRPM, entry.Name, "memory://efi-rpm-macros-3-3.el8.src.rpm", entry.Sha256Sum).Return(entry, nil)
importRpmRes := &mothershippb.ImportRPMResponse{
CommitHash: "4e1243bd22c66e76c2ba9eddc1f91394e57f9f83",
CommitUri: testW.forge.GetCommitViewerURL("efi-rpm-macros", "4e1243bd22c66e76c2ba9eddc1f91394e57f9f83"),
CommitBranch: "el-8.8",
CommitTag: "imports/el-8.8/efi-rpm-macros-3-3.el8",
Nevra: "efi-rpm-macros-0:3-3.el8.aarch64",
Pkg: "efi-rpm-macros",
}
s.env.OnActivity(testW.ImportRPM, "memory://efi-rpm-macros-3-3.el8.src.rpm", entry.Sha256Sum, entry.OsRelease).Return(importRpmRes, nil)
s.env.OnActivity(testW.SetEntryState, entry.Name, mothershippb.Entry_ARCHIVED, importRpmRes).Return(entry, nil)
args := &mothershippb.ProcessRPMArgs{
Request: &mothershippb.ProcessRPMRequest{
RpmUri: "memory://efi-rpm-macros-3-3.el8.src.rpm",
OsRelease: "Rocky Linux release 8.8 (Green Obsidian)",
Checksum: entry.Sha256Sum,
Repository: "BaseOS",
},
InternalRequest: &mothershippb.ProcessRPMInternalRequest{
WorkerId: "test-worker",
},
}
s.env.ExecuteWorkflow(ProcessRPMWorkflow, args)
s.True(s.env.IsWorkflowCompleted())
s.NoError(s.env.GetWorkflowError())
var res mothershippb.ProcessRPMResponse
s.NoError(s.env.GetWorkflowResult(&res))
s.Equal(entry.Name, res.Entry.Name)
s.Equal(entry.EntryId, res.Entry.EntryId)
}
func (s *UnitTestSuite) TestProcessRPMWorkflow_OnHold_Cancel() {
s.env.OnActivity(testW.VerifyResourceExists, "memory://efi-rpm-macros-3-3.el8.src.rpm").Return(nil)
s.env.OnActivity(testW.SetWorkerLastCheckinTime, mock.Anything).Return(nil)
entry := (&mothership_db.Entry{
Name: base.NameGen("entries"),
CreateTime: time.Now(),
OSRelease: "Rocky Linux release 8.8 (Green Obsidian)",
Sha256Sum: "518a9418fec1deaeb4c636615d8d81fb60146883c431ea15ab1127893d075d28",
RepositoryName: "BaseOS",
WorkerID: sql.NullString{
String: "test-worker",
Valid: true,
},
State: mothershippb.Entry_ARCHIVING,
}).ToPB()
s.env.OnActivity(testW.CreateEntry, mock.Anything).Return(entry, nil)
entry.EntryId = "efi-rpm-macros-3-3.el8.src"
entry.Sha256Sum = "518a9418fec1deaeb4c636615d8d81fb60146883c431ea15ab1127893d075d28"
s.env.OnActivity(testW.SetEntryIDFromRPM, entry.Name, "memory://efi-rpm-macros-3-3.el8.src.rpm", entry.Sha256Sum).Return(entry, nil)
importErr := errors.New("import error")
s.env.OnActivity(testW.ImportRPM, "memory://efi-rpm-macros-3-3.el8.src.rpm", entry.Sha256Sum, entry.OsRelease).Return(nil, importErr)
s.env.OnActivity(testW.SetEntryState, entry.Name, mothershippb.Entry_ON_HOLD, mock.Anything).Return(entry, nil)
s.env.OnActivity(testW.SetEntryState, entry.Name, mothershippb.Entry_CANCELLED, mock.Anything).Return(entry, nil)
s.env.RegisterDelayedCallback(func() {
s.env.CancelWorkflow()
}, 500*time.Millisecond)
args := &mothershippb.ProcessRPMArgs{
Request: &mothershippb.ProcessRPMRequest{
RpmUri: "memory://efi-rpm-macros-3-3.el8.src.rpm",
OsRelease: "Rocky Linux release 8.8 (Green Obsidian)",
Checksum: entry.Sha256Sum,
Repository: "BaseOS",
},
InternalRequest: &mothershippb.ProcessRPMInternalRequest{
WorkerId: "test-worker",
},
}
s.env.ExecuteWorkflow(ProcessRPMWorkflow, args)
s.True(s.env.IsWorkflowCompleted())
s.ErrorContains(s.env.GetWorkflowError(), "canceled")
}
func (s *UnitTestSuite) TestProcessRPMWorkflow_OnHold_Success() {
s.env.OnActivity(testW.VerifyResourceExists, "memory://efi-rpm-macros-3-3.el8.src.rpm").Return(nil)
s.env.OnActivity(testW.SetWorkerLastCheckinTime, mock.Anything).Return(nil)
entry := (&mothership_db.Entry{
Name: base.NameGen("entries"),
CreateTime: time.Now(),
OSRelease: "Rocky Linux release 8.8 (Green Obsidian)",
Sha256Sum: "518a9418fec1deaeb4c636615d8d81fb60146883c431ea15ab1127893d075d28",
RepositoryName: "BaseOS",
WorkerID: sql.NullString{
String: "test-worker",
Valid: true,
},
State: mothershippb.Entry_ARCHIVING,
}).ToPB()
s.env.OnActivity(testW.CreateEntry, mock.Anything).Return(entry, nil)
entry.EntryId = "efi-rpm-macros-3-3.el8.src"
entry.Sha256Sum = "518a9418fec1deaeb4c636615d8d81fb60146883c431ea15ab1127893d075d28"
s.env.OnActivity(testW.SetEntryIDFromRPM, entry.Name, "memory://efi-rpm-macros-3-3.el8.src.rpm", entry.Sha256Sum).Return(&*entry, nil)
importErr := errors.New("import error")
importRpmRes := &mothershippb.ImportRPMResponse{
CommitHash: "4e1243bd22c66e76c2ba9eddc1f91394e57f9f83",
CommitUri: testW.forge.GetCommitViewerURL("efi-rpm-macros", "4e1243bd22c66e76c2ba9eddc1f91394e57f9f83"),
CommitBranch: "el-8.8",
CommitTag: "imports/el-8.8/efi-rpm-macros-3-3.el8",
Nevra: "efi-rpm-macros-0:3-3.el8.aarch64",
Pkg: "efi-rpm-macros",
}
shouldErrImport := true
s.env.OnActivity(testW.ImportRPM, "memory://efi-rpm-macros-3-3.el8.src.rpm", entry.Sha256Sum, entry.OsRelease).
Return(func(uri string, checksum string, osRelease string) (*mothershippb.ImportRPMResponse, error) {
if shouldErrImport {
return nil, importErr
}
return importRpmRes, nil
})
entry.State = mothershippb.Entry_ON_HOLD
s.env.OnActivity(testW.SetEntryState, entry.Name, mothershippb.Entry_ON_HOLD, mock.Anything).Return(&*entry, nil)
entry.State = mothershippb.Entry_ARCHIVED
s.env.OnActivity(testW.SetEntryState, entry.Name, mothershippb.Entry_ARCHIVING, mock.Anything).Return(&*entry, nil)
entry.State = mothershippb.Entry_ARCHIVED
s.env.OnActivity(testW.SetEntryState, entry.Name, mothershippb.Entry_ARCHIVED, importRpmRes).Return(&*entry, nil)
s.env.RegisterDelayedCallback(func() {
shouldErrImport = false
s.env.SignalWorkflow("rescue", true)
}, 500*time.Millisecond)
args := &mothershippb.ProcessRPMArgs{
Request: &mothershippb.ProcessRPMRequest{
RpmUri: "memory://efi-rpm-macros-3-3.el8.src.rpm",
OsRelease: "Rocky Linux release 8.8 (Green Obsidian)",
Checksum: entry.Sha256Sum,
Repository: "BaseOS",
},
InternalRequest: &mothershippb.ProcessRPMInternalRequest{
WorkerId: "test-worker",
},
}
s.env.ExecuteWorkflow(ProcessRPMWorkflow, args)
s.True(s.env.IsWorkflowCompleted())
s.NoError(s.env.GetWorkflowError())
var res mothershippb.ProcessRPMResponse
s.NoError(s.env.GetWorkflowResult(&res))
s.Equal(entry.Name, res.Entry.Name)
s.Equal(entry.EntryId, res.Entry.EntryId)
}
func (s *UnitTestSuite) TestProcessRPMWorkflow_OnHold_Error() {
s.env.OnActivity(testW.VerifyResourceExists, "memory://efi-rpm-macros-3-3.el8.src.rpm").Return(nil)
s.env.OnActivity(testW.SetWorkerLastCheckinTime, mock.Anything).Return(nil)
entry := (&mothership_db.Entry{
Name: base.NameGen("entries"),
CreateTime: time.Now(),
OSRelease: "Rocky Linux release 8.8 (Green Obsidian)",
Sha256Sum: "518a9418fec1deaeb4c636615d8d81fb60146883c431ea15ab1127893d075d28",
RepositoryName: "BaseOS",
WorkerID: sql.NullString{
String: "test-worker",
Valid: true,
},
State: mothershippb.Entry_ARCHIVING,
}).ToPB()
s.env.OnActivity(testW.CreateEntry, mock.Anything).Return(entry, nil)
entry.EntryId = "efi-rpm-macros-3-3.el8.src"
entry.Sha256Sum = "518a9418fec1deaeb4c636615d8d81fb60146883c431ea15ab1127893d075d28"
s.env.OnActivity(testW.SetEntryIDFromRPM, entry.Name, "memory://efi-rpm-macros-3-3.el8.src.rpm", entry.Sha256Sum).Return(entry, nil)
importErr := errors.New("import error")
s.env.OnActivity(testW.ImportRPM, "memory://efi-rpm-macros-3-3.el8.src.rpm", entry.Sha256Sum, entry.OsRelease).Return(nil, importErr)
s.env.OnActivity(testW.SetEntryState, entry.Name, mothershippb.Entry_ON_HOLD, mock.Anything).Return(entry, nil)
args := &mothershippb.ProcessRPMArgs{
Request: &mothershippb.ProcessRPMRequest{
RpmUri: "memory://efi-rpm-macros-3-3.el8.src.rpm",
OsRelease: "Rocky Linux release 8.8 (Green Obsidian)",
Checksum: entry.Sha256Sum,
Repository: "BaseOS",
},
InternalRequest: &mothershippb.ProcessRPMInternalRequest{
WorkerId: "test-worker",
},
}
s.env.ExecuteWorkflow(ProcessRPMWorkflow, args)
s.True(s.env.IsWorkflowCompleted())
s.Error(s.env.GetWorkflowError())
}