From 8ef874b5ae3fe24f63c7f097a584888c2e141870 Mon Sep 17 00:00:00 2001 From: Mustafa Gezen Date: Tue, 16 Aug 2022 12:29:49 +0200 Subject: [PATCH 1/8] Re-use instead of reloading pre-warmed key Signed-off-by: Mustafa Gezen --- peridot/keykeeper/v1/keywarming.go | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/peridot/keykeeper/v1/keywarming.go b/peridot/keykeeper/v1/keywarming.go index c000484..120e6c1 100644 --- a/peridot/keykeeper/v1/keywarming.go +++ b/peridot/keykeeper/v1/keywarming.go @@ -110,16 +110,7 @@ func (s *Server) WarmGPGKey(key string, armoredKey string, gpgKey *crypto.Key, d // This means that the key is already loaded // We need to delete and replace it if cachedKey != nil { - cachedKey.Lock() - defer cachedKey.Unlock() - - keyId := gpgKey.GetHexKeyID() - err := s.deleteGpgKey(keyId) - if err != nil { - return nil, err - } - - cachedKey.gpgId = keyId + return cachedKey, nil } err := s.importGpgKey(armoredKey) From 0a712673a7ab869c2d88e328f00f014a8956910e Mon Sep 17 00:00:00 2001 From: Mustafa Gezen Date: Tue, 16 Aug 2022 12:48:53 +0200 Subject: [PATCH 2/8] Fix merge conflict upstream/resf Signed-off-by: Mustafa Gezen --- peridot/keykeeper/v1/keywarming.go | 4 +- peridot/keykeeper/v1/server.go | 64 +++++++++++++++++++++++------- 2 files changed, 52 insertions(+), 16 deletions(-) diff --git a/peridot/keykeeper/v1/keywarming.go b/peridot/keykeeper/v1/keywarming.go index 120e6c1..dee276a 100644 --- a/peridot/keykeeper/v1/keywarming.go +++ b/peridot/keykeeper/v1/keywarming.go @@ -106,9 +106,11 @@ func (s *Server) importRpmKey(publicKey string) error { // WarmGPGKey warms up a specific GPG key // This involves shelling out to GPG to import the key func (s *Server) WarmGPGKey(key string, armoredKey string, gpgKey *crypto.Key, db *models.Key) (*LoadedKey, error) { + s.keyImportLock.ReadLock(key) + defer s.keyImportLock.ReadUnlock(key) + cachedKey := s.keys[key] // This means that the key is already loaded - // We need to delete and replace it if cachedKey != nil { return cachedKey, nil } diff --git a/peridot/keykeeper/v1/server.go b/peridot/keykeeper/v1/server.go index 0642889..c2ba2c6 100644 --- a/peridot/keykeeper/v1/server.go +++ b/peridot/keykeeper/v1/server.go @@ -57,17 +57,45 @@ import ( const TaskQueue = "keykeeper" +type MapStringLock struct { + *sync.RWMutex + m map[string]*sync.Mutex +} + +func (m *MapStringLock) ReadLock(key string) { + m.RLock() + defer m.RUnlock() + if m.m[key] == nil { + m.Lock() + m.m[key] = &sync.Mutex{} + m.Unlock() + } + m.m[key].Lock() +} + +func (m *MapStringLock) ReadUnlock(key string) { + m.RLock() + defer m.RUnlock() + if m.m[key] == nil { + m.Lock() + m.m[key] = &sync.Mutex{} + m.Unlock() + } + m.m[key].Unlock() +} + type Server struct { keykeeperpb.UnimplementedKeykeeperServiceServer - log *logrus.Logger - db peridotdb.Access - storage lookaside.Storage - worker worker.Worker - temporal client.Client - stores map[string]store.Store - keys map[string]*LoadedKey - defaultStore string + log *logrus.Logger + db peridotdb.Access + storage lookaside.Storage + worker worker.Worker + temporal client.Client + stores map[string]store.Store + keys map[string]*LoadedKey + keyImportLock *MapStringLock + defaultStore string } func NewServer(db peridotdb.Access, c client.Client) (*Server, error) { @@ -82,13 +110,19 @@ func NewServer(db peridotdb.Access, c client.Client) (*Server, error) { } return &Server{ - log: logrus.New(), - db: db, - storage: storage, - worker: worker.New(c, TaskQueue, worker.Options{}), - temporal: c, - stores: map[string]store.Store{"awssm": sm}, - keys: map[string]*LoadedKey{}, + log: logrus.New(), + db: db, + storage: storage, + worker: worker.New(c, TaskQueue, worker.Options{ + DeadlockDetectionTimeout: 15 * time.Minute, + }), + temporal: c, + stores: map[string]store.Store{"awssm": sm}, + keys: map[string]*LoadedKey{}, + keyImportLock: &MapStringLock{ + RWMutex: &sync.RWMutex{}, + m: map[string]*sync.Mutex{}, + }, defaultStore: "awssm", }, nil } From 4016569ce162cb9eaafa5f122ce80c9390fb0521 Mon Sep 17 00:00:00 2001 From: Mustafa Gezen Date: Tue, 16 Aug 2022 12:52:26 +0200 Subject: [PATCH 3/8] Buffering download for S3 (bypasses billyfs) Signed-off-by: Mustafa Gezen --- peridot/lookaside/s3/s3.go | 37 +++++++++++++++++-------------------- 1 file changed, 17 insertions(+), 20 deletions(-) diff --git a/peridot/lookaside/s3/s3.go b/peridot/lookaside/s3/s3.go index 5baa3d8..6829f3e 100644 --- a/peridot/lookaside/s3/s3.go +++ b/peridot/lookaside/s3/s3.go @@ -41,16 +41,16 @@ import ( "github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/go-git/go-billy/v5" "github.com/spf13/viper" - "io" "io/ioutil" "os" "peridot.resf.org/peridot/lookaside" ) type Storage struct { - bucket string - uploader *s3manager.Uploader - fs billy.Filesystem + bucket string + uploader *s3manager.Uploader + downloader *s3manager.Downloader + fs billy.Filesystem } func New(fs billy.Filesystem) (*Storage, error) { @@ -81,32 +81,29 @@ func New(fs billy.Filesystem) (*Storage, error) { return nil, err } uploader := s3manager.NewUploader(sess) + downloader := s3manager.NewDownloader(sess) return &Storage{ - bucket: viper.GetString("s3-bucket"), - uploader: uploader, - fs: fs, + bucket: viper.GetString("s3-bucket"), + uploader: uploader, + downloader: downloader, + fs: fs, }, nil } func (s *Storage) DownloadObject(objectName string, path string) error { - obj, err := s.uploader.S3.GetObject(&s3.GetObjectInput{ - Bucket: aws.String(s.bucket), - Key: aws.String(objectName), - }) + f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) if err != nil { return err } - f, err := s.fs.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) - if err != nil { - return err - } - - _, err = io.Copy(f, obj.Body) - if err != nil { - return err - } + _, err = s.downloader.Download( + f, + &s3.GetObjectInput{ + Bucket: aws.String(s.bucket), + Key: aws.String(objectName), + }, + ) return nil } From 302c68a3832ef40aacbadbdcc22e2a14061c6d9c Mon Sep 17 00:00:00 2001 From: Mustafa Gezen Date: Tue, 16 Aug 2022 14:25:55 +0200 Subject: [PATCH 4/8] Check err and correctly close file for S3 storage Signed-off-by: Mustafa Gezen --- peridot/lookaside/s3/s3.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/peridot/lookaside/s3/s3.go b/peridot/lookaside/s3/s3.go index 6829f3e..b21db5a 100644 --- a/peridot/lookaside/s3/s3.go +++ b/peridot/lookaside/s3/s3.go @@ -96,6 +96,7 @@ func (s *Storage) DownloadObject(objectName string, path string) error { if err != nil { return err } + defer f.Close() _, err = s.downloader.Download( f, @@ -105,7 +106,7 @@ func (s *Storage) DownloadObject(objectName string, path string) error { }, ) - return nil + return err } func (s *Storage) ReadObject(objectName string) ([]byte, error) { From 70c45775cb62c44d962a2546b1132f4f1fd32c42 Mon Sep 17 00:00:00 2001 From: Mustafa Gezen Date: Tue, 16 Aug 2022 14:45:03 +0200 Subject: [PATCH 5/8] Remove verification step from keykeeper Signed-off-by: Mustafa Gezen --- peridot/keykeeper/v1/keywarming.go | 23 ----- peridot/keykeeper/v1/sign.go | 130 +++++++++++------------------ 2 files changed, 50 insertions(+), 103 deletions(-) diff --git a/peridot/keykeeper/v1/keywarming.go b/peridot/keykeeper/v1/keywarming.go index dee276a..f95ce13 100644 --- a/peridot/keykeeper/v1/keywarming.go +++ b/peridot/keykeeper/v1/keywarming.go @@ -85,24 +85,6 @@ func (s *Server) importGpgKey(armoredKey string) error { return err } -func (s *Server) importRpmKey(publicKey string) error { - tmpFile, err := ioutil.TempFile("/tmp", "peridot-key-") - if err != nil { - return err - } - defer os.Remove(tmpFile.Name()) - _, err = tmpFile.Write([]byte(publicKey)) - if err != nil { - return err - } - cmd := gpgCmdEnv(exec.Command("rpm", "--import", tmpFile.Name())) - out, err := logCmdRun(cmd) - if err != nil { - s.log.Errorf("failed to import rpm key: %s", out.String()) - } - return err -} - // WarmGPGKey warms up a specific GPG key // This involves shelling out to GPG to import the key func (s *Server) WarmGPGKey(key string, armoredKey string, gpgKey *crypto.Key, db *models.Key) (*LoadedKey, error) { @@ -120,11 +102,6 @@ func (s *Server) WarmGPGKey(key string, armoredKey string, gpgKey *crypto.Key, d return nil, err } - err = s.importRpmKey(db.PublicKey) - if err != nil { - return nil, err - } - if cachedKey == nil { s.keys[key] = &LoadedKey{ keyUuid: db.ID, diff --git a/peridot/keykeeper/v1/sign.go b/peridot/keykeeper/v1/sign.go index 32dcbac..5c00719 100644 --- a/peridot/keykeeper/v1/sign.go +++ b/peridot/keykeeper/v1/sign.go @@ -188,90 +188,60 @@ func (s *Server) SignArtifactActivity(ctx context.Context, artifactId string, ke switch ext { case ".rpm": - rpmSign := func() (*keykeeperpb.SignedArtifact, error) { - var outBuf bytes.Buffer - opts := []string{ - "--define", "_gpg_name " + keyName, - "--define", "_peridot_keykeeper_key " + key.keyUuid.String(), - "--addsign", localPath, - } - cmd := gpgCmdEnv(exec.Command("rpm", opts...)) - cmd.Stdout = &outBuf - cmd.Stderr = &outBuf - err := cmd.Run() - if err != nil { - s.log.Errorf("failed to sign artifact %s: %v", artifact.Name, err) - statusErr := status.New(codes.Internal, "failed to sign artifact") - statusErr, err2 := statusErr.WithDetails(&errdetails.ErrorInfo{ - Reason: "rpmsign-failed", - Domain: "keykeeper.peridot.resf.org", - Metadata: map[string]string{ - "logs": outBuf.String(), - "err": err.Error(), - }, - }) - if err2 != nil { - s.log.Errorf("failed to add error details to status: %v", err2) - } - return nil, statusErr.Err() - } - _, err = s.storage.PutObject(newObjectKey, localPath) - if err != nil { - s.log.Errorf("failed to upload artifact %s: %v", newObjectKey, err) - return nil, fmt.Errorf("failed to upload artifact %s: %v", newObjectKey, err) - } - - f, err := os.Open(localPath) - if err != nil { - return nil, err - } - - hasher := sha256.New() - _, err = io.Copy(hasher, f) - if err != nil { - return nil, err - } - hash := hex.EncodeToString(hasher.Sum(nil)) - - err = s.db.CreateTaskArtifactSignature(artifact.ID.String(), key.keyUuid.String(), hash) - if err != nil { - s.log.Errorf("failed to create task artifact signature: %v", err) - return nil, fmt.Errorf("failed to create task artifact signature: %v", err) - } - - return &keykeeperpb.SignedArtifact{ - Path: newObjectKey, - HashSha256: hash, - }, nil + var outBuf bytes.Buffer + opts := []string{ + "--define", "_gpg_name " + keyName, + "--define", "_peridot_keykeeper_key " + key.keyUuid.String(), + "--addsign", localPath, } - verifySig := func() error { - opts := []string{ - "--define", "_gpg_name " + keyName, - "--define", "_peridot_keykeeper_key " + key.keyUuid.String(), - "--checksig", localPath, + cmd := gpgCmdEnv(exec.Command("rpm", opts...)) + cmd.Stdout = &outBuf + cmd.Stderr = &outBuf + err := cmd.Run() + if err != nil { + s.log.Errorf("failed to sign artifact %s: %v", artifact.Name, err) + statusErr := status.New(codes.Internal, "failed to sign artifact") + statusErr, err2 := statusErr.WithDetails(&errdetails.ErrorInfo{ + Reason: "rpmsign-failed", + Domain: "keykeeper.peridot.resf.org", + Metadata: map[string]string{ + "logs": outBuf.String(), + "err": err.Error(), + }, + }) + if err2 != nil { + s.log.Errorf("failed to add error details to status: %v", err2) } - cmd := gpgCmdEnv(exec.Command("rpm", opts...)) - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - err := cmd.Run() - if err != nil { - s.log.Errorf("failed to verify artifact %s: %v", artifact.Name, err) - return fmt.Errorf("failed to verify artifact %s: %v", artifact.Name, err) - } - return nil + return nil, statusErr.Err() } - var tries int - for { - res, _ := rpmSign() - err := verifySig() - if err == nil { - return res, nil - } - if err != nil && tries > 3 { - return nil, err - } - tries++ + _, err = s.storage.PutObject(newObjectKey, localPath) + if err != nil { + s.log.Errorf("failed to upload artifact %s: %v", newObjectKey, err) + return nil, fmt.Errorf("failed to upload artifact %s: %v", newObjectKey, err) } + + f, err := os.Open(localPath) + if err != nil { + return nil, err + } + + hasher := sha256.New() + _, err = io.Copy(hasher, f) + if err != nil { + return nil, err + } + hash := hex.EncodeToString(hasher.Sum(nil)) + + err = s.db.CreateTaskArtifactSignature(artifact.ID.String(), key.keyUuid.String(), hash) + if err != nil { + s.log.Errorf("failed to create task artifact signature: %v", err) + return nil, fmt.Errorf("failed to create task artifact signature: %v", err) + } + + return &keykeeperpb.SignedArtifact{ + Path: newObjectKey, + HashSha256: hash, + }, nil default: s.log.Infof("skipping artifact %s, extension %s not supported", artifact.Name, ext) return nil, ErrUnsupportedExtension From e810946c7096b669195abd9f0ec280168f7385ea Mon Sep 17 00:00:00 2001 From: Mustafa Gezen Date: Tue, 16 Aug 2022 15:45:07 +0200 Subject: [PATCH 6/8] Switch to sync.Map for keywarming Signed-off-by: Mustafa Gezen --- peridot/keykeeper/v1/keywarming.go | 28 ++++++---------- peridot/keykeeper/v1/server.go | 54 ++++++------------------------ 2 files changed, 22 insertions(+), 60 deletions(-) diff --git a/peridot/keykeeper/v1/keywarming.go b/peridot/keykeeper/v1/keywarming.go index f95ce13..5ff67e6 100644 --- a/peridot/keykeeper/v1/keywarming.go +++ b/peridot/keykeeper/v1/keywarming.go @@ -38,8 +38,6 @@ import ( "fmt" "github.com/ProtonMail/gopenpgp/v2/crypto" "github.com/google/uuid" - "io/ioutil" - "os" "os/exec" "peridot.resf.org/peridot/db/models" "peridot.resf.org/utils" @@ -88,13 +86,10 @@ func (s *Server) importGpgKey(armoredKey string) error { // WarmGPGKey warms up a specific GPG key // This involves shelling out to GPG to import the key func (s *Server) WarmGPGKey(key string, armoredKey string, gpgKey *crypto.Key, db *models.Key) (*LoadedKey, error) { - s.keyImportLock.ReadLock(key) - defer s.keyImportLock.ReadUnlock(key) - - cachedKey := s.keys[key] + cachedKeyAny, ok := s.keys.Load(key) // This means that the key is already loaded - if cachedKey != nil { - return cachedKey, nil + if ok { + return cachedKeyAny.(*LoadedKey), nil } err := s.importGpgKey(armoredKey) @@ -102,21 +97,20 @@ func (s *Server) WarmGPGKey(key string, armoredKey string, gpgKey *crypto.Key, d return nil, err } - if cachedKey == nil { - s.keys[key] = &LoadedKey{ - keyUuid: db.ID, - gpgId: gpgKey.GetHexKeyID(), - } + cachedKey := &LoadedKey{ + keyUuid: db.ID, + gpgId: gpgKey.GetHexKeyID(), } + s.keys.Store(key, cachedKey) - return s.keys[key], nil + return cachedKey, nil } // EnsureGPGKey ensures that the key is loaded func (s *Server) EnsureGPGKey(key string) (*LoadedKey, error) { - cachedKey := s.keys[key] - if cachedKey != nil { - return cachedKey, nil + cachedKeyAny, ok := s.keys.Load(key) + if ok { + return cachedKeyAny.(*LoadedKey), nil } // Key not found in cache, fetch from database diff --git a/peridot/keykeeper/v1/server.go b/peridot/keykeeper/v1/server.go index c2ba2c6..9d14225 100644 --- a/peridot/keykeeper/v1/server.go +++ b/peridot/keykeeper/v1/server.go @@ -57,45 +57,17 @@ import ( const TaskQueue = "keykeeper" -type MapStringLock struct { - *sync.RWMutex - m map[string]*sync.Mutex -} - -func (m *MapStringLock) ReadLock(key string) { - m.RLock() - defer m.RUnlock() - if m.m[key] == nil { - m.Lock() - m.m[key] = &sync.Mutex{} - m.Unlock() - } - m.m[key].Lock() -} - -func (m *MapStringLock) ReadUnlock(key string) { - m.RLock() - defer m.RUnlock() - if m.m[key] == nil { - m.Lock() - m.m[key] = &sync.Mutex{} - m.Unlock() - } - m.m[key].Unlock() -} - type Server struct { keykeeperpb.UnimplementedKeykeeperServiceServer - log *logrus.Logger - db peridotdb.Access - storage lookaside.Storage - worker worker.Worker - temporal client.Client - stores map[string]store.Store - keys map[string]*LoadedKey - keyImportLock *MapStringLock - defaultStore string + log *logrus.Logger + db peridotdb.Access + storage lookaside.Storage + worker worker.Worker + temporal client.Client + stores map[string]store.Store + keys *sync.Map + defaultStore string } func NewServer(db peridotdb.Access, c client.Client) (*Server, error) { @@ -116,13 +88,9 @@ func NewServer(db peridotdb.Access, c client.Client) (*Server, error) { worker: worker.New(c, TaskQueue, worker.Options{ DeadlockDetectionTimeout: 15 * time.Minute, }), - temporal: c, - stores: map[string]store.Store{"awssm": sm}, - keys: map[string]*LoadedKey{}, - keyImportLock: &MapStringLock{ - RWMutex: &sync.RWMutex{}, - m: map[string]*sync.Mutex{}, - }, + temporal: c, + stores: map[string]store.Store{"awssm": sm}, + keys: &sync.Map{}, defaultStore: "awssm", }, nil } From 663d0bc99f86ce5ffebba3a3d41af7185695b933 Mon Sep 17 00:00:00 2001 From: Mustafa Gezen Date: Wed, 17 Aug 2022 00:52:00 +0200 Subject: [PATCH 7/8] Increase signing heartbeat timeout to 10 minutes Signed-off-by: Mustafa Gezen --- peridot/keykeeper/v1/sign.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/peridot/keykeeper/v1/sign.go b/peridot/keykeeper/v1/sign.go index 5c00719..7178c43 100644 --- a/peridot/keykeeper/v1/sign.go +++ b/peridot/keykeeper/v1/sign.go @@ -114,7 +114,7 @@ func (s *Server) SignArtifactsWorkflow(ctx workflow.Context, artifacts models.Ta signArtifactCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ ScheduleToStartTimeout: 10 * time.Hour, StartToCloseTimeout: 24 * time.Hour, - HeartbeatTimeout: time.Minute, + HeartbeatTimeout: 10 * time.Minute, TaskQueue: TaskQueue, }) futures = append(futures, peridotworkflow.FutureContext{ From 1ffd01550ae6c9755f68a6425ba322170c88bdb5 Mon Sep 17 00:00:00 2001 From: Mustafa Gezen Date: Wed, 17 Aug 2022 01:55:56 +0200 Subject: [PATCH 8/8] Remove unused function (deleteGpgKey) Signed-off-by: Mustafa Gezen --- peridot/keykeeper/v1/keywarming.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/peridot/keykeeper/v1/keywarming.go b/peridot/keykeeper/v1/keywarming.go index 5ff67e6..640a860 100644 --- a/peridot/keykeeper/v1/keywarming.go +++ b/peridot/keykeeper/v1/keywarming.go @@ -65,14 +65,6 @@ func gpgCmdEnv(cmd *exec.Cmd) *exec.Cmd { return cmd } -func (s *Server) deleteGpgKey(keyId string) error { - out, err := logCmdRun(gpgCmdEnv(exec.Command("gpg", "--batch", "--yes", "--delete-secret-and-public-key", keyId))) - if err != nil { - s.log.Errorf("failed to delete gpg key: %s", out.String()) - } - return err -} - func (s *Server) importGpgKey(armoredKey string) error { cmd := gpgCmdEnv(exec.Command("gpg", "--batch", "--yes", "--import", "-")) cmd.Stdin = strings.NewReader(armoredKey)