From a77e224e8560d9b9121c18af800121b4c0d2c0a7 Mon Sep 17 00:00:00 2001 From: Mustafa Gezen Date: Thu, 31 Aug 2023 10:21:08 +0200 Subject: [PATCH] Misc changes --- tools/mothership/admin/ui/App.tsx | 14 ++- tools/mothership/admin/ui/api.ts | 7 ++ tools/mothership/db/batch.go | 52 +++++++++++ tools/mothership/proto/v1/batch.proto | 11 ++- tools/mothership/rpc/batch.go | 60 ++++++++++++ tools/mothership/rpc/operation.go | 129 ++++++++++++++++++++++++++ tools/mothership/rpc/ping.go | 29 ++++++ 7 files changed, 295 insertions(+), 7 deletions(-) create mode 100644 tools/mothership/db/batch.go create mode 100644 tools/mothership/rpc/batch.go create mode 100644 tools/mothership/rpc/operation.go create mode 100644 tools/mothership/rpc/ping.go diff --git a/tools/mothership/admin/ui/App.tsx b/tools/mothership/admin/ui/App.tsx index dd335548..946e215b 100644 --- a/tools/mothership/admin/ui/App.tsx +++ b/tools/mothership/admin/ui/App.tsx @@ -24,15 +24,18 @@ import Typography from '@mui/material/Typography'; import Button from '@mui/material/Button'; import { Theme } from '@mui/material/styles'; -import DashboardIcon from '@mui/icons-material/Dashboard'; import EngineeringIcon from '@mui/icons-material/Engineering'; -import ListAltIcon from '@mui/icons-material/ListAlt'; +import ImportExportIcon from '@mui/icons-material/ImportExport'; -import { Workers } from './Workers'; import { Drawer } from 'base/ts/mui/Drawer'; + import { CreateWorker } from 'tools/mothership/admin/ui/CreateWorker'; import { GetWorker } from 'tools/mothership/admin/ui/GetWorker'; +import { Entries } from './Entries'; +import { GetEntry } from './GetEntry'; +import { Workers } from './Workers'; + export const App = () => { return ( @@ -60,6 +63,7 @@ export const App = () => { { links: [ { text: 'Workers', href: '/workers', icon: }, + { text: 'Entries', href: '/entries', icon: }, ], }, ]} @@ -73,6 +77,10 @@ export const App = () => { } /> } /> + + } /> + } /> + diff --git a/tools/mothership/admin/ui/api.ts b/tools/mothership/admin/ui/api.ts index 78533ad5..03f633ae 100644 --- a/tools/mothership/admin/ui/api.ts +++ b/tools/mothership/admin/ui/api.ts @@ -15,6 +15,13 @@ */ import * as mshipAdmin from 'bazel-bin/tools/mothership/proto/admin/v1/mshipadminpb_ts_proto_gen'; +import * as srpmArchiver from 'bazel-bin/tools/mothership/proto/v1/mothershippb_ts_proto_gen'; + +const archiverCfg = new srpmArchiver.Configuration({ + basePath: '/api', +}) + +export const srpmArchiverApi = new srpmArchiver.SrpmArchiverApi(archiverCfg); const cfg = new mshipAdmin.Configuration({ basePath: '/admin/api', diff --git a/tools/mothership/db/batch.go b/tools/mothership/db/batch.go new file mode 100644 index 00000000..4857bc69 --- /dev/null +++ b/tools/mothership/db/batch.go @@ -0,0 +1,52 @@ +// Copyright 2023 Peridot Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mothership_db + +import ( + "database/sql" + base "go.resf.org/peridot/base/go" + mothershippb "go.resf.org/peridot/tools/mothership/pb" + "google.golang.org/protobuf/types/known/timestamppb" + "time" +) + +type Batch struct { + PikaTableName string `pika:"batches"` + PikaDefaultOrderBy string `pika:"-create_time"` + + Name string `db:"name"` + BatchID string `db:"batch_id"` + WorkerID string `db:"worker_id"` + CreateTime time.Time `db:"create_time" pika:"omitempty"` + UpdateTime time.Time `db:"create_time" pika:"omitempty"` + SealTime sql.NullTime `db:"create_time"` + BugtrackerURI sql.NullString `db:"bugtracker_uri"` +} + +func (b *Batch) GetID() string { + return b.Name +} + +func (b *Batch) ToPB() *mothershippb.Batch { + return &mothershippb.Batch{ + Name: b.Name, + BatchId: b.BatchID, + WorkerId: b.WorkerID, + CreateTime: timestamppb.New(b.CreateTime), + UpdateTime: timestamppb.New(b.CreateTime), + SealTime: base.SqlNullTime(b.SealTime), + BugtrackerUri: base.SqlNullString(b.BugtrackerURI), + } +} diff --git a/tools/mothership/proto/v1/batch.proto b/tools/mothership/proto/v1/batch.proto index f3485ce8..f6d7dd2b 100644 --- a/tools/mothership/proto/v1/batch.proto +++ b/tools/mothership/proto/v1/batch.proto @@ -35,16 +35,19 @@ message Batch { // Custom ID of the batch. Optional string batch_id = 2; + // Worker ID that created the batch. + string worker_id = 3; + // Output only. Timestamp when the batch was created. - google.protobuf.Timestamp create_time = 3 [(google.api.field_behavior) = OUTPUT_ONLY]; + google.protobuf.Timestamp create_time = 4 [(google.api.field_behavior) = OUTPUT_ONLY]; // Output only. Timestamp when the batch was last updated. - google.protobuf.Timestamp update_time = 4 [(google.api.field_behavior) = OUTPUT_ONLY]; + google.protobuf.Timestamp update_time = 5 [(google.api.field_behavior) = OUTPUT_ONLY]; // Output only. Timestamp when the batch was sealed. // Batches are automatically sealed after an hour of inactivity. - google.protobuf.Timestamp seal_time = 5 [(google.api.field_behavior) = OUTPUT_ONLY]; + google.protobuf.Timestamp seal_time = 6 [(google.api.field_behavior) = OUTPUT_ONLY]; // Output only. Bugtracker URI of the batch. - google.protobuf.StringValue bugtracker_uri = 6 [(google.api.field_behavior) = OUTPUT_ONLY]; + google.protobuf.StringValue bugtracker_uri = 7 [(google.api.field_behavior) = OUTPUT_ONLY]; } diff --git a/tools/mothership/rpc/batch.go b/tools/mothership/rpc/batch.go new file mode 100644 index 00000000..c83dff5e --- /dev/null +++ b/tools/mothership/rpc/batch.go @@ -0,0 +1,60 @@ +package mothership_rpc + +import ( + "context" + "go.ciq.dev/pika" + base "go.resf.org/peridot/base/go" + mothership_db "go.resf.org/peridot/tools/mothership/db" + mothershippb "go.resf.org/peridot/tools/mothership/pb" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func (s *Server) GetBatch(_ context.Context, req *mothershippb.GetBatchRequest) (*mothershippb.Batch, error) { + batch, err := base.Q[mothership_db.Batch](s.db).F("name", req.Name).GetOrNil() + if err != nil { + base.LogErrorf("failed to get batch: %v", err) + return nil, status.Error(codes.Internal, "failed to get batch") + } + + if batch == nil { + return nil, status.Error(codes.NotFound, "batch not found") + } + + return batch.ToPB(), nil +} + +func (s *Server) ListBatches(_ context.Context, req *mothershippb.ListBatchesRequest) (*mothershippb.ListBatchesResponse, error) { + aipOptions := pika.ProtoReflect(&mothershippb.Batch{}) + + page, nt, err := base.Q[mothership_db.Batch](s.db).GetPage(req, aipOptions) + if err != nil { + base.LogErrorf("failed to get batch page: %v", err) + return nil, status.Error(codes.Internal, "failed to get batch page") + } + + return &mothershippb.ListBatchesResponse{ + Batches: base.SliceToPB[*mothershippb.Batch, *mothership_db.Batch](page), + NextPageToken: nt, + }, nil +} + +func (s *Server) CreateBatch(ctx context.Context, req *mothershippb.CreateBatchRequest) (*mothershippb.Batch, error) { + worker, err := s.getWorkerIdentity(ctx) + if err != nil { + return nil, err + } + + batch := &mothership_db.Batch{ + Name: base.NameGen("batches"), + BatchID: req.BatchId, + WorkerID: worker.WorkerID, + } + + if err := base.Q[mothership_db.Batch](s.db).Create(batch); err != nil { + base.LogErrorf("failed to create batch: %v", err) + return nil, status.Error(codes.Internal, "failed to create batch") + } + + return batch.ToPB(), nil +} diff --git a/tools/mothership/rpc/operation.go b/tools/mothership/rpc/operation.go new file mode 100644 index 00000000..9b595f1b --- /dev/null +++ b/tools/mothership/rpc/operation.go @@ -0,0 +1,129 @@ +package mothership_rpc + +import ( + "context" + base "go.resf.org/peridot/base/go" + mothershippb "go.resf.org/peridot/tools/mothership/pb" + v11 "go.temporal.io/api/enums/v1" + "go.temporal.io/api/serviceerror" + "go.temporal.io/api/workflowservice/v1" + "google.golang.org/genproto/googleapis/longrunning" + rpccode "google.golang.org/genproto/googleapis/rpc/code" + rpcstatus "google.golang.org/genproto/googleapis/rpc/status" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/timestamppb" +) + +func (s *Server) describeWorkflowToOperation(ctx context.Context, res *workflowservice.DescribeWorkflowExecutionResponse) (*longrunning.Operation, error) { + if res.WorkflowExecutionInfo == nil { + return nil, status.Error(codes.NotFound, "workflow not found") + } + if res.WorkflowExecutionInfo.Execution == nil { + return nil, status.Error(codes.NotFound, "workflow not found") + } + + op := &longrunning.Operation{ + Name: res.WorkflowExecutionInfo.Execution.WorkflowId, + } + + // If the workflow is not running, we can mark the operation as done + if res.WorkflowExecutionInfo.Status != v11.WORKFLOW_EXECUTION_STATUS_RUNNING { + op.Done = true + } + + // Add metadata + rpmMetadata := &mothershippb.ProcessRPMMetadata{ + StartTime: nil, + EndTime: nil, + } + st := res.WorkflowExecutionInfo.GetStartTime() + if st != nil { + rpmMetadata.StartTime = timestamppb.New(*st) + } + + et := res.WorkflowExecutionInfo.GetCloseTime() + if et != nil { + rpmMetadata.EndTime = timestamppb.New(*et) + } + + rpmMetadataAny, err := anypb.New(rpmMetadata) + if err != nil { + return op, nil + } + op.Metadata = rpmMetadataAny + + // If completed, add result + // If failed, add error + if res.WorkflowExecutionInfo.Status == v11.WORKFLOW_EXECUTION_STATUS_COMPLETED { + // Complete, we need to get the result using GetWorkflow + run := s.temporal.GetWorkflow(ctx, op.Name, "") + + var res mothershippb.ProcessRPMResponse + if err := run.Get(ctx, &res); err != nil { + return nil, err + } + + resAny, err := anypb.New(&res) + if err != nil { + return nil, err + } + op.Result = &longrunning.Operation_Response{Response: resAny} + } else if res.WorkflowExecutionInfo.Status == v11.WORKFLOW_EXECUTION_STATUS_FAILED { + // Failed, we need to get the error using GetWorkflow + run := s.temporal.GetWorkflow(ctx, op.Name, "") + err := run.Get(ctx, nil) + // No error so return with a generic error + if err == nil { + op.Result = &longrunning.Operation_Error{ + Error: &rpcstatus.Status{ + Code: int32(rpccode.Code_INTERNAL), + Message: "workflow failed", + }, + } + return op, nil + } + + // Error, so return with the error + op.Result = &longrunning.Operation_Error{ + Error: &rpcstatus.Status{ + Code: int32(rpccode.Code_FAILED_PRECONDITION), + Message: err.Error(), + }, + } + } else if res.WorkflowExecutionInfo.Status == v11.WORKFLOW_EXECUTION_STATUS_CANCELED { + // Error, so return with the error + op.Result = &longrunning.Operation_Error{ + Error: &rpcstatus.Status{ + Code: int32(rpccode.Code_CANCELLED), + Message: "workflow canceled", + }, + } + } + + return op, nil +} + +func (s *Server) getOperation(ctx context.Context, name string) (*longrunning.Operation, error) { + res, err := s.temporal.DescribeWorkflowExecution(ctx, name, "") + if err != nil { + if _, ok := err.(*serviceerror.NotFound); ok { + return nil, status.Error(codes.NotFound, "workflow not found") + } + + // Log error, but user doesn't need to know about it + base.LogErrorf("failed to describe workflow: %v", err) + return &longrunning.Operation{ + Name: name, + }, nil + } + + return s.describeWorkflowToOperation(ctx, res) +} + +func (s *Server) GetOperation(ctx context.Context, req *longrunning.GetOperationRequest) (*longrunning.Operation, error) { + // Get from Temporal. We don't care about long term storage, so we don't + // need to store the operation in the database. + return s.getOperation(ctx, req.Name) +} diff --git a/tools/mothership/rpc/ping.go b/tools/mothership/rpc/ping.go new file mode 100644 index 00000000..ece28e09 --- /dev/null +++ b/tools/mothership/rpc/ping.go @@ -0,0 +1,29 @@ +package mothership_rpc + +import ( + "context" + "database/sql" + base "go.resf.org/peridot/base/go" + mothership_db "go.resf.org/peridot/tools/mothership/db" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/emptypb" + "time" +) + +func (s *Server) WorkerPing(ctx context.Context, req *emptypb.Empty) (*emptypb.Empty, error) { + worker, err := s.getWorkerIdentity(ctx) + if err != nil { + return nil, err + } + + worker.LastCheckinTime = sql.NullTime{ + Time: time.Now(), + Valid: true, + } + if err := base.Q[mothership_db.Worker](s.db).U(worker); err != nil { + return nil, status.Error(codes.Internal, "failed to update worker") + } + + return &emptypb.Empty{}, nil +}