mirror of
https://github.com/peridotbuild/peridot.git
synced 2024-10-08 08:54:12 +00:00
Misc changes
This commit is contained in:
parent
fab39c3021
commit
a77e224e85
@ -24,15 +24,18 @@ import Typography from '@mui/material/Typography';
|
||||
import Button from '@mui/material/Button';
|
||||
import { Theme } from '@mui/material/styles';
|
||||
|
||||
import DashboardIcon from '@mui/icons-material/Dashboard';
|
||||
import EngineeringIcon from '@mui/icons-material/Engineering';
|
||||
import ListAltIcon from '@mui/icons-material/ListAlt';
|
||||
import ImportExportIcon from '@mui/icons-material/ImportExport';
|
||||
|
||||
import { Workers } from './Workers';
|
||||
import { Drawer } from 'base/ts/mui/Drawer';
|
||||
|
||||
import { CreateWorker } from 'tools/mothership/admin/ui/CreateWorker';
|
||||
import { GetWorker } from 'tools/mothership/admin/ui/GetWorker';
|
||||
|
||||
import { Entries } from './Entries';
|
||||
import { GetEntry } from './GetEntry';
|
||||
import { Workers } from './Workers';
|
||||
|
||||
export const App = () => {
|
||||
return (
|
||||
<Box sx={{ display: 'flex' }}>
|
||||
@ -60,6 +63,7 @@ export const App = () => {
|
||||
{
|
||||
links: [
|
||||
{ text: 'Workers', href: '/workers', icon: <EngineeringIcon /> },
|
||||
{ text: 'Entries', href: '/entries', icon: <ImportExportIcon /> },
|
||||
],
|
||||
},
|
||||
]}
|
||||
@ -73,6 +77,10 @@ export const App = () => {
|
||||
<Route path="create" element={<CreateWorker />} />
|
||||
<Route path=":name" element={<GetWorker />} />
|
||||
</Route>
|
||||
<Route path="/entries">
|
||||
<Route index element={<Entries />} />
|
||||
<Route path=":name" element={<GetEntry />} />
|
||||
</Route>
|
||||
</Routes>
|
||||
</Box>
|
||||
</Box>
|
||||
|
@ -15,6 +15,13 @@
|
||||
*/
|
||||
|
||||
import * as mshipAdmin from 'bazel-bin/tools/mothership/proto/admin/v1/mshipadminpb_ts_proto_gen';
|
||||
import * as srpmArchiver from 'bazel-bin/tools/mothership/proto/v1/mothershippb_ts_proto_gen';
|
||||
|
||||
const archiverCfg = new srpmArchiver.Configuration({
|
||||
basePath: '/api',
|
||||
})
|
||||
|
||||
export const srpmArchiverApi = new srpmArchiver.SrpmArchiverApi(archiverCfg);
|
||||
|
||||
const cfg = new mshipAdmin.Configuration({
|
||||
basePath: '/admin/api',
|
||||
|
52
tools/mothership/db/batch.go
Normal file
52
tools/mothership/db/batch.go
Normal file
@ -0,0 +1,52 @@
|
||||
// Copyright 2023 Peridot Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package mothership_db
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
base "go.resf.org/peridot/base/go"
|
||||
mothershippb "go.resf.org/peridot/tools/mothership/pb"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Batch struct {
|
||||
PikaTableName string `pika:"batches"`
|
||||
PikaDefaultOrderBy string `pika:"-create_time"`
|
||||
|
||||
Name string `db:"name"`
|
||||
BatchID string `db:"batch_id"`
|
||||
WorkerID string `db:"worker_id"`
|
||||
CreateTime time.Time `db:"create_time" pika:"omitempty"`
|
||||
UpdateTime time.Time `db:"create_time" pika:"omitempty"`
|
||||
SealTime sql.NullTime `db:"create_time"`
|
||||
BugtrackerURI sql.NullString `db:"bugtracker_uri"`
|
||||
}
|
||||
|
||||
func (b *Batch) GetID() string {
|
||||
return b.Name
|
||||
}
|
||||
|
||||
func (b *Batch) ToPB() *mothershippb.Batch {
|
||||
return &mothershippb.Batch{
|
||||
Name: b.Name,
|
||||
BatchId: b.BatchID,
|
||||
WorkerId: b.WorkerID,
|
||||
CreateTime: timestamppb.New(b.CreateTime),
|
||||
UpdateTime: timestamppb.New(b.CreateTime),
|
||||
SealTime: base.SqlNullTime(b.SealTime),
|
||||
BugtrackerUri: base.SqlNullString(b.BugtrackerURI),
|
||||
}
|
||||
}
|
@ -35,16 +35,19 @@ message Batch {
|
||||
// Custom ID of the batch. Optional
|
||||
string batch_id = 2;
|
||||
|
||||
// Worker ID that created the batch.
|
||||
string worker_id = 3;
|
||||
|
||||
// Output only. Timestamp when the batch was created.
|
||||
google.protobuf.Timestamp create_time = 3 [(google.api.field_behavior) = OUTPUT_ONLY];
|
||||
google.protobuf.Timestamp create_time = 4 [(google.api.field_behavior) = OUTPUT_ONLY];
|
||||
|
||||
// Output only. Timestamp when the batch was last updated.
|
||||
google.protobuf.Timestamp update_time = 4 [(google.api.field_behavior) = OUTPUT_ONLY];
|
||||
google.protobuf.Timestamp update_time = 5 [(google.api.field_behavior) = OUTPUT_ONLY];
|
||||
|
||||
// Output only. Timestamp when the batch was sealed.
|
||||
// Batches are automatically sealed after an hour of inactivity.
|
||||
google.protobuf.Timestamp seal_time = 5 [(google.api.field_behavior) = OUTPUT_ONLY];
|
||||
google.protobuf.Timestamp seal_time = 6 [(google.api.field_behavior) = OUTPUT_ONLY];
|
||||
|
||||
// Output only. Bugtracker URI of the batch.
|
||||
google.protobuf.StringValue bugtracker_uri = 6 [(google.api.field_behavior) = OUTPUT_ONLY];
|
||||
google.protobuf.StringValue bugtracker_uri = 7 [(google.api.field_behavior) = OUTPUT_ONLY];
|
||||
}
|
||||
|
60
tools/mothership/rpc/batch.go
Normal file
60
tools/mothership/rpc/batch.go
Normal file
@ -0,0 +1,60 @@
|
||||
package mothership_rpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"go.ciq.dev/pika"
|
||||
base "go.resf.org/peridot/base/go"
|
||||
mothership_db "go.resf.org/peridot/tools/mothership/db"
|
||||
mothershippb "go.resf.org/peridot/tools/mothership/pb"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
func (s *Server) GetBatch(_ context.Context, req *mothershippb.GetBatchRequest) (*mothershippb.Batch, error) {
|
||||
batch, err := base.Q[mothership_db.Batch](s.db).F("name", req.Name).GetOrNil()
|
||||
if err != nil {
|
||||
base.LogErrorf("failed to get batch: %v", err)
|
||||
return nil, status.Error(codes.Internal, "failed to get batch")
|
||||
}
|
||||
|
||||
if batch == nil {
|
||||
return nil, status.Error(codes.NotFound, "batch not found")
|
||||
}
|
||||
|
||||
return batch.ToPB(), nil
|
||||
}
|
||||
|
||||
func (s *Server) ListBatches(_ context.Context, req *mothershippb.ListBatchesRequest) (*mothershippb.ListBatchesResponse, error) {
|
||||
aipOptions := pika.ProtoReflect(&mothershippb.Batch{})
|
||||
|
||||
page, nt, err := base.Q[mothership_db.Batch](s.db).GetPage(req, aipOptions)
|
||||
if err != nil {
|
||||
base.LogErrorf("failed to get batch page: %v", err)
|
||||
return nil, status.Error(codes.Internal, "failed to get batch page")
|
||||
}
|
||||
|
||||
return &mothershippb.ListBatchesResponse{
|
||||
Batches: base.SliceToPB[*mothershippb.Batch, *mothership_db.Batch](page),
|
||||
NextPageToken: nt,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *Server) CreateBatch(ctx context.Context, req *mothershippb.CreateBatchRequest) (*mothershippb.Batch, error) {
|
||||
worker, err := s.getWorkerIdentity(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
batch := &mothership_db.Batch{
|
||||
Name: base.NameGen("batches"),
|
||||
BatchID: req.BatchId,
|
||||
WorkerID: worker.WorkerID,
|
||||
}
|
||||
|
||||
if err := base.Q[mothership_db.Batch](s.db).Create(batch); err != nil {
|
||||
base.LogErrorf("failed to create batch: %v", err)
|
||||
return nil, status.Error(codes.Internal, "failed to create batch")
|
||||
}
|
||||
|
||||
return batch.ToPB(), nil
|
||||
}
|
129
tools/mothership/rpc/operation.go
Normal file
129
tools/mothership/rpc/operation.go
Normal file
@ -0,0 +1,129 @@
|
||||
package mothership_rpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
base "go.resf.org/peridot/base/go"
|
||||
mothershippb "go.resf.org/peridot/tools/mothership/pb"
|
||||
v11 "go.temporal.io/api/enums/v1"
|
||||
"go.temporal.io/api/serviceerror"
|
||||
"go.temporal.io/api/workflowservice/v1"
|
||||
"google.golang.org/genproto/googleapis/longrunning"
|
||||
rpccode "google.golang.org/genproto/googleapis/rpc/code"
|
||||
rpcstatus "google.golang.org/genproto/googleapis/rpc/status"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
"google.golang.org/protobuf/types/known/anypb"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
)
|
||||
|
||||
func (s *Server) describeWorkflowToOperation(ctx context.Context, res *workflowservice.DescribeWorkflowExecutionResponse) (*longrunning.Operation, error) {
|
||||
if res.WorkflowExecutionInfo == nil {
|
||||
return nil, status.Error(codes.NotFound, "workflow not found")
|
||||
}
|
||||
if res.WorkflowExecutionInfo.Execution == nil {
|
||||
return nil, status.Error(codes.NotFound, "workflow not found")
|
||||
}
|
||||
|
||||
op := &longrunning.Operation{
|
||||
Name: res.WorkflowExecutionInfo.Execution.WorkflowId,
|
||||
}
|
||||
|
||||
// If the workflow is not running, we can mark the operation as done
|
||||
if res.WorkflowExecutionInfo.Status != v11.WORKFLOW_EXECUTION_STATUS_RUNNING {
|
||||
op.Done = true
|
||||
}
|
||||
|
||||
// Add metadata
|
||||
rpmMetadata := &mothershippb.ProcessRPMMetadata{
|
||||
StartTime: nil,
|
||||
EndTime: nil,
|
||||
}
|
||||
st := res.WorkflowExecutionInfo.GetStartTime()
|
||||
if st != nil {
|
||||
rpmMetadata.StartTime = timestamppb.New(*st)
|
||||
}
|
||||
|
||||
et := res.WorkflowExecutionInfo.GetCloseTime()
|
||||
if et != nil {
|
||||
rpmMetadata.EndTime = timestamppb.New(*et)
|
||||
}
|
||||
|
||||
rpmMetadataAny, err := anypb.New(rpmMetadata)
|
||||
if err != nil {
|
||||
return op, nil
|
||||
}
|
||||
op.Metadata = rpmMetadataAny
|
||||
|
||||
// If completed, add result
|
||||
// If failed, add error
|
||||
if res.WorkflowExecutionInfo.Status == v11.WORKFLOW_EXECUTION_STATUS_COMPLETED {
|
||||
// Complete, we need to get the result using GetWorkflow
|
||||
run := s.temporal.GetWorkflow(ctx, op.Name, "")
|
||||
|
||||
var res mothershippb.ProcessRPMResponse
|
||||
if err := run.Get(ctx, &res); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resAny, err := anypb.New(&res)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
op.Result = &longrunning.Operation_Response{Response: resAny}
|
||||
} else if res.WorkflowExecutionInfo.Status == v11.WORKFLOW_EXECUTION_STATUS_FAILED {
|
||||
// Failed, we need to get the error using GetWorkflow
|
||||
run := s.temporal.GetWorkflow(ctx, op.Name, "")
|
||||
err := run.Get(ctx, nil)
|
||||
// No error so return with a generic error
|
||||
if err == nil {
|
||||
op.Result = &longrunning.Operation_Error{
|
||||
Error: &rpcstatus.Status{
|
||||
Code: int32(rpccode.Code_INTERNAL),
|
||||
Message: "workflow failed",
|
||||
},
|
||||
}
|
||||
return op, nil
|
||||
}
|
||||
|
||||
// Error, so return with the error
|
||||
op.Result = &longrunning.Operation_Error{
|
||||
Error: &rpcstatus.Status{
|
||||
Code: int32(rpccode.Code_FAILED_PRECONDITION),
|
||||
Message: err.Error(),
|
||||
},
|
||||
}
|
||||
} else if res.WorkflowExecutionInfo.Status == v11.WORKFLOW_EXECUTION_STATUS_CANCELED {
|
||||
// Error, so return with the error
|
||||
op.Result = &longrunning.Operation_Error{
|
||||
Error: &rpcstatus.Status{
|
||||
Code: int32(rpccode.Code_CANCELLED),
|
||||
Message: "workflow canceled",
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
return op, nil
|
||||
}
|
||||
|
||||
func (s *Server) getOperation(ctx context.Context, name string) (*longrunning.Operation, error) {
|
||||
res, err := s.temporal.DescribeWorkflowExecution(ctx, name, "")
|
||||
if err != nil {
|
||||
if _, ok := err.(*serviceerror.NotFound); ok {
|
||||
return nil, status.Error(codes.NotFound, "workflow not found")
|
||||
}
|
||||
|
||||
// Log error, but user doesn't need to know about it
|
||||
base.LogErrorf("failed to describe workflow: %v", err)
|
||||
return &longrunning.Operation{
|
||||
Name: name,
|
||||
}, nil
|
||||
}
|
||||
|
||||
return s.describeWorkflowToOperation(ctx, res)
|
||||
}
|
||||
|
||||
func (s *Server) GetOperation(ctx context.Context, req *longrunning.GetOperationRequest) (*longrunning.Operation, error) {
|
||||
// Get from Temporal. We don't care about long term storage, so we don't
|
||||
// need to store the operation in the database.
|
||||
return s.getOperation(ctx, req.Name)
|
||||
}
|
29
tools/mothership/rpc/ping.go
Normal file
29
tools/mothership/rpc/ping.go
Normal file
@ -0,0 +1,29 @@
|
||||
package mothership_rpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
base "go.resf.org/peridot/base/go"
|
||||
mothership_db "go.resf.org/peridot/tools/mothership/db"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
"google.golang.org/protobuf/types/known/emptypb"
|
||||
"time"
|
||||
)
|
||||
|
||||
func (s *Server) WorkerPing(ctx context.Context, req *emptypb.Empty) (*emptypb.Empty, error) {
|
||||
worker, err := s.getWorkerIdentity(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
worker.LastCheckinTime = sql.NullTime{
|
||||
Time: time.Now(),
|
||||
Valid: true,
|
||||
}
|
||||
if err := base.Q[mothership_db.Worker](s.db).U(worker); err != nil {
|
||||
return nil, status.Error(codes.Internal, "failed to update worker")
|
||||
}
|
||||
|
||||
return &emptypb.Empty{}, nil
|
||||
}
|
Loading…
Reference in New Issue
Block a user