diff --git a/peridot/keykeeper/v1/keywarming.go b/peridot/keykeeper/v1/keywarming.go index 120e6c11..dee276ad 100644 --- a/peridot/keykeeper/v1/keywarming.go +++ b/peridot/keykeeper/v1/keywarming.go @@ -106,9 +106,11 @@ func (s *Server) importRpmKey(publicKey string) error { // WarmGPGKey warms up a specific GPG key // This involves shelling out to GPG to import the key func (s *Server) WarmGPGKey(key string, armoredKey string, gpgKey *crypto.Key, db *models.Key) (*LoadedKey, error) { + s.keyImportLock.ReadLock(key) + defer s.keyImportLock.ReadUnlock(key) + cachedKey := s.keys[key] // This means that the key is already loaded - // We need to delete and replace it if cachedKey != nil { return cachedKey, nil } diff --git a/peridot/keykeeper/v1/server.go b/peridot/keykeeper/v1/server.go index 06428898..c2ba2c6e 100644 --- a/peridot/keykeeper/v1/server.go +++ b/peridot/keykeeper/v1/server.go @@ -57,17 +57,45 @@ import ( const TaskQueue = "keykeeper" +type MapStringLock struct { + *sync.RWMutex + m map[string]*sync.Mutex +} + +func (m *MapStringLock) ReadLock(key string) { + m.RLock() + defer m.RUnlock() + if m.m[key] == nil { + m.Lock() + m.m[key] = &sync.Mutex{} + m.Unlock() + } + m.m[key].Lock() +} + +func (m *MapStringLock) ReadUnlock(key string) { + m.RLock() + defer m.RUnlock() + if m.m[key] == nil { + m.Lock() + m.m[key] = &sync.Mutex{} + m.Unlock() + } + m.m[key].Unlock() +} + type Server struct { keykeeperpb.UnimplementedKeykeeperServiceServer - log *logrus.Logger - db peridotdb.Access - storage lookaside.Storage - worker worker.Worker - temporal client.Client - stores map[string]store.Store - keys map[string]*LoadedKey - defaultStore string + log *logrus.Logger + db peridotdb.Access + storage lookaside.Storage + worker worker.Worker + temporal client.Client + stores map[string]store.Store + keys map[string]*LoadedKey + keyImportLock *MapStringLock + defaultStore string } func NewServer(db peridotdb.Access, c client.Client) (*Server, error) { @@ -82,13 +110,19 @@ func NewServer(db peridotdb.Access, c client.Client) (*Server, error) { } return &Server{ - log: logrus.New(), - db: db, - storage: storage, - worker: worker.New(c, TaskQueue, worker.Options{}), - temporal: c, - stores: map[string]store.Store{"awssm": sm}, - keys: map[string]*LoadedKey{}, + log: logrus.New(), + db: db, + storage: storage, + worker: worker.New(c, TaskQueue, worker.Options{ + DeadlockDetectionTimeout: 15 * time.Minute, + }), + temporal: c, + stores: map[string]store.Store{"awssm": sm}, + keys: map[string]*LoadedKey{}, + keyImportLock: &MapStringLock{ + RWMutex: &sync.RWMutex{}, + m: map[string]*sync.Mutex{}, + }, defaultStore: "awssm", }, nil }