diff --git a/tools/mothership/db/entry.go b/tools/mothership/db/entry.go index 3d4810a4..983f967b 100644 --- a/tools/mothership/db/entry.go +++ b/tools/mothership/db/entry.go @@ -23,19 +23,21 @@ import ( ) type Entry struct { - PikaTableName string `pika:"entries"` + PikaTableName string `pika:"entries"` + PikaDefaultOrderBy string `pika:"-create_time"` - Name string `db:"name"` - EntryID string `db:"entry_id"` - CreateTime time.Time `db:"create_time" pika:"omitempty"` - OSRelease string `db:"os_release"` - Sha256Sum string `db:"sha256_sum"` - RepositoryName string `db:"repository_name"` - WorkerID sql.NullString `db:"worker_id"` - BatchName sql.NullString `db:"batch_name"` - UserEmail sql.NullString `db:"user_email"` - CommitURI string `db:"commit_uri"` - CommitHash string `db:"commit_hash"` + Name string `db:"name"` + EntryID string `db:"entry_id"` + CreateTime time.Time `db:"create_time" pika:"omitempty"` + OSRelease string `db:"os_release"` + Sha256Sum string `db:"sha256_sum"` + RepositoryName string `db:"repository_name"` + WorkerID sql.NullString `db:"worker_id"` + BatchName sql.NullString `db:"batch_name"` + UserEmail sql.NullString `db:"user_email"` + CommitURI string `db:"commit_uri"` + CommitHash string `db:"commit_hash"` + State mothershippb.Entry_State `db:"state"` } func (e *Entry) GetID() string { @@ -55,5 +57,6 @@ func (e *Entry) ToPB() *mothershippb.Entry { UserEmail: base.SqlNullString(e.UserEmail), CommitUri: e.CommitURI, CommitHash: e.CommitHash, + State: e.State, } } diff --git a/tools/mothership/migrations/000001_init.up.sql b/tools/mothership/migrations/000001_init.up.sql index 7f487435..ea06d644 100644 --- a/tools/mothership/migrations/000001_init.up.sql +++ b/tools/mothership/migrations/000001_init.up.sql @@ -24,26 +24,28 @@ CREATE TABLE workers CREATE TABLE entries ( name VARCHAR(255) PRIMARY KEY, - entry_id VARCHAR(255) UNIQUE NOT NULL, - create_time TIMESTAMPTZ NOT NULL DEFAULT NOW(), - os_release TEXT NOT NULL, - sha256_sum VARCHAR(255) NOT NULL, - repository_name VARCHAR(255) NOT NULL, + entry_id VARCHAR(255) NOT NULL, + create_time TIMESTAMPTZ NOT NULL DEFAULT NOW(), + os_release TEXT NOT NULL, + sha256_sum VARCHAR(255) NOT NULL, + repository_name VARCHAR(255) NOT NULL, worker_id VARCHAR(255) REFERENCES workers (worker_id), batch_name VARCHAR(255), user_email TEXT, - commit_uri TEXT NOT NULL, - commit_hash TEXT NOT NULL + commit_uri TEXT NOT NULL, + commit_hash TEXT NOT NULL, + state NUMERIC NOT NULL ); CREATE TABLE batches ( name VARCHAR(255) PRIMARY KEY, batch_id VARCHAR(255) UNIQUE, - create_time TIMESTAMPTZ NOT NULL DEFAULT NOW(), - update_time TIMESTAMPTZ NOT NULL DEFAULT NOW(), + create_time TIMESTAMPTZ NOT NULL DEFAULT NOW(), + update_time TIMESTAMPTZ NOT NULL DEFAULT NOW(), seal_time TIMESTAMPTZ, - bugtracker_url TEXT + worker_id TEXT REFERENCES workers (worker_id) NOT NULL, + bugtracker_uri TEXT ); CREATE TABLE bugtracker_configs diff --git a/tools/mothership/proto/v1/entry.proto b/tools/mothership/proto/v1/entry.proto index 81d79e60..d550698c 100644 --- a/tools/mothership/proto/v1/entry.proto +++ b/tools/mothership/proto/v1/entry.proto @@ -76,4 +76,29 @@ message Entry { // Commit hash of the resulting import string commit_hash = 11 [(google.api.field_behavior) = OUTPUT_ONLY]; + + // Valid states of an entry. + enum State { + // Default value. This value is unused. + STATE_UNSPECIFIED = 0; + + // The entry is being archived. + ARCHIVING = 1; + + // The entry has been archived. + ARCHIVED = 2; + + // One or more errors occurred while archiving the entry. + // Usually related to patches failing to apply. + // The entry will be placed "on hold" until the admin API + // receives the "rescue" call + ON_HOLD = 3; + + // Error occurred while archiving the entry and an admin + // cancelled the entry. + // This entry CAN'T be rescued. + CANCELLED = 4; + } + // State of the entry. + State state = 12 [(google.api.field_behavior) = OUTPUT_ONLY]; } diff --git a/tools/mothership/ui/Entries.tsx b/tools/mothership/ui/Entries.tsx index dd97d902..7c1bdbc2 100644 --- a/tools/mothership/ui/Entries.tsx +++ b/tools/mothership/ui/Entries.tsx @@ -36,6 +36,7 @@ export const Entries = () => { { key: 'name', label: 'Entry Name' }, { key: 'entryId', label: 'Entry ID' }, { key: 'createTime', label: 'Created' }, + { key: 'state', label: 'State' }, ]} /> ); diff --git a/tools/mothership/ui/GetEntry.tsx b/tools/mothership/ui/GetEntry.tsx index b57af905..8b9f0586 100644 --- a/tools/mothership/ui/GetEntry.tsx +++ b/tools/mothership/ui/GetEntry.tsx @@ -75,6 +75,7 @@ export const GetEntry = () => { { key: 'workerId', label: 'Worker ID' }, { key: 'commitUri', label: 'Commit URI', linkToSelf: true }, { key: 'commitHash', label: 'Commit Hash' }, + { key: 'state', label: 'State' }, ]} /> diff --git a/tools/mothership/worker_server/entry.go b/tools/mothership/worker_server/entry.go index 9ced5628..5fed1cb7 100644 --- a/tools/mothership/worker_server/entry.go +++ b/tools/mothership/worker_server/entry.go @@ -20,14 +20,15 @@ import ( base "go.resf.org/peridot/base/go" mothership_db "go.resf.org/peridot/tools/mothership/db" mothershippb "go.resf.org/peridot/tools/mothership/pb" + "go.temporal.io/sdk/temporal" + "time" ) -func (w *Worker) CreateEntry(args *mothershippb.ProcessRPMArgs, importRpmRes *mothershippb.ImportRPMResponse) (*mothershippb.Entry, error) { +func (w *Worker) CreateEntry(args *mothershippb.ProcessRPMArgs) (*mothershippb.Entry, error) { req := args.Request internalReq := args.InternalRequest entry := mothership_db.Entry{ Name: base.NameGen("entries"), - EntryID: importRpmRes.Nevra, OSRelease: req.OsRelease, Sha256Sum: req.Checksum, RepositoryName: req.Repository, @@ -35,8 +36,7 @@ func (w *Worker) CreateEntry(args *mothershippb.ProcessRPMArgs, importRpmRes *mo String: internalReq.WorkerId, Valid: true, }, - CommitURI: importRpmRes.CommitUri, - CommitHash: importRpmRes.CommitHash, + State: mothershippb.Entry_ARCHIVING, } if req.Batch != "" { entry.BatchName = sql.NullString{ @@ -52,3 +52,50 @@ func (w *Worker) CreateEntry(args *mothershippb.ProcessRPMArgs, importRpmRes *mo return entry.ToPB(), nil } + +func (w *Worker) SetEntryState(entry string, state mothershippb.Entry_State, importRpmRes *mothershippb.ImportRPMResponse) (*mothershippb.Entry, error) { + ent, err := base.Q[mothership_db.Entry](w.db).F("name", entry).GetOrNil() + if err != nil { + return nil, errors.Wrap(err, "failed to get entry") + } + if ent == nil { + return nil, temporal.NewNonRetryableApplicationError( + "entry does not exist", + "entryDoesNotExist", + errors.New("entry does not exist"), + ) + } + + ent.State = state + if importRpmRes != nil { + ent.CommitURI = importRpmRes.CommitUri + ent.CommitHash = importRpmRes.CommitHash + } + + if err := base.Q[mothership_db.Entry](w.db).U(ent); err != nil { + return nil, errors.Wrap(err, "failed to update entry") + } + + return ent.ToPB(), nil +} + +func (w *Worker) SetWorkerLastCheckinTime(workerID string) error { + wrk, err := base.Q[mothership_db.Worker](w.db).F("worker_id", workerID).GetOrNil() + if err != nil { + return errors.Wrap(err, "failed to get worker") + } + if wrk == nil { + return temporal.NewNonRetryableApplicationError( + "worker does not exist", + "workerDoesNotExist", + errors.New("worker does not exist"), + ) + } + + wrk.LastCheckinTime = sql.NullTime{ + Time: time.Now(), + Valid: true, + } + + return base.Q[mothership_db.Worker](w.db).U(wrk) +}