mirror of
https://github.com/rocky-linux/peridot.git
synced 2024-12-19 09:28:29 +00:00
778 lines
23 KiB
Go
778 lines
23 KiB
Go
|
package s3manager
|
||
|
|
||
|
import (
|
||
|
"bytes"
|
||
|
"fmt"
|
||
|
"io"
|
||
|
"sort"
|
||
|
"sync"
|
||
|
|
||
|
"github.com/aws/aws-sdk-go/aws"
|
||
|
"github.com/aws/aws-sdk-go/aws/awserr"
|
||
|
"github.com/aws/aws-sdk-go/aws/awsutil"
|
||
|
"github.com/aws/aws-sdk-go/aws/client"
|
||
|
"github.com/aws/aws-sdk-go/aws/credentials"
|
||
|
"github.com/aws/aws-sdk-go/aws/request"
|
||
|
"github.com/aws/aws-sdk-go/service/s3"
|
||
|
"github.com/aws/aws-sdk-go/service/s3/s3iface"
|
||
|
)
|
||
|
|
||
|
// MaxUploadParts is the upper bound on the number of parts a single
// multi-part upload to Amazon S3 may contain.
const MaxUploadParts = 10000
|
||
|
|
||
|
// MinUploadPartSize is the smallest part size, in bytes, allowed when
// uploading a part to Amazon S3 (5 * 1024 * 1024 bytes).
const MinUploadPartSize int64 = 5 * 1024 * 1024
|
||
|
|
||
|
// DefaultUploadPartSize is the default part size to buffer chunks of a
|
||
|
// payload into.
|
||
|
const DefaultUploadPartSize = MinUploadPartSize
|
||
|
|
||
|
// DefaultUploadConcurrency is the number of goroutines spun up by Upload()
// when no explicit concurrency is configured.
const DefaultUploadConcurrency = 5
|
||
|
|
||
|
// A MultiUploadFailure wraps a failed S3 multipart upload. An error returned
|
||
|
// will satisfy this interface when a multi part upload failed to upload all
|
||
|
// chucks to S3. In the case of a failure the UploadID is needed to operate on
|
||
|
// the chunks, if any, which were uploaded.
|
||
|
//
|
||
|
// Example:
|
||
|
//
|
||
|
// u := s3manager.NewUploader(opts)
|
||
|
// output, err := u.upload(input)
|
||
|
// if err != nil {
|
||
|
// if multierr, ok := err.(s3manager.MultiUploadFailure); ok {
|
||
|
// // Process error and its associated uploadID
|
||
|
// fmt.Println("Error:", multierr.Code(), multierr.Message(), multierr.UploadID())
|
||
|
// } else {
|
||
|
// // Process error generically
|
||
|
// fmt.Println("Error:", err.Error())
|
||
|
// }
|
||
|
// }
|
||
|
//
|
||
|
type MultiUploadFailure interface {
|
||
|
awserr.Error
|
||
|
|
||
|
// Returns the upload id for the S3 multipart upload that failed.
|
||
|
UploadID() string
|
||
|
}
|
||
|
|
||
|
// So that the Error interface type can be included as an anonymous field
|
||
|
// in the multiUploadError struct and not conflict with the error.Error() method.
|
||
|
type awsError awserr.Error
|
||
|
|
||
|
// A multiUploadError wraps the upload ID of a failed s3 multipart upload.
|
||
|
// Composed of BaseError for code, message, and original error
|
||
|
//
|
||
|
// Should be used for an error that occurred failing a S3 multipart upload,
|
||
|
// and a upload ID is available. If an uploadID is not available a more relevant
|
||
|
type multiUploadError struct {
|
||
|
awsError
|
||
|
|
||
|
// ID for multipart upload which failed.
|
||
|
uploadID string
|
||
|
}
|
||
|
|
||
|
// Error returns the string representation of the error.
|
||
|
//
|
||
|
// See apierr.BaseError ErrorWithExtra for output format
|
||
|
//
|
||
|
// Satisfies the error interface.
|
||
|
func (m multiUploadError) Error() string {
|
||
|
extra := fmt.Sprintf("upload id: %s", m.uploadID)
|
||
|
return awserr.SprintError(m.Code(), m.Message(), extra, m.OrigErr())
|
||
|
}
|
||
|
|
||
|
// String returns the string representation of the error.
|
||
|
// Alias for Error to satisfy the stringer interface.
|
||
|
func (m multiUploadError) String() string {
|
||
|
return m.Error()
|
||
|
}
|
||
|
|
||
|
// UploadID returns the id of the S3 upload which failed.
|
||
|
func (m multiUploadError) UploadID() string {
|
||
|
return m.uploadID
|
||
|
}
|
||
|
|
||
|
// UploadOutput is the result returned from a successful Upload() call.
type UploadOutput struct {
	// Location is the URL the object was uploaded to.
	Location string

	// VersionID is the version of the uploaded object. It is only populated
	// when the S3 bucket is versioned; otherwise it is left unset.
	VersionID *string

	// UploadID is the ID of the multipart upload to S3. When an upload fails,
	// the error can be cast to the MultiUploadFailure interface to extract the
	// upload ID instead.
	UploadID string
}
|
||
|
|
||
|
// WithUploaderRequestOptions appends to the Uploader's API request options.
|
||
|
func WithUploaderRequestOptions(opts ...request.Option) func(*Uploader) {
|
||
|
return func(u *Uploader) {
|
||
|
u.RequestOptions = append(u.RequestOptions, opts...)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// The Uploader structure that calls Upload(). It is safe to call Upload()
|
||
|
// on this structure for multiple objects and across concurrent goroutines.
|
||
|
// Mutating the Uploader's properties is not safe to be done concurrently.
|
||
|
type Uploader struct {
|
||
|
// The buffer size (in bytes) to use when buffering data into chunks and
|
||
|
// sending them as parts to S3. The minimum allowed part size is 5MB, and
|
||
|
// if this value is set to zero, the DefaultUploadPartSize value will be used.
|
||
|
PartSize int64
|
||
|
|
||
|
// The number of goroutines to spin up in parallel per call to Upload when
|
||
|
// sending parts. If this is set to zero, the DefaultUploadConcurrency value
|
||
|
// will be used.
|
||
|
//
|
||
|
// The concurrency pool is not shared between calls to Upload.
|
||
|
Concurrency int
|
||
|
|
||
|
// Setting this value to true will cause the SDK to avoid calling
|
||
|
// AbortMultipartUpload on a failure, leaving all successfully uploaded
|
||
|
// parts on S3 for manual recovery.
|
||
|
//
|
||
|
// Note that storing parts of an incomplete multipart upload counts towards
|
||
|
// space usage on S3 and will add additional costs if not cleaned up.
|
||
|
LeavePartsOnError bool
|
||
|
|
||
|
// MaxUploadParts is the max number of parts which will be uploaded to S3.
|
||
|
// Will be used to calculate the partsize of the object to be uploaded.
|
||
|
// E.g: 5GB file, with MaxUploadParts set to 100, will upload the file
|
||
|
// as 100, 50MB parts. With a limited of s3.MaxUploadParts (10,000 parts).
|
||
|
//
|
||
|
// MaxUploadParts must not be used to limit the total number of bytes uploaded.
|
||
|
// Use a type like to io.LimitReader (https://golang.org/pkg/io/#LimitedReader)
|
||
|
// instead. An io.LimitReader is helpful when uploading an unbounded reader
|
||
|
// to S3, and you know its maximum size. Otherwise the reader's io.EOF returned
|
||
|
// error must be used to signal end of stream.
|
||
|
//
|
||
|
// Defaults to package const's MaxUploadParts value.
|
||
|
MaxUploadParts int
|
||
|
|
||
|
// The client to use when uploading to S3.
|
||
|
S3 s3iface.S3API
|
||
|
|
||
|
// List of request options that will be passed down to individual API
|
||
|
// operation requests made by the uploader.
|
||
|
RequestOptions []request.Option
|
||
|
|
||
|
// Defines the buffer strategy used when uploading a part
|
||
|
BufferProvider ReadSeekerWriteToProvider
|
||
|
|
||
|
// partPool allows for the re-usage of streaming payload part buffers between upload calls
|
||
|
partPool byteSlicePool
|
||
|
}
|
||
|
|
||
|
// NewUploader creates a new Uploader instance to upload objects to S3. Pass In
|
||
|
// additional functional options to customize the uploader's behavior. Requires a
|
||
|
// client.ConfigProvider in order to create a S3 service client. The session.Session
|
||
|
// satisfies the client.ConfigProvider interface.
|
||
|
//
|
||
|
// Example:
|
||
|
// // The session the S3 Uploader will use
|
||
|
// sess := session.Must(session.NewSession())
|
||
|
//
|
||
|
// // Create an uploader with the session and default options
|
||
|
// uploader := s3manager.NewUploader(sess)
|
||
|
//
|
||
|
// // Create an uploader with the session and custom options
|
||
|
// uploader := s3manager.NewUploader(session, func(u *s3manager.Uploader) {
|
||
|
// u.PartSize = 64 * 1024 * 1024 // 64MB per part
|
||
|
// })
|
||
|
func NewUploader(c client.ConfigProvider, options ...func(*Uploader)) *Uploader {
|
||
|
return newUploader(s3.New(c), options...)
|
||
|
}
|
||
|
|
||
|
func newUploader(client s3iface.S3API, options ...func(*Uploader)) *Uploader {
|
||
|
u := &Uploader{
|
||
|
S3: client,
|
||
|
PartSize: DefaultUploadPartSize,
|
||
|
Concurrency: DefaultUploadConcurrency,
|
||
|
LeavePartsOnError: false,
|
||
|
MaxUploadParts: MaxUploadParts,
|
||
|
BufferProvider: defaultUploadBufferProvider(),
|
||
|
}
|
||
|
|
||
|
for _, option := range options {
|
||
|
option(u)
|
||
|
}
|
||
|
|
||
|
u.partPool = newByteSlicePool(u.PartSize)
|
||
|
|
||
|
return u
|
||
|
}
|
||
|
|
||
|
// NewUploaderWithClient creates a new Uploader instance to upload objects to S3. Pass in
|
||
|
// additional functional options to customize the uploader's behavior. Requires
|
||
|
// a S3 service client to make S3 API calls.
|
||
|
//
|
||
|
// Example:
|
||
|
// // The session the S3 Uploader will use
|
||
|
// sess := session.Must(session.NewSession())
|
||
|
//
|
||
|
// // S3 service client the Upload manager will use.
|
||
|
// s3Svc := s3.New(sess)
|
||
|
//
|
||
|
// // Create an uploader with S3 client and default options
|
||
|
// uploader := s3manager.NewUploaderWithClient(s3Svc)
|
||
|
//
|
||
|
// // Create an uploader with S3 client and custom options
|
||
|
// uploader := s3manager.NewUploaderWithClient(s3Svc, func(u *s3manager.Uploader) {
|
||
|
// u.PartSize = 64 * 1024 * 1024 // 64MB per part
|
||
|
// })
|
||
|
func NewUploaderWithClient(svc s3iface.S3API, options ...func(*Uploader)) *Uploader {
|
||
|
return newUploader(svc, options...)
|
||
|
}
|
||
|
|
||
|
// Upload uploads an object to S3, intelligently buffering large files into
|
||
|
// smaller chunks and sending them in parallel across multiple goroutines. You
|
||
|
// can configure the buffer size and concurrency through the Uploader's parameters.
|
||
|
//
|
||
|
// Additional functional options can be provided to configure the individual
|
||
|
// upload. These options are copies of the Uploader instance Upload is called from.
|
||
|
// Modifying the options will not impact the original Uploader instance.
|
||
|
//
|
||
|
// Use the WithUploaderRequestOptions helper function to pass in request
|
||
|
// options that will be applied to all API operations made with this uploader.
|
||
|
//
|
||
|
// It is safe to call this method concurrently across goroutines.
|
||
|
//
|
||
|
// Example:
|
||
|
// // Upload input parameters
|
||
|
// upParams := &s3manager.UploadInput{
|
||
|
// Bucket: &bucketName,
|
||
|
// Key: &keyName,
|
||
|
// Body: file,
|
||
|
// }
|
||
|
//
|
||
|
// // Perform an upload.
|
||
|
// result, err := uploader.Upload(upParams)
|
||
|
//
|
||
|
// // Perform upload with options different than the those in the Uploader.
|
||
|
// result, err := uploader.Upload(upParams, func(u *s3manager.Uploader) {
|
||
|
// u.PartSize = 10 * 1024 * 1024 // 10MB part size
|
||
|
// u.LeavePartsOnError = true // Don't delete the parts if the upload fails.
|
||
|
// })
|
||
|
func (u Uploader) Upload(input *UploadInput, options ...func(*Uploader)) (*UploadOutput, error) {
|
||
|
return u.UploadWithContext(aws.BackgroundContext(), input, options...)
|
||
|
}
|
||
|
|
||
|
// UploadWithContext uploads an object to S3, intelligently buffering large
|
||
|
// files into smaller chunks and sending them in parallel across multiple
|
||
|
// goroutines. You can configure the buffer size and concurrency through the
|
||
|
// Uploader's parameters.
|
||
|
//
|
||
|
// UploadWithContext is the same as Upload with the additional support for
|
||
|
// Context input parameters. The Context must not be nil. A nil Context will
|
||
|
// cause a panic. Use the context to add deadlining, timeouts, etc. The
|
||
|
// UploadWithContext may create sub-contexts for individual underlying requests.
|
||
|
//
|
||
|
// Additional functional options can be provided to configure the individual
|
||
|
// upload. These options are copies of the Uploader instance Upload is called from.
|
||
|
// Modifying the options will not impact the original Uploader instance.
|
||
|
//
|
||
|
// Use the WithUploaderRequestOptions helper function to pass in request
|
||
|
// options that will be applied to all API operations made with this uploader.
|
||
|
//
|
||
|
// It is safe to call this method concurrently across goroutines.
|
||
|
func (u Uploader) UploadWithContext(ctx aws.Context, input *UploadInput, opts ...func(*Uploader)) (*UploadOutput, error) {
|
||
|
i := uploader{in: input, cfg: u, ctx: ctx}
|
||
|
|
||
|
for _, opt := range opts {
|
||
|
opt(&i.cfg)
|
||
|
}
|
||
|
|
||
|
i.cfg.RequestOptions = append(i.cfg.RequestOptions, request.WithAppendUserAgent("S3Manager"))
|
||
|
|
||
|
return i.upload()
|
||
|
}
|
||
|
|
||
|
// UploadWithIterator will upload a batched amount of objects to S3. This operation uses
|
||
|
// the iterator pattern to know which object to upload next. Since this is an interface this
|
||
|
// allows for custom defined functionality.
|
||
|
//
|
||
|
// Example:
|
||
|
// svc:= s3manager.NewUploader(sess)
|
||
|
//
|
||
|
// objects := []BatchUploadObject{
|
||
|
// {
|
||
|
// Object: &s3manager.UploadInput {
|
||
|
// Key: aws.String("key"),
|
||
|
// Bucket: aws.String("bucket"),
|
||
|
// },
|
||
|
// },
|
||
|
// }
|
||
|
//
|
||
|
// iter := &s3manager.UploadObjectsIterator{Objects: objects}
|
||
|
// if err := svc.UploadWithIterator(aws.BackgroundContext(), iter); err != nil {
|
||
|
// return err
|
||
|
// }
|
||
|
func (u Uploader) UploadWithIterator(ctx aws.Context, iter BatchUploadIterator, opts ...func(*Uploader)) error {
|
||
|
var errs []Error
|
||
|
for iter.Next() {
|
||
|
object := iter.UploadObject()
|
||
|
if _, err := u.UploadWithContext(ctx, object.Object, opts...); err != nil {
|
||
|
s3Err := Error{
|
||
|
OrigErr: err,
|
||
|
Bucket: object.Object.Bucket,
|
||
|
Key: object.Object.Key,
|
||
|
}
|
||
|
|
||
|
errs = append(errs, s3Err)
|
||
|
}
|
||
|
|
||
|
if object.After == nil {
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
if err := object.After(); err != nil {
|
||
|
s3Err := Error{
|
||
|
OrigErr: err,
|
||
|
Bucket: object.Object.Bucket,
|
||
|
Key: object.Object.Key,
|
||
|
}
|
||
|
|
||
|
errs = append(errs, s3Err)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if len(errs) > 0 {
|
||
|
return NewBatchError("BatchedUploadIncomplete", "some objects have failed to upload.", errs)
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// internal structure to manage an upload to S3.
|
||
|
type uploader struct {
|
||
|
ctx aws.Context
|
||
|
cfg Uploader
|
||
|
|
||
|
in *UploadInput
|
||
|
|
||
|
readerPos int64 // current reader position
|
||
|
totalSize int64 // set to -1 if the size is not known
|
||
|
}
|
||
|
|
||
|
// internal logic for deciding whether to upload a single part or use a
|
||
|
// multipart upload.
|
||
|
func (u *uploader) upload() (*UploadOutput, error) {
|
||
|
if err := u.init(); err != nil {
|
||
|
return nil, awserr.New("ReadRequestBody", "unable to initialize upload", err)
|
||
|
}
|
||
|
defer u.cfg.partPool.Close()
|
||
|
|
||
|
if u.cfg.PartSize < MinUploadPartSize {
|
||
|
msg := fmt.Sprintf("part size must be at least %d bytes", MinUploadPartSize)
|
||
|
return nil, awserr.New("ConfigError", msg, nil)
|
||
|
}
|
||
|
|
||
|
// Do one read to determine if we have more than one part
|
||
|
reader, _, cleanup, err := u.nextReader()
|
||
|
if err == io.EOF { // single part
|
||
|
return u.singlePart(reader, cleanup)
|
||
|
} else if err != nil {
|
||
|
cleanup()
|
||
|
return nil, awserr.New("ReadRequestBody", "read upload data failed", err)
|
||
|
}
|
||
|
|
||
|
mu := multiuploader{uploader: u}
|
||
|
return mu.upload(reader, cleanup)
|
||
|
}
|
||
|
|
||
|
// init will initialize all default options.
|
||
|
func (u *uploader) init() error {
|
||
|
if u.cfg.Concurrency == 0 {
|
||
|
u.cfg.Concurrency = DefaultUploadConcurrency
|
||
|
}
|
||
|
if u.cfg.PartSize == 0 {
|
||
|
u.cfg.PartSize = DefaultUploadPartSize
|
||
|
}
|
||
|
if u.cfg.MaxUploadParts == 0 {
|
||
|
u.cfg.MaxUploadParts = MaxUploadParts
|
||
|
}
|
||
|
|
||
|
// Try to get the total size for some optimizations
|
||
|
if err := u.initSize(); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
// If PartSize was changed or partPool was never setup then we need to allocated a new pool
|
||
|
// so that we return []byte slices of the correct size
|
||
|
poolCap := u.cfg.Concurrency + 1
|
||
|
if u.cfg.partPool == nil || u.cfg.partPool.SliceSize() != u.cfg.PartSize {
|
||
|
u.cfg.partPool = newByteSlicePool(u.cfg.PartSize)
|
||
|
u.cfg.partPool.ModifyCapacity(poolCap)
|
||
|
} else {
|
||
|
u.cfg.partPool = &returnCapacityPoolCloser{byteSlicePool: u.cfg.partPool}
|
||
|
u.cfg.partPool.ModifyCapacity(poolCap)
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// initSize tries to detect the total stream size, setting u.totalSize. If
|
||
|
// the size is not known, totalSize is set to -1.
|
||
|
func (u *uploader) initSize() error {
|
||
|
u.totalSize = -1
|
||
|
|
||
|
switch r := u.in.Body.(type) {
|
||
|
case io.Seeker:
|
||
|
n, err := aws.SeekerLen(r)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
u.totalSize = n
|
||
|
|
||
|
// Try to adjust partSize if it is too small and account for
|
||
|
// integer division truncation.
|
||
|
if u.totalSize/u.cfg.PartSize >= int64(u.cfg.MaxUploadParts) {
|
||
|
// Add one to the part size to account for remainders
|
||
|
// during the size calculation. e.g odd number of bytes.
|
||
|
u.cfg.PartSize = (u.totalSize / int64(u.cfg.MaxUploadParts)) + 1
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// nextReader returns a seekable reader representing the next packet of data.
|
||
|
// This operation increases the shared u.readerPos counter, but note that it
|
||
|
// does not need to be wrapped in a mutex because nextReader is only called
|
||
|
// from the main thread.
|
||
|
func (u *uploader) nextReader() (io.ReadSeeker, int, func(), error) {
|
||
|
switch r := u.in.Body.(type) {
|
||
|
case readerAtSeeker:
|
||
|
var err error
|
||
|
|
||
|
n := u.cfg.PartSize
|
||
|
if u.totalSize >= 0 {
|
||
|
bytesLeft := u.totalSize - u.readerPos
|
||
|
|
||
|
if bytesLeft <= u.cfg.PartSize {
|
||
|
err = io.EOF
|
||
|
n = bytesLeft
|
||
|
}
|
||
|
}
|
||
|
|
||
|
var (
|
||
|
reader io.ReadSeeker
|
||
|
cleanup func()
|
||
|
)
|
||
|
|
||
|
reader = io.NewSectionReader(r, u.readerPos, n)
|
||
|
if u.cfg.BufferProvider != nil {
|
||
|
reader, cleanup = u.cfg.BufferProvider.GetWriteTo(reader)
|
||
|
} else {
|
||
|
cleanup = func() {}
|
||
|
}
|
||
|
|
||
|
u.readerPos += n
|
||
|
|
||
|
return reader, int(n), cleanup, err
|
||
|
|
||
|
default:
|
||
|
part, err := u.cfg.partPool.Get(u.ctx)
|
||
|
if err != nil {
|
||
|
return nil, 0, func() {}, err
|
||
|
}
|
||
|
|
||
|
n, err := readFillBuf(r, *part)
|
||
|
u.readerPos += int64(n)
|
||
|
|
||
|
cleanup := func() {
|
||
|
u.cfg.partPool.Put(part)
|
||
|
}
|
||
|
|
||
|
return bytes.NewReader((*part)[0:n]), n, cleanup, err
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// readFillBuf reads from r into b until b is full or r returns an error. It
// returns the number of bytes stored in b and the error from the last Read
// (nil if b was filled without error). An error delivered together with the
// final bytes, such as io.EOF, is returned even when b ended up full.
func readFillBuf(r io.Reader, b []byte) (offset int, err error) {
	for {
		if offset >= len(b) || err != nil {
			return offset, err
		}

		var got int
		got, err = r.Read(b[offset:])
		offset += got
	}
}
|
||
|
|
||
|
// singlePart contains upload logic for uploading a single chunk via
|
||
|
// a regular PutObject request. Multipart requests require at least two
|
||
|
// parts, or at least 5MB of data.
|
||
|
func (u *uploader) singlePart(r io.ReadSeeker, cleanup func()) (*UploadOutput, error) {
|
||
|
defer cleanup()
|
||
|
|
||
|
params := &s3.PutObjectInput{}
|
||
|
awsutil.Copy(params, u.in)
|
||
|
params.Body = r
|
||
|
|
||
|
// Need to use request form because URL generated in request is
|
||
|
// used in return.
|
||
|
req, out := u.cfg.S3.PutObjectRequest(params)
|
||
|
req.SetContext(u.ctx)
|
||
|
req.ApplyOptions(u.cfg.RequestOptions...)
|
||
|
if err := req.Send(); err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
url := req.HTTPRequest.URL.String()
|
||
|
return &UploadOutput{
|
||
|
Location: url,
|
||
|
VersionID: out.VersionId,
|
||
|
}, nil
|
||
|
}
|
||
|
|
||
|
// internal structure to manage a specific multipart upload to S3.
|
||
|
type multiuploader struct {
|
||
|
*uploader
|
||
|
wg sync.WaitGroup
|
||
|
m sync.Mutex
|
||
|
err error
|
||
|
uploadID string
|
||
|
parts completedParts
|
||
|
}
|
||
|
|
||
|
// chunk describes a single piece of data being sent to S3: the reader over
// its bytes, its part number, and the cleanup function to call once the chunk
// has been handled.
type chunk struct {
	buf     io.ReadSeeker
	num     int64
	cleanup func()
}
|
||
|
|
||
|
// completedParts is a wrapper to make parts sortable by their part number,
|
||
|
// since S3 required this list to be sent in sorted order.
|
||
|
type completedParts []*s3.CompletedPart
|
||
|
|
||
|
func (a completedParts) Len() int { return len(a) }
|
||
|
func (a completedParts) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||
|
func (a completedParts) Less(i, j int) bool { return *a[i].PartNumber < *a[j].PartNumber }
|
||
|
|
||
|
// upload will perform a multipart upload using the firstBuf buffer containing
|
||
|
// the first chunk of data.
|
||
|
func (u *multiuploader) upload(firstBuf io.ReadSeeker, cleanup func()) (*UploadOutput, error) {
|
||
|
params := &s3.CreateMultipartUploadInput{}
|
||
|
awsutil.Copy(params, u.in)
|
||
|
|
||
|
// Create the multipart
|
||
|
resp, err := u.cfg.S3.CreateMultipartUploadWithContext(u.ctx, params, u.cfg.RequestOptions...)
|
||
|
if err != nil {
|
||
|
cleanup()
|
||
|
return nil, err
|
||
|
}
|
||
|
u.uploadID = *resp.UploadId
|
||
|
|
||
|
// Create the workers
|
||
|
ch := make(chan chunk, u.cfg.Concurrency)
|
||
|
for i := 0; i < u.cfg.Concurrency; i++ {
|
||
|
u.wg.Add(1)
|
||
|
go u.readChunk(ch)
|
||
|
}
|
||
|
|
||
|
// Send part 1 to the workers
|
||
|
var num int64 = 1
|
||
|
ch <- chunk{buf: firstBuf, num: num, cleanup: cleanup}
|
||
|
|
||
|
// Read and queue the rest of the parts
|
||
|
for u.geterr() == nil && err == nil {
|
||
|
var (
|
||
|
reader io.ReadSeeker
|
||
|
nextChunkLen int
|
||
|
ok bool
|
||
|
)
|
||
|
|
||
|
reader, nextChunkLen, cleanup, err = u.nextReader()
|
||
|
ok, err = u.shouldContinue(num, nextChunkLen, err)
|
||
|
if !ok {
|
||
|
cleanup()
|
||
|
if err != nil {
|
||
|
u.seterr(err)
|
||
|
}
|
||
|
break
|
||
|
}
|
||
|
|
||
|
num++
|
||
|
|
||
|
ch <- chunk{buf: reader, num: num, cleanup: cleanup}
|
||
|
}
|
||
|
|
||
|
// Close the channel, wait for workers, and complete upload
|
||
|
close(ch)
|
||
|
u.wg.Wait()
|
||
|
complete := u.complete()
|
||
|
|
||
|
if err := u.geterr(); err != nil {
|
||
|
return nil, &multiUploadError{
|
||
|
awsError: awserr.New(
|
||
|
"MultipartUpload",
|
||
|
"upload multipart failed",
|
||
|
err),
|
||
|
uploadID: u.uploadID,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Create a presigned URL of the S3 Get Object in order to have parity with
|
||
|
// single part upload.
|
||
|
getReq, _ := u.cfg.S3.GetObjectRequest(&s3.GetObjectInput{
|
||
|
Bucket: u.in.Bucket,
|
||
|
Key: u.in.Key,
|
||
|
})
|
||
|
getReq.Config.Credentials = credentials.AnonymousCredentials
|
||
|
getReq.SetContext(u.ctx)
|
||
|
uploadLocation, _, _ := getReq.PresignRequest(1)
|
||
|
|
||
|
return &UploadOutput{
|
||
|
Location: uploadLocation,
|
||
|
VersionID: complete.VersionId,
|
||
|
UploadID: u.uploadID,
|
||
|
}, nil
|
||
|
}
|
||
|
|
||
|
func (u *multiuploader) shouldContinue(part int64, nextChunkLen int, err error) (bool, error) {
|
||
|
if err != nil && err != io.EOF {
|
||
|
return false, awserr.New("ReadRequestBody", "read multipart upload data failed", err)
|
||
|
}
|
||
|
|
||
|
if nextChunkLen == 0 {
|
||
|
// No need to upload empty part, if file was empty to start
|
||
|
// with empty single part would of been created and never
|
||
|
// started multipart upload.
|
||
|
return false, nil
|
||
|
}
|
||
|
|
||
|
part++
|
||
|
// This upload exceeded maximum number of supported parts, error now.
|
||
|
if part > int64(u.cfg.MaxUploadParts) || part > int64(MaxUploadParts) {
|
||
|
var msg string
|
||
|
if part > int64(u.cfg.MaxUploadParts) {
|
||
|
msg = fmt.Sprintf("exceeded total allowed configured MaxUploadParts (%d). Adjust PartSize to fit in this limit",
|
||
|
u.cfg.MaxUploadParts)
|
||
|
} else {
|
||
|
msg = fmt.Sprintf("exceeded total allowed S3 limit MaxUploadParts (%d). Adjust PartSize to fit in this limit",
|
||
|
MaxUploadParts)
|
||
|
}
|
||
|
return false, awserr.New("TotalPartsExceeded", msg, nil)
|
||
|
}
|
||
|
|
||
|
return true, err
|
||
|
}
|
||
|
|
||
|
// readChunk runs in worker goroutines to pull chunks off of the ch channel
|
||
|
// and send() them as UploadPart requests.
|
||
|
func (u *multiuploader) readChunk(ch chan chunk) {
|
||
|
defer u.wg.Done()
|
||
|
for {
|
||
|
data, ok := <-ch
|
||
|
|
||
|
if !ok {
|
||
|
break
|
||
|
}
|
||
|
|
||
|
if u.geterr() == nil {
|
||
|
if err := u.send(data); err != nil {
|
||
|
u.seterr(err)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
data.cleanup()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// send performs an UploadPart request and keeps track of the completed
|
||
|
// part information.
|
||
|
func (u *multiuploader) send(c chunk) error {
|
||
|
params := &s3.UploadPartInput{
|
||
|
Bucket: u.in.Bucket,
|
||
|
Key: u.in.Key,
|
||
|
Body: c.buf,
|
||
|
UploadId: &u.uploadID,
|
||
|
SSECustomerAlgorithm: u.in.SSECustomerAlgorithm,
|
||
|
SSECustomerKey: u.in.SSECustomerKey,
|
||
|
PartNumber: &c.num,
|
||
|
}
|
||
|
|
||
|
resp, err := u.cfg.S3.UploadPartWithContext(u.ctx, params, u.cfg.RequestOptions...)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
n := c.num
|
||
|
completed := &s3.CompletedPart{ETag: resp.ETag, PartNumber: &n}
|
||
|
|
||
|
u.m.Lock()
|
||
|
u.parts = append(u.parts, completed)
|
||
|
u.m.Unlock()
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// geterr is a thread-safe getter for the error object
|
||
|
func (u *multiuploader) geterr() error {
|
||
|
u.m.Lock()
|
||
|
defer u.m.Unlock()
|
||
|
|
||
|
return u.err
|
||
|
}
|
||
|
|
||
|
// seterr is a thread-safe setter for the error object
|
||
|
func (u *multiuploader) seterr(e error) {
|
||
|
u.m.Lock()
|
||
|
defer u.m.Unlock()
|
||
|
|
||
|
u.err = e
|
||
|
}
|
||
|
|
||
|
// fail will abort the multipart unless LeavePartsOnError is set to true.
|
||
|
func (u *multiuploader) fail() {
|
||
|
if u.cfg.LeavePartsOnError {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
params := &s3.AbortMultipartUploadInput{
|
||
|
Bucket: u.in.Bucket,
|
||
|
Key: u.in.Key,
|
||
|
UploadId: &u.uploadID,
|
||
|
}
|
||
|
_, err := u.cfg.S3.AbortMultipartUploadWithContext(u.ctx, params, u.cfg.RequestOptions...)
|
||
|
if err != nil {
|
||
|
logMessage(u.cfg.S3, aws.LogDebug, fmt.Sprintf("failed to abort multipart upload, %v", err))
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// complete successfully completes a multipart upload and returns the response.
|
||
|
func (u *multiuploader) complete() *s3.CompleteMultipartUploadOutput {
|
||
|
if u.geterr() != nil {
|
||
|
u.fail()
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Parts must be sorted in PartNumber order.
|
||
|
sort.Sort(u.parts)
|
||
|
|
||
|
params := &s3.CompleteMultipartUploadInput{
|
||
|
Bucket: u.in.Bucket,
|
||
|
Key: u.in.Key,
|
||
|
UploadId: &u.uploadID,
|
||
|
MultipartUpload: &s3.CompletedMultipartUpload{Parts: u.parts},
|
||
|
}
|
||
|
resp, err := u.cfg.S3.CompleteMultipartUploadWithContext(u.ctx, params, u.cfg.RequestOptions...)
|
||
|
if err != nil {
|
||
|
u.seterr(err)
|
||
|
u.fail()
|
||
|
}
|
||
|
|
||
|
return resp
|
||
|
}
|
||
|
|
||
|
// readerAtSeeker is satisfied by bodies that support both sequential,
// seekable reads and reads at an arbitrary offset. nextReader uses it to
// serve parts as sections of the body rather than copying them into buffers.
type readerAtSeeker interface {
	io.ReadSeeker
	io.ReaderAt
}
|