Rpm-import is now simpler

This commit is contained in:
Mustafa Gezen 2022-11-04 03:26:42 +01:00
parent 8d2116dfeb
commit 0eeb8395e4
Signed by untrusted user who does not match committer: mustafa
GPG Key ID: DCDF010D946438C1
1 changed files with 53 additions and 74 deletions

View File

@ -179,26 +179,8 @@ func (c *Controller) RpmLookasideBatchImportWorkflow(ctx workflow.Context, req *
defer cleanupWorker() defer cleanupWorker()
taskID := task.ID.String() taskID := task.ID.String()
var importResults []*RpmImportActivityTaskStage1 var stage1 *RpmImportActivityTaskStage1
var taskIDs []string
taskIDBuildMap := map[string]*RpmImportActivityTaskStage1{}
for _, blob := range req.LookasideBlobs { for _, blob := range req.LookasideBlobs {
var archTask models.Task
archTaskEffect := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
newTask, err := c.db.CreateTask(nil, "noarch", peridotpb.TaskType_TASK_TYPE_RPM_IMPORT, &req.ProjectId, &taskID)
if err != nil {
return &models.Task{}
}
_ = c.db.SetTaskStatus(newTask.ID.String(), peridotpb.TaskStatus_TASK_STATUS_RUNNING)
return newTask
})
err := archTaskEffect.Get(&archTask)
if err != nil || !archTask.ProjectId.Valid {
return nil, fmt.Errorf("failed to create rpm task: %s", err)
}
taskIDs = append(taskIDs, archTask.ID.String())
var importRes RpmImportActivityTaskStage1 var importRes RpmImportActivityTaskStage1
importCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ importCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: time.Hour, StartToCloseTimeout: time.Hour,
@ -213,42 +195,40 @@ func (c *Controller) RpmLookasideBatchImportWorkflow(ctx workflow.Context, req *
Rpms: blob, Rpms: blob,
ForceOverride: req.ForceOverride, ForceOverride: req.ForceOverride,
} }
err = workflow.ExecuteActivity(importCtx, c.RpmImportActivity, blobReq, archTask.ID.String(), true).Get(ctx, &importRes) err = workflow.ExecuteActivity(importCtx, c.RpmImportActivity, blobReq, task.ID.String(), true, stage1).Get(ctx, &importRes)
if err != nil { if err != nil {
setActivityError(errorDetails, err) setActivityError(errorDetails, err)
return nil, err return nil, err
} }
importResults = append(importResults, &importRes) if stage1 == nil {
taskIDBuildMap[archTask.ID.String()] = &importRes stage1 = &importRes
}
} }
var res []*RpmImportUploadWrapper var res []*RpmImportUploadWrapper
for _, importTaskID := range taskIDs { uploadArchCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
uploadArchCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ ScheduleToStartTimeout: 12 * time.Hour,
ScheduleToStartTimeout: 12 * time.Hour, StartToCloseTimeout: 24 * time.Hour,
StartToCloseTimeout: 24 * time.Hour, HeartbeatTimeout: 2 * time.Minute,
HeartbeatTimeout: 2 * time.Minute, TaskQueue: importTaskQueue,
TaskQueue: importTaskQueue, })
})
var interimRes []*UploadActivityResult var interimRes []*UploadActivityResult
err = workflow.ExecuteActivity(uploadArchCtx, c.UploadArchActivity, req.ProjectId, importTaskID).Get(ctx, &interimRes) err = workflow.ExecuteActivity(uploadArchCtx, c.UploadArchActivity, req.ProjectId, task.ID.String()).Get(ctx, &interimRes)
if err != nil { if err != nil {
setActivityError(errorDetails, err) setActivityError(errorDetails, err)
return nil, err return nil, err
} }
for _, ires := range interimRes { for _, ires := range interimRes {
res = append(res, &RpmImportUploadWrapper{ res = append(res, &RpmImportUploadWrapper{
Upload: ires, Upload: ires,
TaskID: importTaskID, TaskID: task.ID.String(),
}) })
}
} }
for _, result := range res { for _, result := range res {
stage1 := taskIDBuildMap[result.TaskID] if result.Upload.Skip {
if stage1 == nil { continue
return nil, fmt.Errorf("failed to find task %s", result.TaskID)
} }
err = c.db.AttachTaskToBuild(stage1.Build.ID.String(), result.Upload.Subtask.ID.String()) err = c.db.AttachTaskToBuild(stage1.Build.ID.String(), result.Upload.Subtask.ID.String())
if err != nil { if err != nil {
@ -256,9 +236,6 @@ func (c *Controller) RpmLookasideBatchImportWorkflow(ctx workflow.Context, req *
setInternalError(errorDetails, err) setInternalError(errorDetails, err)
return nil, err return nil, err
} }
if result.Upload.Skip {
continue
}
} }
yumrepoCtx := workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{ yumrepoCtx := workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{
@ -266,14 +243,11 @@ func (c *Controller) RpmLookasideBatchImportWorkflow(ctx workflow.Context, req *
}) })
updateRepoRequest := &UpdateRepoRequest{ updateRepoRequest := &UpdateRepoRequest{
ProjectID: req.ProjectId, ProjectID: req.ProjectId,
BuildIDs: []string{}, BuildIDs: []string{stage1.Build.ID.String()},
Delete: false, Delete: false,
TaskID: &taskID, TaskID: &taskID,
NoDeletePrevious: true, NoDeletePrevious: true,
} }
for _, importRes := range importResults {
updateRepoRequest.BuildIDs = append(updateRepoRequest.BuildIDs, importRes.Build.ID.String())
}
updateRepoTask := &yumrepofspb.UpdateRepoTask{} updateRepoTask := &yumrepofspb.UpdateRepoTask{}
err = workflow.ExecuteChildWorkflow(yumrepoCtx, c.RepoUpdaterWorkflow, updateRepoRequest).Get(yumrepoCtx, updateRepoTask) err = workflow.ExecuteChildWorkflow(yumrepoCtx, c.RepoUpdaterWorkflow, updateRepoRequest).Get(yumrepoCtx, updateRepoTask)
if err != nil { if err != nil {
@ -287,7 +261,7 @@ func (c *Controller) RpmLookasideBatchImportWorkflow(ctx workflow.Context, req *
return &ret, nil return &ret, nil
} }
func (c *Controller) RpmImportActivity(ctx context.Context, req *peridotpb.RpmImportRequest, taskID string, setTaskStatus bool) (*RpmImportActivityTaskStage1, error) { func (c *Controller) RpmImportActivity(ctx context.Context, req *peridotpb.RpmImportRequest, taskID string, setTaskStatus bool, stage1 *RpmImportActivityTaskStage1) (*RpmImportActivityTaskStage1, error) {
go func() { go func() {
for { for {
activity.RecordHeartbeat(ctx) activity.RecordHeartbeat(ctx)
@ -425,33 +399,38 @@ func (c *Controller) RpmImportActivity(ctx context.Context, req *peridotpb.RpmIm
return nil, status.Error(codes.Internal, "could not set task metadata") return nil, status.Error(codes.Internal, "could not set task metadata")
} }
var packageVersionId string var build *models.Build
packageVersionId, err = tx.GetPackageVersionId(pkg.ID.String(), nvrMatch[2], nvrMatch[3]) if stage1 == nil {
if err != nil { var packageVersionId string
if err == sql.ErrNoRows { packageVersionId, err = tx.GetPackageVersionId(pkg.ID.String(), nvrMatch[2], nvrMatch[3])
packageVersionId, err = tx.CreatePackageVersion(pkg.ID.String(), nvrMatch[2], nvrMatch[3]) if err != nil {
if err != nil { if err == sql.ErrNoRows {
err = status.Errorf(codes.Internal, "could not create package version: %v", err) packageVersionId, err = tx.CreatePackageVersion(pkg.ID.String(), nvrMatch[2], nvrMatch[3])
if err != nil {
err = status.Errorf(codes.Internal, "could not create package version: %v", err)
return nil, err
}
} else {
err = status.Errorf(codes.Internal, "could not get package version id: %v", err)
return nil, err return nil, err
} }
} else { }
err = status.Errorf(codes.Internal, "could not get package version id: %v", err)
// todo(mustafa): Add published check, as well as limitations for overriding existing versions
// TODO URGENT: Don't allow nondeterministic behavior regarding versions
err = tx.AttachPackageVersion(req.ProjectId, pkg.ID.String(), packageVersionId, false)
if err != nil {
err = status.Errorf(codes.Internal, "could not attach package version: %v", err)
return nil, err return nil, err
} }
}
// todo(mustafa): Add published check, as well as limitations for overriding existing versions build, err = tx.CreateBuild(pkg.ID.String(), packageVersionId, taskID, req.ProjectId)
// TODO URGENT: Don't allow nondeterministic behavior regarding versions if err != nil {
err = tx.AttachPackageVersion(req.ProjectId, pkg.ID.String(), packageVersionId, false) err = status.Errorf(codes.Internal, "could not create build")
if err != nil { return nil, err
err = status.Errorf(codes.Internal, "could not attach package version: %v", err) }
return nil, err } else {
} build = stage1.Build
build, err := tx.CreateBuild(pkg.ID.String(), packageVersionId, taskID, req.ProjectId)
if err != nil {
err = status.Errorf(codes.Internal, "could not create build")
return nil, err
} }
targetDir := filepath.Join(rpmbuild.GetCloneDirectory(), "RPMS") targetDir := filepath.Join(rpmbuild.GetCloneDirectory(), "RPMS")