Further work on mship workflows

This commit is contained in:
Mustafa Gezen 2023-08-28 05:12:04 +02:00
parent e009a1641a
commit 26aeb7cc9e
Signed by: mustafa
GPG Key ID: DCDF010D946438C1
4 changed files with 134 additions and 1 deletions

View File

@ -27,8 +27,15 @@ go_library(
"//base/go",
"//base/go/storage",
"//tools/mothership/proto/v1:pb",
"//tools/mothership/worker_server/forge",
"//tools/mothership/worker_server/srpm_import",
"//vendor/github.com/go-git/go-billy/v5/memfs",
"//vendor/github.com/go-git/go-git/v5:go-git",
"//vendor/github.com/go-git/go-git/v5/storage/memory",
"//vendor/github.com/pkg/errors",
"//vendor/github.com/sassoftware/go-rpmutils",
"//vendor/go.temporal.io/sdk/temporal",
"//vendor/go.temporal.io/sdk/workflow",
"//vendor/golang.org/x/crypto/openpgp",
],
)

View File

@ -15,9 +15,21 @@
package mothership_worker_server
import (
"crypto/sha256"
"encoding/hex"
"fmt"
"github.com/go-git/go-billy/v5/memfs"
"github.com/go-git/go-git/v5"
"github.com/go-git/go-git/v5/storage/memory"
"github.com/pkg/errors"
"github.com/sassoftware/go-rpmutils"
mothershippb "go.resf.org/peridot/tools/mothership/pb"
"go.resf.org/peridot/tools/mothership/worker_server/srpm_import"
"go.temporal.io/sdk/temporal"
"io"
"net/url"
"os"
"path/filepath"
"strings"
)
@ -74,3 +86,97 @@ func (w *Worker) VerifyResourceExists(uri string) error {
return nil
}
// ImportRPM imports an RPM into the database.
// This is a Temporal activity.
func (w *Worker) ImportRPM(uri string, checksumSha256 string) (*mothershippb.ImportRPMResponse, error) {
tempDir, err := os.MkdirTemp("", "mothership-worker-server-import-rpm-*")
if err != nil {
return nil, errors.Wrap(err, "failed to create temporary directory")
}
defer os.RemoveAll(tempDir)
// Parse uri
parsed, err := url.Parse(uri)
if err != nil {
return nil, errors.Wrap(err, "failed to parse resource URI")
}
// Download the resource to the temporary directory
err = w.storage.Download(parsed.Path, filepath.Join(tempDir, "resource.rpm"))
if err != nil {
return nil, errors.Wrap(err, "failed to download resource")
}
// Verify checksum
hash := sha256.New()
f, err := os.Open(filepath.Join(tempDir, "resource.rpm"))
if err != nil {
return nil, errors.Wrap(err, "failed to open resource")
}
defer f.Close()
if _, err := io.Copy(hash, f); err != nil {
return nil, errors.Wrap(err, "failed to hash resource")
}
if hex.EncodeToString(hash.Sum(nil)) != checksumSha256 {
return nil, errors.New("checksum does not match")
}
// Read the RPM headers
_, err = f.Seek(0, io.SeekStart)
if err != nil {
return nil, errors.Wrap(err, "failed to seek resource")
}
rpm, err := rpmutils.ReadRpm(f)
if err != nil {
return nil, errors.Wrap(err, "failed to read RPM headers")
}
nevra, err := rpm.Header.GetNEVRA()
if err != nil {
return nil, errors.Wrap(err, "failed to get RPM NEVRA")
}
// Ensure repository exists
repoName := nevra.Name
// First ensure that the repo exists.
authenticator, err := w.forge.GetAuthenticator()
if err != nil {
return nil, errors.Wrap(err, "failed to get forge authenticator")
}
err = w.forge.EnsureRepositoryExists(authenticator, repoName)
if err != nil {
return nil, errors.Wrap(err, "failed to ensure repository exists")
}
// Then do an import
srpmState, err := srpm_import.FromFile(filepath.Join(tempDir, "resource.rpm"), w.gpgKeys...)
if err != nil {
if strings.Contains(err.Error(), "failed to verify RPM") {
return nil, temporal.NewNonRetryableApplicationError(
"failed to verify RPM",
"failedToVerifyRPM",
err,
)
}
return nil, errors.Wrap(err, "failed to import SRPM")
}
srpmState.SetAuthor(authenticator.AuthorName, authenticator.AuthorEmail)
cloneOpts := &git.CloneOptions{
URL: fmt.Sprintf("%s/%s", w.forge.GetRemote(), repoName),
Auth: authenticator.AuthMethod,
}
storer := memory.NewStorage()
fs := memfs.New()
commit, err := srpmState.Import(cloneOpts, storer, fs, w.storage)
if err != nil {
return nil, errors.Wrap(err, "failed to import SRPM")
}
return &mothershippb.ImportRPMResponse{
CommitHash: commit.Hash.String(),
}, nil
}

View File

@ -17,16 +17,22 @@ package mothership_worker_server
import (
base "go.resf.org/peridot/base/go"
"go.resf.org/peridot/base/go/storage"
"go.resf.org/peridot/tools/mothership/worker_server/forge"
"golang.org/x/crypto/openpgp"
)
type Worker struct {
db *base.DB
storage storage.Storage
gpgKeys openpgp.EntityList
forge forge.Forge
}
func New(db *base.DB, storage storage.Storage) *Worker {
func New(db *base.DB, storage storage.Storage, gpgKeys openpgp.EntityList, forge forge.Forge) *Worker {
return &Worker{
db: db,
storage: storage,
gpgKeys: gpgKeys,
forge: forge,
}
}

View File

@ -48,5 +48,19 @@ func ProcessRPMWorkflow(ctx workflow.Context, req *mothershippb.ProcessRPMReques
return nil, err
}
// If resource exists, then we can start the import.
ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
// We'll wait up to 5 minutes for the import to finish.
// Most imports are fast, but some packages are very large.
StartToCloseTimeout: 5 * time.Minute,
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 0,
},
})
err = workflow.ExecuteActivity(ctx, w.ImportRPM, req.RpmUri, req.Checksum).Get(ctx, nil)
if err != nil {
return nil, err
}
return nil, errors.New("unimplemented")
}