Add more info to ImportRPMResponse and Entry

This commit is contained in:
Mustafa Gezen 2023-09-03 07:45:49 +02:00
parent 78914375d9
commit 6540ef8d72
Signed by: mustafa
GPG Key ID: DCDF010D946438C1
7 changed files with 177 additions and 104 deletions

View File

@ -37,7 +37,10 @@ type Entry struct {
UserEmail sql.NullString `db:"user_email"`
CommitURI string `db:"commit_uri"`
CommitHash string `db:"commit_hash"`
CommitBranch string `db:"commit_branch"`
CommitTag string `db:"commit_tag"`
State mothershippb.Entry_State `db:"state"`
PackageName string `db:"package_name"`
}
func (e *Entry) GetID() string {
@ -46,17 +49,20 @@ func (e *Entry) GetID() string {
func (e *Entry) ToPB() *mothershippb.Entry {
return &mothershippb.Entry{
Name: e.Name,
EntryId: e.EntryID,
CreateTime: timestamppb.New(e.CreateTime),
OsRelease: e.OSRelease,
Sha256Sum: e.Sha256Sum,
Repository: e.RepositoryName,
WorkerId: base.SqlNullString(e.WorkerID),
Batch: base.SqlNullString(e.BatchName),
UserEmail: base.SqlNullString(e.UserEmail),
CommitUri: e.CommitURI,
CommitHash: e.CommitHash,
State: e.State,
Name: e.Name,
EntryId: e.EntryID,
CreateTime: timestamppb.New(e.CreateTime),
OsRelease: e.OSRelease,
Sha256Sum: e.Sha256Sum,
Repository: e.RepositoryName,
WorkerId: base.SqlNullString(e.WorkerID),
Batch: base.SqlNullString(e.BatchName),
UserEmail: base.SqlNullString(e.UserEmail),
CommitUri: e.CommitURI,
CommitHash: e.CommitHash,
CommitBranch: e.CommitBranch,
CommitTag: e.CommitTag,
State: e.State,
Pkg: e.PackageName,
}
}

View File

@ -34,7 +34,10 @@ CREATE TABLE entries
user_email TEXT,
commit_uri TEXT NOT NULL,
commit_hash TEXT NOT NULL,
state NUMERIC NOT NULL
commit_branch TEXT NOT NULL,
commit_tag TEXT NOT NULL,
state NUMERIC NOT NULL,
package_name TEXT NOT NULL
);
CREATE TABLE batches

View File

@ -77,6 +77,12 @@ message Entry {
// Commit hash of the resulting import
string commit_hash = 11 [(google.api.field_behavior) = OUTPUT_ONLY];
// Commit branch of the resulting import
string commit_branch = 12 [(google.api.field_behavior) = OUTPUT_ONLY];
// Commit tag of the resulting import
string commit_tag = 13 [(google.api.field_behavior) = OUTPUT_ONLY];
// Valid states of an entry.
enum State {
// Default value. This value is unused.
@ -102,10 +108,18 @@ message Entry {
// Failed to archive the entry.
// This entry CAN'T be rescued.
FAILED = 5;
// Retracted. This entry CAN'T be rescued.
// Another import may have happened, retraction is usually done
// if debranding was not complete but successful.
RETRACTED = 6;
}
// State of the entry.
State state = 12 [(google.api.field_behavior) = OUTPUT_ONLY];
State state = 14 [(google.api.field_behavior) = OUTPUT_ONLY];
// Name of the package being archived.
string pkg = 15 [(google.api.field_behavior) = OUTPUT_ONLY];
// Error message if on hold
string error_message = 13 [(google.api.field_behavior) = OUTPUT_ONLY];
string error_message = 16 [(google.api.field_behavior) = OUTPUT_ONLY];
}

View File

@ -89,7 +89,17 @@ message ImportRPMResponse {
// Commit URI of the imported RPM
string commit_uri = 2 [(google.api.field_behavior) = REQUIRED];
// Commit branch of the imported RPM
string commit_branch = 3 [(google.api.field_behavior) = REQUIRED];
// Commit tag of the imported RPM
string commit_tag = 4 [(google.api.field_behavior) = REQUIRED];
// NEVRA of the imported RPM
// e.g. rpm-1.0.0-1.el8.x86_64
string nevra = 3 [(google.api.field_behavior) = REQUIRED];
string nevra = 5 [(google.api.field_behavior) = REQUIRED];
// Package name of the imported RPM
// e.g. rpm
string pkg = 6 [(google.api.field_behavior) = REQUIRED];
}

View File

@ -15,12 +15,20 @@
package mothership_worker_server
import (
"crypto/sha256"
"database/sql"
"encoding/hex"
"fmt"
"github.com/pkg/errors"
"github.com/sassoftware/go-rpmutils"
base "go.resf.org/peridot/base/go"
mothership_db "go.resf.org/peridot/tools/mothership/db"
mothershippb "go.resf.org/peridot/tools/mothership/pb"
"go.temporal.io/sdk/temporal"
"io"
"net/url"
"os"
"path/filepath"
"time"
)
@ -53,6 +61,84 @@ func (w *Worker) CreateEntry(args *mothershippb.ProcessRPMArgs) (*mothershippb.E
return entry.ToPB(), nil
}
// SetEntryIDFromRPM sets the entry ID from the RPM.
// This is a Temporal activity.
func (w *Worker) SetEntryIDFromRPM(entry string, uri string, checksumSha256 string) (*mothershippb.Entry, error) {
ent, err := base.Q[mothership_db.Entry](w.db).F("name", entry).GetOrNil()
if err != nil {
return nil, errors.Wrap(err, "failed to get entry")
}
if ent == nil {
return nil, errors.New("entry does not exist")
}
tempDir, err := os.MkdirTemp("", "mothership-worker-server-import-rpm-*")
if err != nil {
return nil, errors.Wrap(err, "failed to create temporary directory")
}
defer os.RemoveAll(tempDir)
// Parse uri
parsed, err := url.Parse(uri)
if err != nil {
return nil, errors.Wrap(err, "failed to parse resource URI")
}
// Download the resource to the temporary directory
// S3 for example must include bucket, while memory:// does not.
// So memory://test.rpm would be parsed as host=test.rpm, path="".
// While s3://mship/test.rpm would be parsed as host=mship, path=test.rpm.
object := parsed.Path
if object == "" {
object = parsed.Host
}
err = w.storage.Download(object, filepath.Join(tempDir, "resource.rpm"))
if err != nil {
return nil, errors.Wrap(err, "failed to download resource")
}
// Verify checksum
hash := sha256.New()
f, err := os.Open(filepath.Join(tempDir, "resource.rpm"))
if err != nil {
return nil, errors.Wrap(err, "failed to open resource")
}
defer f.Close()
if _, err := io.Copy(hash, f); err != nil {
return nil, errors.Wrap(err, "failed to hash resource")
}
if hex.EncodeToString(hash.Sum(nil)) != checksumSha256 {
return nil, errors.New("checksum does not match")
}
// Read the RPM headers
_, err = f.Seek(0, io.SeekStart)
if err != nil {
return nil, errors.Wrap(err, "failed to seek resource")
}
rpm, err := rpmutils.ReadRpm(f)
if err != nil {
return nil, errors.Wrap(err, "failed to read RPM headers")
}
nevra, err := rpm.Header.GetNEVRA()
if err != nil {
return nil, errors.Wrap(err, "failed to get RPM NEVRA")
}
// Set entry ID
ent.EntryID = fmt.Sprintf("%s-%s-%s.src", nevra.Name, nevra.Version, nevra.Release)
ent.Sha256Sum = checksumSha256
// Update entry
if err := base.Q[mothership_db.Entry](w.db).U(ent); err != nil {
return nil, errors.Wrap(err, "failed to update entry")
}
return ent.ToPB(), nil
}
func (w *Worker) SetEntryState(entry string, state mothershippb.Entry_State, importRpmRes *mothershippb.ImportRPMResponse) (*mothershippb.Entry, error) {
ent, err := base.Q[mothership_db.Entry](w.db).F("name", entry).GetOrNil()
if err != nil {
@ -70,6 +156,9 @@ func (w *Worker) SetEntryState(entry string, state mothershippb.Entry_State, imp
if importRpmRes != nil {
ent.CommitURI = importRpmRes.CommitUri
ent.CommitHash = importRpmRes.CommitHash
ent.CommitBranch = importRpmRes.CommitBranch
ent.CommitTag = importRpmRes.CommitTag
ent.PackageName = importRpmRes.Pkg
}
if err := base.Q[mothership_db.Entry](w.db).U(ent); err != nil {

View File

@ -17,14 +17,11 @@ package mothership_worker_server
import (
"crypto/sha256"
"encoding/hex"
"fmt"
"github.com/go-git/go-billy/v5/memfs"
"github.com/go-git/go-git/v5"
"github.com/go-git/go-git/v5/storage/memory"
"github.com/pkg/errors"
"github.com/sassoftware/go-rpmutils"
base "go.resf.org/peridot/base/go"
mothership_db "go.resf.org/peridot/tools/mothership/db"
mothershippb "go.resf.org/peridot/tools/mothership/pb"
"go.resf.org/peridot/tools/mothership/worker_server/srpm_import"
"go.temporal.io/sdk/temporal"
@ -63,16 +60,14 @@ func (w *Worker) VerifyResourceExists(uri string) error {
)
}
split := strings.SplitN(parsed.Path, "/", 2)
if len(split) < 2 {
return temporal.NewNonRetryableApplicationError(
"invalid resource URI",
"invalidResourceURI",
errors.New("client submitted an invalid resource URI"),
)
// S3 for example must include bucket, while memory:// does not.
// So memory://test.rpm would be parsed as host=test.rpm, path="".
// While s3://mship/test.rpm would be parsed as host=mship, path=test.rpm.
object := parsed.Path
if object == "" {
object = parsed.Host
}
object := split[1]
exists, err := w.storage.Exists(object)
if err != nil {
return errors.Wrap(err, "failed to check if resource exists")
@ -89,76 +84,6 @@ func (w *Worker) VerifyResourceExists(uri string) error {
return nil
}
// SetEntryIDFromRPM sets the entry ID from the RPM.
// This is a Temporal activity.
func (w *Worker) SetEntryIDFromRPM(entry string, uri string, checksumSha256 string) error {
ent, err := base.Q[mothership_db.Entry](w.db).F("name", entry).GetOrNil()
if err != nil {
return errors.Wrap(err, "failed to get entry")
}
if ent == nil {
return errors.New("entry does not exist")
}
tempDir, err := os.MkdirTemp("", "mothership-worker-server-import-rpm-*")
if err != nil {
return errors.Wrap(err, "failed to create temporary directory")
}
defer os.RemoveAll(tempDir)
// Parse uri
parsed, err := url.Parse(uri)
if err != nil {
return errors.Wrap(err, "failed to parse resource URI")
}
// Download the resource to the temporary directory
err = w.storage.Download(parsed.Path, filepath.Join(tempDir, "resource.rpm"))
if err != nil {
return errors.Wrap(err, "failed to download resource")
}
// Verify checksum
hash := sha256.New()
f, err := os.Open(filepath.Join(tempDir, "resource.rpm"))
if err != nil {
return errors.Wrap(err, "failed to open resource")
}
defer f.Close()
if _, err := io.Copy(hash, f); err != nil {
return errors.Wrap(err, "failed to hash resource")
}
if hex.EncodeToString(hash.Sum(nil)) != checksumSha256 {
return errors.New("checksum does not match")
}
// Read the RPM headers
_, err = f.Seek(0, io.SeekStart)
if err != nil {
return errors.Wrap(err, "failed to seek resource")
}
rpm, err := rpmutils.ReadRpm(f)
if err != nil {
return errors.Wrap(err, "failed to read RPM headers")
}
nevra, err := rpm.Header.GetNEVRA()
if err != nil {
return errors.Wrap(err, "failed to get RPM NEVRA")
}
// Set entry ID
ent.EntryID = fmt.Sprintf("%s-%s-%s.src", nevra.Name, nevra.Version, nevra.Release)
ent.Sha256Sum = checksumSha256
// Update entry
if err := base.Q[mothership_db.Entry](w.db).U(ent); err != nil {
return errors.Wrap(err, "failed to update entry")
}
return nil
}
// ImportRPM imports an RPM into the database.
// This is a Temporal activity.
func (w *Worker) ImportRPM(uri string, checksumSha256 string, osRelease string) (*mothershippb.ImportRPMResponse, error) {
@ -235,6 +160,7 @@ func (w *Worker) ImportRPM(uri string, checksumSha256 string, osRelease string)
}
return nil, errors.Wrap(err, "failed to import SRPM")
}
defer srpmState.Close()
srpmState.SetAuthor(authenticator.AuthorName, authenticator.AuthorEmail)
cloneOpts := &git.CloneOptions{
@ -243,16 +169,19 @@ func (w *Worker) ImportRPM(uri string, checksumSha256 string, osRelease string)
}
storer := memory.NewStorage()
fs := memfs.New()
commit, err := srpmState.Import(cloneOpts, storer, fs, w.storage, osRelease)
importOut, err := srpmState.Import(cloneOpts, storer, fs, w.storage, osRelease)
if err != nil {
return nil, errors.Wrap(err, "failed to import SRPM")
}
commitURI := w.forge.GetCommitViewerURL(repoName, commit.Hash.String())
commitURI := w.forge.GetCommitViewerURL(repoName, importOut.Commit.Hash.String())
return &mothershippb.ImportRPMResponse{
CommitHash: commit.Hash.String(),
CommitUri: commitURI,
Nevra: nevra.String(),
CommitHash: importOut.Commit.Hash.String(),
CommitUri: commitURI,
CommitBranch: importOut.Branch,
CommitTag: importOut.Tag,
Nevra: nevra.String(),
Pkg: nevra.Name,
}, nil
}

View File

@ -69,6 +69,17 @@ type State struct {
rolling bool
}
type ImportOutput struct {
// Commit is the commit object
Commit *object.Commit
// Branch is the branch name
Branch string
// Tag is the tag name
Tag string
}
// copyFromOS copies specified file from OS filesystem to target filesystem.
func copyFromOS(targetFS billy.Filesystem, path string, targetPath string) error {
// Open file from OS filesystem.
@ -680,7 +691,12 @@ func (s *State) patchTargetRepo(repo *git.Repository, lookaside storage.Storage)
}
// Import imports the SRPM into the target repository.
func (s *State) Import(opts *git.CloneOptions, storer storage2.Storer, targetFS billy.Filesystem, lookaside storage.Storage, osRelease string) (*object.Commit, error) {
func (s *State) Import(opts *git.CloneOptions, storer storage2.Storer, targetFS billy.Filesystem, lookaside storage.Storage, osRelease string) (*ImportOutput, error) {
nevra, err := s.rpm.Header.GetNEVRA()
if err != nil {
return nil, errors.Wrap(err, "failed to get NEVRA")
}
// Get the target repository.
repo, branch, err := s.getRepo(opts, storer, targetFS, osRelease)
if err != nil {
@ -718,5 +734,11 @@ func (s *State) Import(opts *git.CloneOptions, storer storage2.Storer, targetFS
return nil, errors.Wrap(err, "failed to get commit object")
}
return commit, nil
tag := fmt.Sprintf("imports/%s/%s-%s-%s", branch, nevra.Name, nevra.Version, nevra.Release)
return &ImportOutput{
Commit: commit,
Branch: branch,
Tag: tag,
}, nil
}