mirror of
https://github.com/rocky-linux/peridot.git
synced 2024-12-18 08:58:30 +00:00
Wait for rpm import to finish in CLI
Signed-off-by: Mustafa Gezen <mustafa@ctrliq.com>
This commit is contained in:
parent
7370e487f2
commit
d3e5d9ea92
@ -8,7 +8,6 @@ go_library(
|
|||||||
"hashed_repositories.go",
|
"hashed_repositories.go",
|
||||||
"import.go",
|
"import.go",
|
||||||
"infrastructure.go",
|
"infrastructure.go",
|
||||||
"lookaside.go",
|
|
||||||
"module.go",
|
"module.go",
|
||||||
"rpmimport.go",
|
"rpmimport.go",
|
||||||
"srpm.go",
|
"srpm.go",
|
||||||
|
@ -1,128 +0,0 @@
|
|||||||
// Copyright (c) All respective contributors to the Peridot Project. All rights reserved.
|
|
||||||
// Copyright (c) 2021-2022 Rocky Enterprise Software Foundation, Inc. All rights reserved.
|
|
||||||
// Copyright (c) 2021-2022 Ctrl IQ, Inc. All rights reserved.
|
|
||||||
//
|
|
||||||
// Redistribution and use in source and binary forms, with or without
|
|
||||||
// modification, are permitted provided that the following conditions are met:
|
|
||||||
//
|
|
||||||
// 1. Redistributions of source code must retain the above copyright notice,
|
|
||||||
// this list of conditions and the following disclaimer.
|
|
||||||
//
|
|
||||||
// 2. Redistributions in binary form must reproduce the above copyright notice,
|
|
||||||
// this list of conditions and the following disclaimer in the documentation
|
|
||||||
// and/or other materials provided with the distribution.
|
|
||||||
//
|
|
||||||
// 3. Neither the name of the copyright holder nor the names of its contributors
|
|
||||||
// may be used to endorse or promote products derived from this software without
|
|
||||||
// specific prior written permission.
|
|
||||||
//
|
|
||||||
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
|
||||||
// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
|
||||||
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
|
||||||
// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
|
|
||||||
// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
|
||||||
// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
|
||||||
// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
|
||||||
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
|
||||||
// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
|
||||||
// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
|
||||||
// POSSIBILITY OF SUCH DAMAGE.
|
|
||||||
|
|
||||||
package workflow
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"crypto/sha256"
|
|
||||||
"encoding/base64"
|
|
||||||
"encoding/hex"
|
|
||||||
"go.temporal.io/sdk/activity"
|
|
||||||
"go.temporal.io/sdk/temporal"
|
|
||||||
"go.temporal.io/sdk/workflow"
|
|
||||||
"peridot.resf.org/peridot/db/models"
|
|
||||||
peridotpb "peridot.resf.org/peridot/pb"
|
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
func (c *Controller) LookasideFileUploadWorkflow(ctx workflow.Context, req *peridotpb.LookasideFileUploadRequest, task *models.Task) (*peridotpb.LookasideFileUploadTask, error) {
|
|
||||||
ret := &peridotpb.LookasideFileUploadTask{}
|
|
||||||
deferTask, errorDetails, err := c.commonCreateTask(task, ret)
|
|
||||||
defer deferTask()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
task.Status = peridotpb.TaskStatus_TASK_STATUS_FAILED
|
|
||||||
|
|
||||||
uploadTaskQueue, cleanupWorker, err := c.provisionWorker(ctx, &ProvisionWorkerRequest{
|
|
||||||
TaskId: task.ID.String(),
|
|
||||||
ParentTaskId: task.ParentTaskId,
|
|
||||||
Purpose: "lookaside",
|
|
||||||
Arch: "noarch",
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
setInternalError(errorDetails, err)
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer cleanupWorker()
|
|
||||||
|
|
||||||
uploadCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
|
|
||||||
StartToCloseTimeout: time.Hour,
|
|
||||||
HeartbeatTimeout: 20 * time.Second,
|
|
||||||
TaskQueue: uploadTaskQueue,
|
|
||||||
RetryPolicy: &temporal.RetryPolicy{
|
|
||||||
MaximumAttempts: 1,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
err = workflow.ExecuteActivity(uploadCtx, c.LookasideFileUploadActivity, req, task.ID.String()).Get(ctx, ret)
|
|
||||||
if err != nil {
|
|
||||||
setActivityError(errorDetails, err)
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
task.Status = peridotpb.TaskStatus_TASK_STATUS_SUCCEEDED
|
|
||||||
|
|
||||||
return ret, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Controller) LookasideFileUploadActivity(ctx context.Context, req *peridotpb.LookasideFileUploadRequest, taskID string) (*peridotpb.LookasideFileUploadTask, error) {
|
|
||||||
go func() {
|
|
||||||
for {
|
|
||||||
activity.RecordHeartbeat(ctx)
|
|
||||||
time.Sleep(4 * time.Second)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
base64DecodedFile, err := base64.StdEncoding.DecodeString(req.File)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
hasher := sha256.New()
|
|
||||||
_, err = hasher.Write(base64DecodedFile)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
sha256Sum := hex.EncodeToString(hasher.Sum(nil))
|
|
||||||
|
|
||||||
exists, err := c.storage.Exists(sha256Sum)
|
|
||||||
if err != nil {
|
|
||||||
if !strings.Contains(err.Error(), "NotFound") {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if exists {
|
|
||||||
return &peridotpb.LookasideFileUploadTask{
|
|
||||||
Digest: sha256Sum,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = c.storage.PutObjectBytes(sha256Sum, base64DecodedFile)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return &peridotpb.LookasideFileUploadTask{
|
|
||||||
Digest: sha256Sum,
|
|
||||||
}, nil
|
|
||||||
}
|
|
@ -18,7 +18,6 @@ go_library(
|
|||||||
importpath = "peridot.resf.org/peridot/cmd/v1/peridot",
|
importpath = "peridot.resf.org/peridot/cmd/v1/peridot",
|
||||||
visibility = ["//visibility:private"],
|
visibility = ["//visibility:private"],
|
||||||
deps = [
|
deps = [
|
||||||
"//utils",
|
|
||||||
"//vendor/github.com/sirupsen/logrus",
|
"//vendor/github.com/sirupsen/logrus",
|
||||||
"//vendor/github.com/spf13/cobra",
|
"//vendor/github.com/spf13/cobra",
|
||||||
"//vendor/github.com/spf13/viper",
|
"//vendor/github.com/spf13/viper",
|
||||||
|
@ -32,14 +32,11 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"log"
|
"log"
|
||||||
"openapi.peridot.resf.org/peridotopenapi"
|
"openapi.peridot.resf.org/peridotopenapi"
|
||||||
"os"
|
"os"
|
||||||
"peridot.resf.org/utils"
|
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -86,7 +83,7 @@ func buildRpmImportMn(_ *cobra.Command, args []string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Upload blobs to lookaside and wait for operation to finish
|
// Upload blobs to lookaside and wait for operation to finish
|
||||||
var operations []string
|
var blobs []string
|
||||||
projectCl := getClient(serviceProject).(peridotopenapi.ProjectServiceApi)
|
projectCl := getClient(serviceProject).(peridotopenapi.ProjectServiceApi)
|
||||||
for _, arg := range args {
|
for _, arg := range args {
|
||||||
bts, err := ioutil.ReadFile(arg)
|
bts, err := ioutil.ReadFile(arg)
|
||||||
@ -97,56 +94,12 @@ func buildRpmImportMn(_ *cobra.Command, args []string) {
|
|||||||
File: &base64EncodedBytes,
|
File: &base64EncodedBytes,
|
||||||
}).Execute()
|
}).Execute()
|
||||||
errFatal(err)
|
errFatal(err)
|
||||||
log.Printf("Uploading %s to lookaside with task id %s\n", arg, res.GetTaskId())
|
log.Printf("Uploaded %s to lookaside", arg)
|
||||||
operations = append(operations, res.GetTaskId())
|
blobs = append(blobs, res.GetDigest())
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Println("Waiting for upload tasks to finish...")
|
|
||||||
|
|
||||||
// Wait for tasks to reach success state
|
|
||||||
taskCl := getClient(serviceTask).(peridotopenapi.TaskServiceApi)
|
taskCl := getClient(serviceTask).(peridotopenapi.TaskServiceApi)
|
||||||
var doneOperations []string
|
|
||||||
var blobs []string
|
|
||||||
for {
|
|
||||||
didBreak := false
|
|
||||||
for _, op := range operations {
|
|
||||||
log.Printf("Waiting for %s to finish\n", op)
|
|
||||||
if len(doneOperations) == len(operations) {
|
|
||||||
didBreak = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
if utils.StrContains(op, doneOperations) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
res, resp, err := taskCl.GetTask(getContext(), "global", op).Execute()
|
|
||||||
errFatal(err)
|
|
||||||
task := res.GetTask()
|
|
||||||
if task.GetDone() {
|
|
||||||
subtask := task.GetSubtasks()[0]
|
|
||||||
if subtask.GetStatus() == peridotopenapi.SUCCEEDED {
|
|
||||||
b, err := ioutil.ReadAll(resp.Body)
|
|
||||||
errFatal(err)
|
|
||||||
|
|
||||||
var subtaskFull LookasideUploadTask
|
|
||||||
errFatal(json.Unmarshal(b, &subtaskFull))
|
|
||||||
|
|
||||||
blobs = append(blobs, subtaskFull.Task.Subtasks[0].Response.Digest)
|
|
||||||
doneOperations = append(doneOperations, op)
|
|
||||||
log.Printf("Task %s finished successfully\n", op)
|
|
||||||
} else if subtask.GetStatus() != peridotopenapi.RUNNING || subtask.GetStatus() != peridotopenapi.PENDING {
|
|
||||||
errFatal(fmt.Errorf("subtask %s failed with status %s", op, subtask.GetStatus()))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
time.Sleep(2 * time.Second)
|
|
||||||
}
|
|
||||||
if didBreak {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Println("Upload tasks finished")
|
|
||||||
log.Println("Triggering RPM batch import")
|
log.Println("Triggering RPM batch import")
|
||||||
|
|
||||||
cl := getClient(serviceBuild).(peridotopenapi.BuildServiceApi)
|
cl := getClient(serviceBuild).(peridotopenapi.BuildServiceApi)
|
||||||
@ -156,4 +109,23 @@ func buildRpmImportMn(_ *cobra.Command, args []string) {
|
|||||||
ForceOverride: &buildRpmImportForceOverride,
|
ForceOverride: &buildRpmImportForceOverride,
|
||||||
}).Execute()
|
}).Execute()
|
||||||
errFatal(err)
|
errFatal(err)
|
||||||
|
|
||||||
|
// Wait for import to finish
|
||||||
|
log.Printf("Waiting for import %s to finish\n", importRes.GetTaskId())
|
||||||
|
for {
|
||||||
|
res, _, err := taskCl.GetTask(getContext(), projectId, importRes.GetTaskId()).Execute()
|
||||||
|
errFatal(err)
|
||||||
|
task := res.GetTask()
|
||||||
|
if task.GetDone() {
|
||||||
|
if task.GetSubtasks()[0].GetStatus() == peridotopenapi.SUCCEEDED {
|
||||||
|
log.Printf("Import %s finished successfully\n", importRes.GetTaskId())
|
||||||
|
break
|
||||||
|
} else {
|
||||||
|
log.Printf("Import %s failed with status %s\n", importRes.GetTaskId(), task.GetSubtasks()[0].GetStatus())
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -35,6 +35,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
|
"log"
|
||||||
"openapi.peridot.resf.org/peridotopenapi"
|
"openapi.peridot.resf.org/peridotopenapi"
|
||||||
"os"
|
"os"
|
||||||
)
|
)
|
||||||
@ -58,8 +59,10 @@ func lookasideUploadMn(_ *cobra.Command, args []string) {
|
|||||||
base64EncodedBytes := base64.StdEncoding.EncodeToString(bts)
|
base64EncodedBytes := base64.StdEncoding.EncodeToString(bts)
|
||||||
|
|
||||||
cl := getClient(serviceProject).(peridotopenapi.ProjectServiceApi)
|
cl := getClient(serviceProject).(peridotopenapi.ProjectServiceApi)
|
||||||
_, _, err = cl.LookasideFileUpload(getContext()).Body(peridotopenapi.V1LookasideFileUploadRequest{
|
res, _, err := cl.LookasideFileUpload(getContext()).Body(peridotopenapi.V1LookasideFileUploadRequest{
|
||||||
File: &base64EncodedBytes,
|
File: &base64EncodedBytes,
|
||||||
}).Execute()
|
}).Execute()
|
||||||
errFatal(err)
|
errFatal(err)
|
||||||
|
|
||||||
|
log.Printf("Digest: %s", res.GetDigest())
|
||||||
}
|
}
|
||||||
|
@ -158,9 +158,6 @@ func mn(_ *cobra.Command, _ []string) {
|
|||||||
// RPM Import
|
// RPM Import
|
||||||
w.Worker.RegisterActivity(w.WorkflowController.RpmImportActivity)
|
w.Worker.RegisterActivity(w.WorkflowController.RpmImportActivity)
|
||||||
|
|
||||||
// Lookaside
|
|
||||||
w.Worker.RegisterActivity(w.WorkflowController.LookasideFileUploadActivity)
|
|
||||||
|
|
||||||
// Yumrepofs
|
// Yumrepofs
|
||||||
w.Worker.RegisterActivity(w.WorkflowController.CreateHashedRepositoriesActivity)
|
w.Worker.RegisterActivity(w.WorkflowController.CreateHashedRepositoriesActivity)
|
||||||
|
|
||||||
|
@ -106,7 +106,6 @@ func mn(_ *cobra.Command, _ []string) {
|
|||||||
w.Worker.RegisterWorkflow(w.WorkflowController.RpmImportWorkflow)
|
w.Worker.RegisterWorkflow(w.WorkflowController.RpmImportWorkflow)
|
||||||
w.Worker.RegisterWorkflow(w.WorkflowController.RpmLookasideBatchImportWorkflow)
|
w.Worker.RegisterWorkflow(w.WorkflowController.RpmLookasideBatchImportWorkflow)
|
||||||
w.Worker.RegisterWorkflow(w.WorkflowController.CreateHashedRepositoriesWorkflow)
|
w.Worker.RegisterWorkflow(w.WorkflowController.CreateHashedRepositoriesWorkflow)
|
||||||
w.Worker.RegisterWorkflow(w.WorkflowController.LookasideFileUploadWorkflow)
|
|
||||||
}
|
}
|
||||||
w.Worker.RegisterWorkflow(w.WorkflowController.ProvisionWorkerWorkflow)
|
w.Worker.RegisterWorkflow(w.WorkflowController.ProvisionWorkerWorkflow)
|
||||||
w.Worker.RegisterWorkflow(w.WorkflowController.DestroyWorkerWorkflow)
|
w.Worker.RegisterWorkflow(w.WorkflowController.DestroyWorkerWorkflow)
|
||||||
|
@ -6,10 +6,13 @@ go_library(
|
|||||||
importpath = "peridot.resf.org/peridot/cmd/v1/peridotserver",
|
importpath = "peridot.resf.org/peridot/cmd/v1/peridotserver",
|
||||||
visibility = ["//visibility:private"],
|
visibility = ["//visibility:private"],
|
||||||
deps = [
|
deps = [
|
||||||
|
"//peridot/common",
|
||||||
"//peridot/db/connector",
|
"//peridot/db/connector",
|
||||||
"//peridot/impl/v1:impl",
|
"//peridot/impl/v1:impl",
|
||||||
|
"//peridot/lookaside/s3",
|
||||||
"//temporalutils",
|
"//temporalutils",
|
||||||
"//utils",
|
"//utils",
|
||||||
|
"//vendor/github.com/go-git/go-billy/v5/osfs",
|
||||||
"//vendor/github.com/sirupsen/logrus",
|
"//vendor/github.com/sirupsen/logrus",
|
||||||
"//vendor/github.com/spf13/cobra",
|
"//vendor/github.com/spf13/cobra",
|
||||||
"//vendor/go.temporal.io/sdk/client",
|
"//vendor/go.temporal.io/sdk/client",
|
||||||
|
@ -31,11 +31,14 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/go-git/go-billy/v5/osfs"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
"go.temporal.io/sdk/client"
|
"go.temporal.io/sdk/client"
|
||||||
|
peridotcommon "peridot.resf.org/peridot/common"
|
||||||
serverconnector "peridot.resf.org/peridot/db/connector"
|
serverconnector "peridot.resf.org/peridot/db/connector"
|
||||||
peridotimplv1 "peridot.resf.org/peridot/impl/v1"
|
peridotimplv1 "peridot.resf.org/peridot/impl/v1"
|
||||||
|
"peridot.resf.org/peridot/lookaside/s3"
|
||||||
"peridot.resf.org/temporalutils"
|
"peridot.resf.org/temporalutils"
|
||||||
"peridot.resf.org/utils"
|
"peridot.resf.org/utils"
|
||||||
)
|
)
|
||||||
@ -54,6 +57,7 @@ func init() {
|
|||||||
cnf.DatabaseName = &dname
|
cnf.DatabaseName = &dname
|
||||||
cnf.Name = "peridot"
|
cnf.Name = "peridot"
|
||||||
|
|
||||||
|
peridotcommon.AddFlags(root.PersistentFlags())
|
||||||
utils.AddFlags(root.PersistentFlags(), cnf)
|
utils.AddFlags(root.PersistentFlags(), cnf)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -64,7 +68,12 @@ func mn(_ *cobra.Command, _ []string) {
|
|||||||
}
|
}
|
||||||
defer c.Close()
|
defer c.Close()
|
||||||
|
|
||||||
s, err := peridotimplv1.NewServer(serverconnector.MustAuto(), c)
|
storage, err := s3.New(osfs.New("/"))
|
||||||
|
if err != nil {
|
||||||
|
logrus.Fatalln("unable to create S3 storage", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
s, err := peridotimplv1.NewServer(serverconnector.MustAuto(), c, storage)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.Fatalf("could not init server: %v", err)
|
logrus.Fatalf("could not init server: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -18,6 +18,7 @@ go_library(
|
|||||||
"//peridot/builder/v1/workflow",
|
"//peridot/builder/v1/workflow",
|
||||||
"//peridot/db",
|
"//peridot/db",
|
||||||
"//peridot/db/models",
|
"//peridot/db/models",
|
||||||
|
"//peridot/lookaside",
|
||||||
"//peridot/proto/v1:pb",
|
"//peridot/proto/v1:pb",
|
||||||
"//proto:common",
|
"//proto:common",
|
||||||
"//servicecatalog",
|
"//servicecatalog",
|
||||||
|
@ -32,7 +32,10 @@ package peridotimplv1
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"crypto/sha256"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"encoding/base64"
|
||||||
|
"encoding/hex"
|
||||||
v1 "github.com/authzed/authzed-go/proto/authzed/api/v1"
|
v1 "github.com/authzed/authzed-go/proto/authzed/api/v1"
|
||||||
"go.temporal.io/sdk/client"
|
"go.temporal.io/sdk/client"
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
@ -404,7 +407,7 @@ func (s *Server) CreateHashedRepositories(ctx context.Context, req *peridotpb.Cr
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) LookasideFileUpload(ctx context.Context, req *peridotpb.LookasideFileUploadRequest) (*peridotpb.AsyncTask, error) {
|
func (s *Server) LookasideFileUpload(ctx context.Context, req *peridotpb.LookasideFileUploadRequest) (*peridotpb.LookasideFileUploadResponse, error) {
|
||||||
if err := req.Validate(); err != nil {
|
if err := req.Validate(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -412,60 +415,36 @@ func (s *Server) LookasideFileUpload(ctx context.Context, req *peridotpb.Lookasi
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
user, err := utils.UserFromContext(ctx)
|
base64DecodedFile, err := base64.StdEncoding.DecodeString(req.File)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
rollback := true
|
hasher := sha256.New()
|
||||||
beginTx, err := s.db.Begin()
|
_, err = hasher.Write(base64DecodedFile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.log.Error(err)
|
return nil, err
|
||||||
return nil, utils.InternalError
|
|
||||||
}
|
}
|
||||||
defer func() {
|
sha256Sum := hex.EncodeToString(hasher.Sum(nil))
|
||||||
if rollback {
|
|
||||||
_ = beginTx.Rollback()
|
exists, err := s.storage.Exists(sha256Sum)
|
||||||
|
if err != nil {
|
||||||
|
if !strings.Contains(err.Error(), "NotFound") {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
}()
|
}
|
||||||
tx := s.db.UseTransaction(beginTx)
|
if exists {
|
||||||
|
return &peridotpb.LookasideFileUploadResponse{
|
||||||
task, err := tx.CreateTask(user, "noarch", peridotpb.TaskType_TASK_TYPE_LOOKASIDE_FILE_UPLOAD, nil, nil)
|
Digest: sha256Sum,
|
||||||
if err != nil {
|
}, nil
|
||||||
s.log.Errorf("could not create task: %v", err)
|
|
||||||
return nil, utils.InternalError
|
|
||||||
}
|
}
|
||||||
|
|
||||||
taskProto, err := task.ToProto(false)
|
_, err = s.storage.PutObjectBytes(sha256Sum, base64DecodedFile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, status.Errorf(codes.Internal, "could not marshal task: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
rollback = false
|
|
||||||
err = beginTx.Commit()
|
|
||||||
if err != nil {
|
|
||||||
return nil, status.Error(codes.Internal, "could not save, try again")
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = s.temporal.ExecuteWorkflow(
|
|
||||||
context.Background(),
|
|
||||||
client.StartWorkflowOptions{
|
|
||||||
ID: task.ID.String(),
|
|
||||||
TaskQueue: MainTaskQueue,
|
|
||||||
},
|
|
||||||
s.temporalWorker.WorkflowController.LookasideFileUploadWorkflow,
|
|
||||||
req,
|
|
||||||
task,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
s.log.Errorf("could not start workflow: %v", err)
|
|
||||||
_ = s.db.SetTaskStatus(task.ID.String(), peridotpb.TaskStatus_TASK_STATUS_FAILED)
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return &peridotpb.AsyncTask{
|
return &peridotpb.LookasideFileUploadResponse{
|
||||||
TaskId: task.ID.String(),
|
Digest: sha256Sum,
|
||||||
Subtasks: []*peridotpb.Subtask{taskProto},
|
|
||||||
Done: false,
|
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
@ -47,6 +47,7 @@ import (
|
|||||||
commonpb "peridot.resf.org/common"
|
commonpb "peridot.resf.org/common"
|
||||||
builderv1 "peridot.resf.org/peridot/builder/v1"
|
builderv1 "peridot.resf.org/peridot/builder/v1"
|
||||||
peridotdb "peridot.resf.org/peridot/db"
|
peridotdb "peridot.resf.org/peridot/db"
|
||||||
|
"peridot.resf.org/peridot/lookaside"
|
||||||
peridotpb "peridot.resf.org/peridot/pb"
|
peridotpb "peridot.resf.org/peridot/pb"
|
||||||
"peridot.resf.org/servicecatalog"
|
"peridot.resf.org/servicecatalog"
|
||||||
"peridot.resf.org/utils"
|
"peridot.resf.org/utils"
|
||||||
@ -89,9 +90,10 @@ type Server struct {
|
|||||||
authz *authzed.Client
|
authz *authzed.Client
|
||||||
hydra *hydraclient.OryHydra
|
hydra *hydraclient.OryHydra
|
||||||
hydraAdmin *hydraclient.OryHydra
|
hydraAdmin *hydraclient.OryHydra
|
||||||
|
storage lookaside.Storage
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewServer(db peridotdb.Access, c client.Client) (*Server, error) {
|
func NewServer(db peridotdb.Access, c client.Client, storage lookaside.Storage) (*Server, error) {
|
||||||
temporalWorker, err := builderv1.NewWorker(db, c, MainTaskQueue, nil)
|
temporalWorker, err := builderv1.NewWorker(db, c, MainTaskQueue, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -130,6 +132,7 @@ func NewServer(db peridotdb.Access, c client.Client) (*Server, error) {
|
|||||||
authz: authz,
|
authz: authz,
|
||||||
hydra: hydraSDK,
|
hydra: hydraSDK,
|
||||||
hydraAdmin: hydraAdminSDK,
|
hydraAdmin: hydraAdminSDK,
|
||||||
|
storage: storage,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -152,11 +155,11 @@ func (s *Server) Run() {
|
|||||||
&utils.GRPCOptions{
|
&utils.GRPCOptions{
|
||||||
DialOptions: []grpc.DialOption{
|
DialOptions: []grpc.DialOption{
|
||||||
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||||
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(1024 * 1024 * 1024)),
|
|
||||||
},
|
},
|
||||||
ServerOptions: []grpc.ServerOption{
|
ServerOptions: []grpc.ServerOption{
|
||||||
grpc.UnaryInterceptor(s.interceptor),
|
grpc.UnaryInterceptor(s.interceptor),
|
||||||
grpc.StreamInterceptor(s.serverInterceptor),
|
grpc.StreamInterceptor(s.serverInterceptor),
|
||||||
|
grpc.MaxRecvMsgSize(1024 * 1024 * 1024),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
func(r *utils.Register) {
|
func(r *utils.Register) {
|
||||||
|
@ -76,7 +76,7 @@ service ProjectService {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
rpc LookasideFileUpload(LookasideFileUploadRequest) returns (resf.peridot.v1.AsyncTask) {
|
rpc LookasideFileUpload(LookasideFileUploadRequest) returns (LookasideFileUploadResponse) {
|
||||||
option (google.api.http) = {
|
option (google.api.http) = {
|
||||||
post: "/v1/lookaside"
|
post: "/v1/lookaside"
|
||||||
body: "*"
|
body: "*"
|
||||||
@ -302,6 +302,6 @@ message CreateHashedRepositoriesTask {
|
|||||||
message LookasideFileUploadRequest {
|
message LookasideFileUploadRequest {
|
||||||
string file = 1 [(validate.rules).string.min_bytes = 1];
|
string file = 1 [(validate.rules).string.min_bytes = 1];
|
||||||
}
|
}
|
||||||
message LookasideFileUploadTask {
|
message LookasideFileUploadResponse {
|
||||||
string digest = 1;
|
string digest = 1;
|
||||||
}
|
}
|
||||||
|
@ -153,6 +153,13 @@ func NewGRPCServer(goptions *GRPCOptions, endpoint func(*Register), serve func(*
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
opts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
|
||||||
|
// use DialOptions if not nil
|
||||||
|
if options.DialOptions != nil {
|
||||||
|
opts = options.DialOptions
|
||||||
|
}
|
||||||
|
opts = append(opts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(1000*1024*1024), grpc.MaxCallSendMsgSize(1000*1024*1024)))
|
||||||
|
|
||||||
serv := grpc.NewServer(options.ServerOptions...)
|
serv := grpc.NewServer(options.ServerOptions...)
|
||||||
|
|
||||||
// background context since this is the "main" app
|
// background context since this is the "main" app
|
||||||
@ -178,12 +185,6 @@ func NewGRPCServer(goptions *GRPCOptions, endpoint func(*Register), serve func(*
|
|||||||
}
|
}
|
||||||
|
|
||||||
mux := runtime.NewServeMux(muxOptions...)
|
mux := runtime.NewServeMux(muxOptions...)
|
||||||
opts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
|
|
||||||
// use DialOptions if not nil
|
|
||||||
if options.DialOptions != nil {
|
|
||||||
opts = options.DialOptions
|
|
||||||
}
|
|
||||||
opts = append(opts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(1000*1024*1024), grpc.MaxCallSendMsgSize(1000*1024*1024)))
|
|
||||||
|
|
||||||
register := &Register{
|
register := &Register{
|
||||||
Context: ctx,
|
Context: ctx,
|
||||||
|
1
vendor/openapi.peridot.resf.org/peridotopenapi/BUILD.bazel
generated
vendored
1
vendor/openapi.peridot.resf.org/peridotopenapi/BUILD.bazel
generated
vendored
@ -57,6 +57,7 @@ go_library(
|
|||||||
"model_v1_list_repositories_response.go",
|
"model_v1_list_repositories_response.go",
|
||||||
"model_v1_list_tasks_response.go",
|
"model_v1_list_tasks_response.go",
|
||||||
"model_v1_lookaside_file_upload_request.go",
|
"model_v1_lookaside_file_upload_request.go",
|
||||||
|
"model_v1_lookaside_file_upload_response.go",
|
||||||
"model_v1_package.go",
|
"model_v1_package.go",
|
||||||
"model_v1_package_filters.go",
|
"model_v1_package_filters.go",
|
||||||
"model_v1_package_type.go",
|
"model_v1_package_type.go",
|
||||||
|
1
vendor/openapi.peridot.resf.org/peridotopenapi/README.md
generated
vendored
1
vendor/openapi.peridot.resf.org/peridotopenapi/README.md
generated
vendored
@ -161,6 +161,7 @@ Class | Method | HTTP request | Description
|
|||||||
- [V1ListRepositoriesResponse](docs/V1ListRepositoriesResponse.md)
|
- [V1ListRepositoriesResponse](docs/V1ListRepositoriesResponse.md)
|
||||||
- [V1ListTasksResponse](docs/V1ListTasksResponse.md)
|
- [V1ListTasksResponse](docs/V1ListTasksResponse.md)
|
||||||
- [V1LookasideFileUploadRequest](docs/V1LookasideFileUploadRequest.md)
|
- [V1LookasideFileUploadRequest](docs/V1LookasideFileUploadRequest.md)
|
||||||
|
- [V1LookasideFileUploadResponse](docs/V1LookasideFileUploadResponse.md)
|
||||||
- [V1Package](docs/V1Package.md)
|
- [V1Package](docs/V1Package.md)
|
||||||
- [V1PackageFilters](docs/V1PackageFilters.md)
|
- [V1PackageFilters](docs/V1PackageFilters.md)
|
||||||
- [V1PackageType](docs/V1PackageType.md)
|
- [V1PackageType](docs/V1PackageType.md)
|
||||||
|
12
vendor/openapi.peridot.resf.org/peridotopenapi/api_project_service.go
generated
vendored
12
vendor/openapi.peridot.resf.org/peridotopenapi/api_project_service.go
generated
vendored
@ -132,9 +132,9 @@ type ProjectServiceApi interface {
|
|||||||
|
|
||||||
/*
|
/*
|
||||||
* LookasideFileUploadExecute executes the request
|
* LookasideFileUploadExecute executes the request
|
||||||
* @return V1AsyncTask
|
* @return V1LookasideFileUploadResponse
|
||||||
*/
|
*/
|
||||||
LookasideFileUploadExecute(r ApiLookasideFileUploadRequest) (V1AsyncTask, *_nethttp.Response, error)
|
LookasideFileUploadExecute(r ApiLookasideFileUploadRequest) (V1LookasideFileUploadResponse, *_nethttp.Response, error)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* SetProjectCredentials Method for SetProjectCredentials
|
* SetProjectCredentials Method for SetProjectCredentials
|
||||||
@ -1009,7 +1009,7 @@ func (r ApiLookasideFileUploadRequest) Body(body V1LookasideFileUploadRequest) A
|
|||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r ApiLookasideFileUploadRequest) Execute() (V1AsyncTask, *_nethttp.Response, error) {
|
func (r ApiLookasideFileUploadRequest) Execute() (V1LookasideFileUploadResponse, *_nethttp.Response, error) {
|
||||||
return r.ApiService.LookasideFileUploadExecute(r)
|
return r.ApiService.LookasideFileUploadExecute(r)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1027,16 +1027,16 @@ func (a *ProjectServiceApiService) LookasideFileUpload(ctx _context.Context) Api
|
|||||||
|
|
||||||
/*
|
/*
|
||||||
* Execute executes the request
|
* Execute executes the request
|
||||||
* @return V1AsyncTask
|
* @return V1LookasideFileUploadResponse
|
||||||
*/
|
*/
|
||||||
func (a *ProjectServiceApiService) LookasideFileUploadExecute(r ApiLookasideFileUploadRequest) (V1AsyncTask, *_nethttp.Response, error) {
|
func (a *ProjectServiceApiService) LookasideFileUploadExecute(r ApiLookasideFileUploadRequest) (V1LookasideFileUploadResponse, *_nethttp.Response, error) {
|
||||||
var (
|
var (
|
||||||
localVarHTTPMethod = _nethttp.MethodPost
|
localVarHTTPMethod = _nethttp.MethodPost
|
||||||
localVarPostBody interface{}
|
localVarPostBody interface{}
|
||||||
localVarFormFileName string
|
localVarFormFileName string
|
||||||
localVarFileName string
|
localVarFileName string
|
||||||
localVarFileBytes []byte
|
localVarFileBytes []byte
|
||||||
localVarReturnValue V1AsyncTask
|
localVarReturnValue V1LookasideFileUploadResponse
|
||||||
)
|
)
|
||||||
|
|
||||||
localBasePath, err := a.client.cfg.ServerURLWithContext(r.ctx, "ProjectServiceApiService.LookasideFileUpload")
|
localBasePath, err := a.client.cfg.ServerURLWithContext(r.ctx, "ProjectServiceApiService.LookasideFileUpload")
|
||||||
|
115
vendor/openapi.peridot.resf.org/peridotopenapi/model_v1_lookaside_file_upload_response.go
generated
vendored
Normal file
115
vendor/openapi.peridot.resf.org/peridotopenapi/model_v1_lookaside_file_upload_response.go
generated
vendored
Normal file
@ -0,0 +1,115 @@
|
|||||||
|
/*
|
||||||
|
* peridot/proto/v1/batch.proto
|
||||||
|
*
|
||||||
|
* No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)
|
||||||
|
*
|
||||||
|
* API version: version not set
|
||||||
|
*/
|
||||||
|
|
||||||
|
// Code generated by OpenAPI Generator (https://openapi-generator.tech); DO NOT EDIT.
|
||||||
|
|
||||||
|
package peridotopenapi
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
)
|
||||||
|
|
||||||
|
// V1LookasideFileUploadResponse struct for V1LookasideFileUploadResponse
|
||||||
|
type V1LookasideFileUploadResponse struct {
|
||||||
|
Digest *string `json:"digest,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewV1LookasideFileUploadResponse instantiates a new V1LookasideFileUploadResponse object
|
||||||
|
// This constructor will assign default values to properties that have it defined,
|
||||||
|
// and makes sure properties required by API are set, but the set of arguments
|
||||||
|
// will change when the set of required properties is changed
|
||||||
|
func NewV1LookasideFileUploadResponse() *V1LookasideFileUploadResponse {
|
||||||
|
this := V1LookasideFileUploadResponse{}
|
||||||
|
return &this
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewV1LookasideFileUploadResponseWithDefaults instantiates a new V1LookasideFileUploadResponse object
|
||||||
|
// This constructor will only assign default values to properties that have it defined,
|
||||||
|
// but it doesn't guarantee that properties required by API are set
|
||||||
|
func NewV1LookasideFileUploadResponseWithDefaults() *V1LookasideFileUploadResponse {
|
||||||
|
this := V1LookasideFileUploadResponse{}
|
||||||
|
return &this
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetDigest returns the Digest field value if set, zero value otherwise.
|
||||||
|
func (o *V1LookasideFileUploadResponse) GetDigest() string {
|
||||||
|
if o == nil || o.Digest == nil {
|
||||||
|
var ret string
|
||||||
|
return ret
|
||||||
|
}
|
||||||
|
return *o.Digest
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetDigestOk returns a tuple with the Digest field value if set, nil otherwise
|
||||||
|
// and a boolean to check if the value has been set.
|
||||||
|
func (o *V1LookasideFileUploadResponse) GetDigestOk() (*string, bool) {
|
||||||
|
if o == nil || o.Digest == nil {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
return o.Digest, true
|
||||||
|
}
|
||||||
|
|
||||||
|
// HasDigest returns a boolean if a field has been set.
|
||||||
|
func (o *V1LookasideFileUploadResponse) HasDigest() bool {
|
||||||
|
if o != nil && o.Digest != nil {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetDigest gets a reference to the given string and assigns it to the Digest field.
|
||||||
|
func (o *V1LookasideFileUploadResponse) SetDigest(v string) {
|
||||||
|
o.Digest = &v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o V1LookasideFileUploadResponse) MarshalJSON() ([]byte, error) {
|
||||||
|
toSerialize := map[string]interface{}{}
|
||||||
|
if o.Digest != nil {
|
||||||
|
toSerialize["digest"] = o.Digest
|
||||||
|
}
|
||||||
|
return json.Marshal(toSerialize)
|
||||||
|
}
|
||||||
|
|
||||||
|
type NullableV1LookasideFileUploadResponse struct {
|
||||||
|
value *V1LookasideFileUploadResponse
|
||||||
|
isSet bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v NullableV1LookasideFileUploadResponse) Get() *V1LookasideFileUploadResponse {
|
||||||
|
return v.value
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *NullableV1LookasideFileUploadResponse) Set(val *V1LookasideFileUploadResponse) {
|
||||||
|
v.value = val
|
||||||
|
v.isSet = true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v NullableV1LookasideFileUploadResponse) IsSet() bool {
|
||||||
|
return v.isSet
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *NullableV1LookasideFileUploadResponse) Unset() {
|
||||||
|
v.value = nil
|
||||||
|
v.isSet = false
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewNullableV1LookasideFileUploadResponse(val *V1LookasideFileUploadResponse) *NullableV1LookasideFileUploadResponse {
|
||||||
|
return &NullableV1LookasideFileUploadResponse{value: val, isSet: true}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v NullableV1LookasideFileUploadResponse) MarshalJSON() ([]byte, error) {
|
||||||
|
return json.Marshal(v.value)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *NullableV1LookasideFileUploadResponse) UnmarshalJSON(src []byte) error {
|
||||||
|
v.isSet = true
|
||||||
|
return json.Unmarshal(src, &v.value)
|
||||||
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user