diff --git a/peridot/builder/v1/workflow/rpmimport.go b/peridot/builder/v1/workflow/rpmimport.go index adc37d9..e93c965 100644 --- a/peridot/builder/v1/workflow/rpmimport.go +++ b/peridot/builder/v1/workflow/rpmimport.go @@ -179,26 +179,8 @@ func (c *Controller) RpmLookasideBatchImportWorkflow(ctx workflow.Context, req * defer cleanupWorker() taskID := task.ID.String() - var importResults []*RpmImportActivityTaskStage1 - var taskIDs []string - taskIDBuildMap := map[string]*RpmImportActivityTaskStage1{} + var stage1 *RpmImportActivityTaskStage1 for _, blob := range req.LookasideBlobs { - var archTask models.Task - archTaskEffect := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} { - newTask, err := c.db.CreateTask(nil, "noarch", peridotpb.TaskType_TASK_TYPE_RPM_IMPORT, &req.ProjectId, &taskID) - if err != nil { - return &models.Task{} - } - - _ = c.db.SetTaskStatus(newTask.ID.String(), peridotpb.TaskStatus_TASK_STATUS_RUNNING) - return newTask - }) - err := archTaskEffect.Get(&archTask) - if err != nil || !archTask.ProjectId.Valid { - return nil, fmt.Errorf("failed to create rpm task: %s", err) - } - taskIDs = append(taskIDs, archTask.ID.String()) - var importRes RpmImportActivityTaskStage1 importCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: time.Hour, @@ -213,42 +195,40 @@ func (c *Controller) RpmLookasideBatchImportWorkflow(ctx workflow.Context, req * Rpms: blob, ForceOverride: req.ForceOverride, } - err = workflow.ExecuteActivity(importCtx, c.RpmImportActivity, blobReq, archTask.ID.String(), true).Get(ctx, &importRes) + err = workflow.ExecuteActivity(importCtx, c.RpmImportActivity, blobReq, task.ID.String(), true, stage1).Get(ctx, &importRes) if err != nil { setActivityError(errorDetails, err) return nil, err } - importResults = append(importResults, &importRes) - taskIDBuildMap[archTask.ID.String()] = &importRes + if stage1 == nil { + stage1 = &importRes + } } var res []*RpmImportUploadWrapper - for _, importTaskID := range taskIDs { - uploadArchCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ - ScheduleToStartTimeout: 12 * time.Hour, - StartToCloseTimeout: 24 * time.Hour, - HeartbeatTimeout: 2 * time.Minute, - TaskQueue: importTaskQueue, - }) + uploadArchCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + ScheduleToStartTimeout: 12 * time.Hour, + StartToCloseTimeout: 24 * time.Hour, + HeartbeatTimeout: 2 * time.Minute, + TaskQueue: importTaskQueue, + }) - var interimRes []*UploadActivityResult - err = workflow.ExecuteActivity(uploadArchCtx, c.UploadArchActivity, req.ProjectId, importTaskID).Get(ctx, &interimRes) - if err != nil { - setActivityError(errorDetails, err) - return nil, err - } - for _, ires := range interimRes { - res = append(res, &RpmImportUploadWrapper{ - Upload: ires, - TaskID: importTaskID, - }) - } + var interimRes []*UploadActivityResult + err = workflow.ExecuteActivity(uploadArchCtx, c.UploadArchActivity, req.ProjectId, task.ID.String()).Get(ctx, &interimRes) + if err != nil { + setActivityError(errorDetails, err) + return nil, err + } + for _, ires := range interimRes { + res = append(res, &RpmImportUploadWrapper{ + Upload: ires, + TaskID: task.ID.String(), + }) } for _, result := range res { - stage1 := taskIDBuildMap[result.TaskID] - if stage1 == nil { - return nil, fmt.Errorf("failed to find task %s", result.TaskID) + if result.Upload.Skip { + continue } err = c.db.AttachTaskToBuild(stage1.Build.ID.String(), result.Upload.Subtask.ID.String()) if err != nil { @@ -256,9 +236,6 @@ func (c *Controller) RpmLookasideBatchImportWorkflow(ctx workflow.Context, req * setInternalError(errorDetails, err) return nil, err } - if result.Upload.Skip { - continue - } } yumrepoCtx := workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{ @@ -266,14 +243,11 @@ func (c *Controller) RpmLookasideBatchImportWorkflow(ctx workflow.Context, req * }) updateRepoRequest := &UpdateRepoRequest{ ProjectID: req.ProjectId, - BuildIDs: []string{}, + BuildIDs: []string{stage1.Build.ID.String()}, Delete: false, TaskID: &taskID, NoDeletePrevious: true, } - for _, importRes := range importResults { - updateRepoRequest.BuildIDs = append(updateRepoRequest.BuildIDs, importRes.Build.ID.String()) - } updateRepoTask := &yumrepofspb.UpdateRepoTask{} err = workflow.ExecuteChildWorkflow(yumrepoCtx, c.RepoUpdaterWorkflow, updateRepoRequest).Get(yumrepoCtx, updateRepoTask) if err != nil { @@ -287,7 +261,7 @@ func (c *Controller) RpmLookasideBatchImportWorkflow(ctx workflow.Context, req * return &ret, nil } -func (c *Controller) RpmImportActivity(ctx context.Context, req *peridotpb.RpmImportRequest, taskID string, setTaskStatus bool) (*RpmImportActivityTaskStage1, error) { +func (c *Controller) RpmImportActivity(ctx context.Context, req *peridotpb.RpmImportRequest, taskID string, setTaskStatus bool, stage1 *RpmImportActivityTaskStage1) (*RpmImportActivityTaskStage1, error) { go func() { for { activity.RecordHeartbeat(ctx) @@ -425,33 +399,38 @@ func (c *Controller) RpmImportActivity(ctx context.Context, req *peridotpb.RpmIm return nil, status.Error(codes.Internal, "could not set task metadata") } - var packageVersionId string - packageVersionId, err = tx.GetPackageVersionId(pkg.ID.String(), nvrMatch[2], nvrMatch[3]) - if err != nil { - if err == sql.ErrNoRows { - packageVersionId, err = tx.CreatePackageVersion(pkg.ID.String(), nvrMatch[2], nvrMatch[3]) - if err != nil { - err = status.Errorf(codes.Internal, "could not create package version: %v", err) + var build *models.Build + if stage1 == nil { + var packageVersionId string + packageVersionId, err = tx.GetPackageVersionId(pkg.ID.String(), nvrMatch[2], nvrMatch[3]) + if err != nil { + if err == sql.ErrNoRows { + packageVersionId, err = tx.CreatePackageVersion(pkg.ID.String(), nvrMatch[2], nvrMatch[3]) + if err != nil { + err = status.Errorf(codes.Internal, "could not create package version: %v", err) + return nil, err + } + } else { + err = status.Errorf(codes.Internal, "could not get package version id: %v", err) return nil, err } - } else { - err = status.Errorf(codes.Internal, "could not get package version id: %v", err) + } + + // todo(mustafa): Add published check, as well as limitations for overriding existing versions + // TODO URGENT: Don't allow nondeterministic behavior regarding versions + err = tx.AttachPackageVersion(req.ProjectId, pkg.ID.String(), packageVersionId, false) + if err != nil { + err = status.Errorf(codes.Internal, "could not attach package version: %v", err) return nil, err } - } - // todo(mustafa): Add published check, as well as limitations for overriding existing versions - // TODO URGENT: Don't allow nondeterministic behavior regarding versions - err = tx.AttachPackageVersion(req.ProjectId, pkg.ID.String(), packageVersionId, false) - if err != nil { - err = status.Errorf(codes.Internal, "could not attach package version: %v", err) - return nil, err - } - - build, err := tx.CreateBuild(pkg.ID.String(), packageVersionId, taskID, req.ProjectId) - if err != nil { - err = status.Errorf(codes.Internal, "could not create build") - return nil, err + build, err = tx.CreateBuild(pkg.ID.String(), packageVersionId, taskID, req.ProjectId) + if err != nil { + err = status.Errorf(codes.Internal, "could not create build") + return nil, err + } + } else { + build = stage1.Build } targetDir := filepath.Join(rpmbuild.GetCloneDirectory(), "RPMS")