Add storage interface and S3 and memory implementations

This commit is contained in:
Mustafa Gezen 2023-08-27 06:01:01 +02:00
parent fcd0504696
commit 3e3e454ff8
Signed by: mustafa
GPG key ID: DCDF010D946438C1
10 changed files with 592 additions and 0 deletions

8
base/go/storage/BUILD Normal file
View file

@ -0,0 +1,8 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "storage",
srcs = ["storage.go"],
importpath = "go.resf.org/peridot/base/go/storage",
visibility = ["//visibility:public"],
)

View file

@ -0,0 +1,15 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "detector",
srcs = ["detector.go"],
importpath = "go.resf.org/peridot/base/go/storage/detector",
visibility = ["//visibility:public"],
deps = [
"//base/go",
"//base/go/storage",
"//base/go/storage/s3",
"//vendor/github.com/pkg/errors",
"//vendor/github.com/urfave/cli/v2:cli",
],
)

View file

@ -0,0 +1,24 @@
package storage_detector
import (
"github.com/pkg/errors"
"github.com/urfave/cli/v2"
base "go.resf.org/peridot/base/go"
"go.resf.org/peridot/base/go/storage"
storage_s3 "go.resf.org/peridot/base/go/storage/s3"
"net/url"
)
func FromFlags(ctx *cli.Context) (storage.Storage, error) {
parsedURI, err := url.Parse(ctx.String(string(base.EnvVarStorageConnectionString)))
if err != nil {
return nil, errors.Wrap(err, "failed to parse storage connection string")
}
switch parsedURI.Scheme {
case "s3":
return storage_s3.FromFlags(ctx)
default:
return nil, errors.Errorf("unknown storage scheme: %s", parsedURI.Scheme)
}
}

View file

@ -0,0 +1,25 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "memory",
srcs = ["memory.go"],
importpath = "go.resf.org/peridot/base/go/storage/memory",
visibility = ["//visibility:public"],
deps = [
"//base/go/storage",
"//vendor/github.com/go-git/go-billy/v5:go-billy",
"//vendor/github.com/pkg/errors",
],
)
go_test(
name = "memory_test",
size = "small",
srcs = ["memory_test.go"],
embed = [":memory"],
deps = [
"//base/go/storage",
"//vendor/github.com/go-git/go-billy/v5/memfs",
"//vendor/github.com/stretchr/testify/require",
],
)

View file

@ -0,0 +1,99 @@
package storage_memory
import (
"github.com/go-git/go-billy/v5"
"github.com/pkg/errors"
"go.resf.org/peridot/base/go/storage"
"io"
"os"
"strings"
)
type InMemory struct {
storage.Storage
fs billy.Filesystem
blobs map[string][]byte
}
func New(fs billy.Filesystem) *InMemory {
return &InMemory{
fs: fs,
blobs: make(map[string][]byte),
}
}
func (im *InMemory) Download(object string, toPath string) error {
blob, ok := im.blobs[object]
if !ok {
return storage.ErrNotFound
}
// Open file
f, err := im.fs.OpenFile(toPath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
return errors.Wrap(err, "failed to open file")
}
// Write blob to file
_, err = f.Write(blob)
if err != nil {
return errors.Wrap(err, "failed to write blob to file")
}
return nil
}
func (im *InMemory) Get(object string) ([]byte, error) {
blob, ok := im.blobs[object]
if !ok {
return nil, storage.ErrNotFound
}
return blob, nil
}
func (im *InMemory) Put(object string, fromPath string) (*storage.UploadInfo, error) {
// Open file
f, err := im.fs.Open(fromPath)
if err != nil {
return nil, errors.Wrap(err, "failed to open file")
}
// Read file into blob
blob, err := io.ReadAll(f)
if err != nil {
return nil, errors.Wrap(err, "failed to read file")
}
// Store blob
im.blobs[object] = blob
return &storage.UploadInfo{
Location: "memory://" + object,
VersionID: nil,
}, nil
}
func (im *InMemory) PutBytes(object string, blob []byte) (*storage.UploadInfo, error) {
// Store blob
im.blobs[object] = blob
return &storage.UploadInfo{
Location: "memory://" + object,
VersionID: nil,
}, nil
}
func (im *InMemory) Delete(object string) error {
delete(im.blobs, object)
return nil
}
func (im *InMemory) Exists(object string) (bool, error) {
_, ok := im.blobs[object]
return ok, nil
}
func (im *InMemory) CanReadURI(uri string) (bool, error) {
return strings.HasPrefix(uri, "memory://"), nil
}

View file

@ -0,0 +1,131 @@
package storage_memory
import (
"github.com/go-git/go-billy/v5/memfs"
"github.com/stretchr/testify/require"
"go.resf.org/peridot/base/go/storage"
"testing"
)
func TestNew(t *testing.T) {
require.NotNil(t, New(memfs.New()))
}
func TestInMemory_Download_Found(t *testing.T) {
fs := memfs.New()
im := New(fs)
im.blobs["foo"] = []byte("bar")
err := im.Download("foo", "foo")
require.Nil(t, err)
_, err = fs.Stat("foo")
require.Nil(t, err)
f, err := fs.Open("foo")
require.Nil(t, err)
buf := make([]byte, 3)
_, err = f.Read(buf)
require.Nil(t, err)
require.Equal(t, []byte("bar"), buf)
}
func TestInMemory_Download_NotFound(t *testing.T) {
fs := memfs.New()
im := New(fs)
err := im.Download("foo", "foo")
require.Equal(t, storage.ErrNotFound, err)
}
func TestInMemory_Get_Found(t *testing.T) {
fs := memfs.New()
im := New(fs)
im.blobs["foo"] = []byte("bar")
blob, err := im.Get("foo")
require.Nil(t, err)
require.Equal(t, []byte("bar"), blob)
}
func TestInMemory_Get_NotFound(t *testing.T) {
fs := memfs.New()
im := New(fs)
_, err := im.Get("foo")
require.Equal(t, storage.ErrNotFound, err)
}
func TestInMemory_Put(t *testing.T) {
fs := memfs.New()
f, err := fs.Create("foo")
require.Nil(t, err)
_, err = f.Write([]byte("bar"))
require.Nil(t, err)
err = f.Close()
require.Nil(t, err)
im := New(fs)
_, err = im.Put("foo", "foo")
require.Nil(t, err)
require.Equal(t, []byte("bar"), im.blobs["foo"])
}
func TestInMemory_Put_NotFound(t *testing.T) {
fs := memfs.New()
im := New(fs)
_, err := im.Put("foo", "testdata/bar")
require.NotNil(t, err)
require.Equal(t, "failed to open file: file does not exist", err.Error())
}
func TestInMemory_PutBytes(t *testing.T) {
fs := memfs.New()
im := New(fs)
_, err := im.PutBytes("foo", []byte("bar"))
require.Nil(t, err)
require.Equal(t, []byte("bar"), im.blobs["foo"])
}
func TestInMemory_Delete(t *testing.T) {
fs := memfs.New()
im := New(fs)
im.blobs["foo"] = []byte("bar")
err := im.Delete("foo")
require.Nil(t, err)
_, ok := im.blobs["foo"]
require.False(t, ok)
}
func TestInMemory_Exists_Found(t *testing.T) {
fs := memfs.New()
im := New(fs)
im.blobs["foo"] = []byte("bar")
ok, err := im.Exists("foo")
require.Nil(t, err)
require.True(t, ok)
}
func TestInMemory_Exists_NotFound(t *testing.T) {
fs := memfs.New()
im := New(fs)
ok, err := im.Exists("foo")
require.Nil(t, err)
require.False(t, ok)
}
func TestInMemory_CanReadURI(t *testing.T) {
fs := memfs.New()
im := New(fs)
ok, err := im.CanReadURI("memory://foo")
require.Nil(t, err)
require.True(t, ok)
}
func TestInMemory_CanReadURI_No(t *testing.T) {
fs := memfs.New()
im := New(fs)
ok, err := im.CanReadURI("file://foo")
require.Nil(t, err)
require.False(t, ok)
}

23
base/go/storage/s3/BUILD Normal file
View file

@ -0,0 +1,23 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "s3",
srcs = [
"detector.go",
"s3.go",
],
importpath = "go.resf.org/peridot/base/go/storage/s3",
visibility = ["//visibility:public"],
deps = [
"//base/go",
"//base/go/awsutils",
"//base/go/storage",
"//vendor/github.com/aws/aws-sdk-go/aws",
"//vendor/github.com/aws/aws-sdk-go/aws/awserr",
"//vendor/github.com/aws/aws-sdk-go/aws/session",
"//vendor/github.com/aws/aws-sdk-go/service/s3",
"//vendor/github.com/aws/aws-sdk-go/service/s3/s3manager",
"//vendor/github.com/pkg/errors",
"//vendor/github.com/urfave/cli/v2:cli",
],
)

View file

@ -0,0 +1,37 @@
package storage_s3
import (
"github.com/pkg/errors"
"github.com/urfave/cli/v2"
base "go.resf.org/peridot/base/go"
"net/url"
"strings"
)
func FromFlags(ctx *cli.Context) (*S3, error) {
// Parse the connection string
parsedURI, err := url.Parse(ctx.String(string(base.EnvVarStorageConnectionString)))
if err != nil {
return nil, errors.Wrap(err, "failed to parse storage connection string")
}
// Retrieve the bucket name
bucket := parsedURI.Path
// Remove the leading/trailing slashes
bucket = strings.TrimSuffix(strings.TrimPrefix(bucket, "/"), "/")
// Convert certain flags into environment variables so that they can be used by the AWS SDK
base.RareUseChangeDefault("AWS_REGION", ctx.String(string(base.EnvVarStorageRegion)))
base.RareUseChangeDefault("AWS_ENDPOINT", ctx.String(string(base.EnvVarStorageEndpoint)))
if !ctx.Bool(string(base.EnvVarStorageSecure)) {
base.RareUseChangeDefault("AWS_DISABLE_SSL", "true")
}
if ctx.Bool(string(base.EnvVarStoragePathStyle)) {
base.RareUseChangeDefault("AWS_S3_FORCE_PATH_STYLE", "true")
}
return New(bucket)
}

187
base/go/storage/s3/s3.go Normal file
View file

@ -0,0 +1,187 @@
package storage_s3
import (
"bytes"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"go.resf.org/peridot/base/go/awsutils"
"go.resf.org/peridot/base/go/storage"
"net/url"
"os"
"strings"
)
// S3 is an implementation of the Storage interface for S3.
type S3 struct {
storage.Storage
bucket string
uploader *s3manager.Uploader
downloader *s3manager.Downloader
}
// New creates a new S3 storage backend.
// Supports AWS CLI related environment variables.
func New(bucket string) (*S3, error) {
awsCfg := &aws.Config{}
awsutils.FillOutConfig(awsCfg)
sess, err := session.NewSession(awsCfg)
if err != nil {
return nil, err
}
uploader := s3manager.NewUploader(sess)
downloader := s3manager.NewDownloader(sess)
return &S3{
bucket: bucket,
uploader: uploader,
downloader: downloader,
}, nil
}
// Download downloads a file from the storage backend to the given path.
func (s *S3) Download(object string, toPath string) error {
f, err := os.OpenFile(toPath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
return err
}
defer f.Close()
_, err = s.downloader.Download(f, &s3.GetObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(object),
})
if err != nil {
if awsErr, ok := err.(awserr.Error); ok {
if awsErr.Code() == s3.ErrCodeNoSuchKey {
return storage.ErrNotFound
}
}
}
return err
}
// Get returns the contents of a file from the storage backend.
func (s *S3) Get(object string) ([]byte, error) {
buf := aws.NewWriteAtBuffer([]byte{})
_, err := s.downloader.Download(buf, &s3.GetObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(object),
})
if err != nil {
if awsErr, ok := err.(awserr.Error); ok {
if awsErr.Code() == s3.ErrCodeNoSuchKey {
return nil, storage.ErrNotFound
}
}
}
return buf.Bytes(), err
}
// Put uploads a file to the storage backend.
func (s *S3) Put(object string, fromPath string) (*storage.UploadInfo, error) {
f, err := os.Open(fromPath)
if err != nil {
return nil, err
}
defer f.Close()
result, err := s.uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(s.bucket),
Key: aws.String(object),
Body: f,
})
return &storage.UploadInfo{
Location: result.Location,
VersionID: result.VersionID,
}, err
}
// PutBytes uploads a file to the storage backend.
func (s *S3) PutBytes(object string, data []byte) (*storage.UploadInfo, error) {
result, err := s.uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(s.bucket),
Key: aws.String(object),
Body: aws.ReadSeekCloser(bytes.NewBuffer(data)),
})
return &storage.UploadInfo{
Location: result.Location,
VersionID: result.VersionID,
}, err
}
// Delete deletes a file from the storage backend.
func (s *S3) Delete(object string) error {
_, err := s.uploader.S3.DeleteObject(&s3.DeleteObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(object),
})
if err != nil {
if awsErr, ok := err.(awserr.Error); ok {
if awsErr.Code() == s3.ErrCodeNoSuchKey {
return storage.ErrNotFound
}
}
}
return err
}
// Exists checks if a file exists in the storage backend.
func (s *S3) Exists(object string) (bool, error) {
_, err := s.uploader.S3.HeadObject(&s3.HeadObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(object),
})
if err != nil {
if awsErr, ok := err.(awserr.Error); ok {
if awsErr.Code() == s3.ErrCodeNoSuchKey {
return false, nil
}
}
return false, err
}
return true, nil
}
// CanReadURI checks if a URI can be read by the storage backend.
func (s *S3) CanReadURI(uri string) (bool, error) {
parsed, err := url.Parse(uri)
if err != nil {
return false, err
}
if parsed.Scheme != "s3" {
return false, nil
}
// Split the path into bucket and object.
// The first element is the bucket, the rest is the object.
split := strings.SplitN(parsed.Path, "/", 2)
// Verify length.
if len(split) < 2 {
return false, nil
}
// Verify bucket.
if split[0] != s.bucket {
return false, nil
}
// Verify object.
if len(split[1]) == 0 {
return false, nil
}
return true, nil
}

View file

@ -0,0 +1,43 @@
package storage
import "errors"
var ErrNotFound = errors.New("not found")
// UploadInfo is the information about an upload.
type UploadInfo struct {
// Location is the location of the uploaded file.
Location string
// VersionID is the version ID of the uploaded file.
VersionID *string
}
// Storage is an interface for storage backends.
// Usually S3, but can be anything.
type Storage interface {
// Download downloads a file from the storage backend to the given path.
Download(object string, toPath string) error
// Get returns the contents of a file from the storage backend.
// Returns ErrNotFound if the file does not exist.
Get(object string) ([]byte, error)
// Put uploads a file to the storage backend.
Put(object string, fromPath string) (*UploadInfo, error)
// PutBytes uploads a file to the storage backend.
PutBytes(object string, data []byte) (*UploadInfo, error)
// Delete deletes a file from the storage backend.
// Returns ErrNotFound if the file does not exist.
Delete(object string) error
// Exists checks if a file exists in the storage backend.
// Returns false if the file does not exist.
Exists(object string) (bool, error)
// CanReadURI checks if a URI can be read by the storage backend.
// Returns false if the URI cannot be read.
CanReadURI(uri string) (bool, error)
}