upload.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667
  1. package s3manager
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "sort"
  7. "sync"
  8. "time"
  9. "github.com/aws/aws-sdk-go/aws"
  10. "github.com/aws/aws-sdk-go/aws/awserr"
  11. "github.com/aws/aws-sdk-go/aws/awsutil"
  12. "github.com/aws/aws-sdk-go/aws/client"
  13. "github.com/aws/aws-sdk-go/aws/request"
  14. "github.com/aws/aws-sdk-go/service/s3"
  15. "github.com/aws/aws-sdk-go/service/s3/s3iface"
  16. )
  17. // MaxUploadParts is the maximum allowed number of parts in a multi-part upload
  18. // on Amazon S3.
  19. const MaxUploadParts = 10000
  20. // MinUploadPartSize is the minimum allowed part size when uploading a part to
  21. // Amazon S3.
  22. const MinUploadPartSize int64 = 1024 * 1024 * 5
  23. // DefaultUploadPartSize is the default part size to buffer chunks of a
  24. // payload into.
  25. const DefaultUploadPartSize = MinUploadPartSize
  26. // DefaultUploadConcurrency is the default number of goroutines to spin up when
  27. // using Upload().
  28. const DefaultUploadConcurrency = 5
// A MultiUploadFailure wraps a failed S3 multipart upload. An error returned
// will satisfy this interface when a multi part upload failed to upload all
// chunks to S3. In the case of a failure the UploadID is needed to operate on
// the chunks, if any, which were uploaded.
//
// Example:
//
//	u := s3manager.NewUploader(opts)
//	output, err := u.upload(input)
//	if err != nil {
//		if multierr, ok := err.(s3manager.MultiUploadFailure); ok {
//			// Process error and its associated uploadID
//			fmt.Println("Error:", multierr.Code(), multierr.Message(), multierr.UploadID())
//		} else {
//			// Process error generically
//			fmt.Println("Error:", err.Error())
//		}
//	}
type MultiUploadFailure interface {
	awserr.Error

	// UploadID returns the upload id for the S3 multipart upload that failed.
	UploadID() string
}
// awsError aliases awserr.Error so that the interface type can be included as
// an anonymous field in the multiUploadError struct without its Error() method
// conflicting with the built-in error interface's Error() method.
type awsError awserr.Error
// A multiUploadError wraps the upload ID of a failed s3 multipart upload.
// Composed of BaseError for code, message, and original error.
//
// Should be used for an error that occurred failing a S3 multipart upload,
// and a upload ID is available. If an uploadID is not available a more
// relevant error type should be returned instead.
type multiUploadError struct {
	awsError

	// uploadID is the ID of the multipart upload which failed.
	uploadID string
}
  66. // Error returns the string representation of the error.
  67. //
  68. // See apierr.BaseError ErrorWithExtra for output format
  69. //
  70. // Satisfies the error interface.
  71. func (m multiUploadError) Error() string {
  72. extra := fmt.Sprintf("upload id: %s", m.uploadID)
  73. return awserr.SprintError(m.Code(), m.Message(), extra, m.OrigErr())
  74. }
// String returns the string representation of the error.
// Alias for Error to satisfy the stringer interface.
func (m multiUploadError) String() string {
	return m.Error()
}
// UploadID returns the id of the S3 multipart upload which failed, allowing
// the caller to list, complete, or abort the leftover parts.
func (m multiUploadError) UploadID() string {
	return m.uploadID
}
// UploadInput contains all input for upload requests to Amazon S3.
type UploadInput struct {
	// The canned ACL to apply to the object.
	ACL *string `location:"header" locationName:"x-amz-acl" type:"string"`

	// The bucket to upload the object into.
	Bucket *string `location:"uri" locationName:"Bucket" type:"string" required:"true"`

	// Specifies caching behavior along the request/reply chain.
	CacheControl *string `location:"header" locationName:"Cache-Control" type:"string"`

	// Specifies presentational information for the object.
	ContentDisposition *string `location:"header" locationName:"Content-Disposition" type:"string"`

	// Specifies what content encodings have been applied to the object and thus
	// what decoding mechanisms must be applied to obtain the media-type referenced
	// by the Content-Type header field.
	ContentEncoding *string `location:"header" locationName:"Content-Encoding" type:"string"`

	// The language the content is in.
	ContentLanguage *string `location:"header" locationName:"Content-Language" type:"string"`

	// A standard MIME type describing the format of the object data.
	ContentType *string `location:"header" locationName:"Content-Type" type:"string"`

	// The date and time at which the object is no longer cacheable.
	Expires *time.Time `location:"header" locationName:"Expires" type:"timestamp" timestampFormat:"rfc822"`

	// Gives the grantee READ, READ_ACP, and WRITE_ACP permissions on the object.
	GrantFullControl *string `location:"header" locationName:"x-amz-grant-full-control" type:"string"`

	// Allows grantee to read the object data and its metadata.
	GrantRead *string `location:"header" locationName:"x-amz-grant-read" type:"string"`

	// Allows grantee to read the object ACL.
	GrantReadACP *string `location:"header" locationName:"x-amz-grant-read-acp" type:"string"`

	// Allows grantee to write the ACL for the applicable object.
	GrantWriteACP *string `location:"header" locationName:"x-amz-grant-write-acp" type:"string"`

	// The key (object name) to upload the object to.
	Key *string `location:"uri" locationName:"Key" type:"string" required:"true"`

	// A map of metadata to store with the object in S3.
	Metadata map[string]*string `location:"headers" locationName:"x-amz-meta-" type:"map"`

	// Confirms that the requester knows that she or he will be charged for the
	// request. Bucket owners need not specify this parameter in their requests.
	// Documentation on downloading objects from requester pays buckets can be found
	// at http://docs.aws.amazon.com/AmazonS3/latest/dev/ObjectsinRequesterPaysBuckets.html
	RequestPayer *string `location:"header" locationName:"x-amz-request-payer" type:"string"`

	// Specifies the algorithm to use to when encrypting the object (e.g., AES256,
	// aws:kms).
	SSECustomerAlgorithm *string `location:"header" locationName:"x-amz-server-side-encryption-customer-algorithm" type:"string"`

	// Specifies the customer-provided encryption key for Amazon S3 to use in encrypting
	// data. This value is used to store the object and then it is discarded; Amazon
	// does not store the encryption key. The key must be appropriate for use with
	// the algorithm specified in the x-amz-server-side-encryption-customer-algorithm
	// header.
	SSECustomerKey *string `location:"header" locationName:"x-amz-server-side-encryption-customer-key" type:"string"`

	// Specifies the 128-bit MD5 digest of the encryption key according to RFC 1321.
	// Amazon S3 uses this header for a message integrity check to ensure the encryption
	// key was transmitted without error.
	SSECustomerKeyMD5 *string `location:"header" locationName:"x-amz-server-side-encryption-customer-key-MD5" type:"string"`

	// Specifies the AWS KMS key ID to use for object encryption. All GET and PUT
	// requests for an object protected by AWS KMS will fail if not made via SSL
	// or using SigV4. Documentation on configuring any of the officially supported
	// AWS SDKs and CLI can be found at http://docs.aws.amazon.com/AmazonS3/latest/dev/UsingAWSSDK.html#specify-signature-version
	SSEKMSKeyId *string `location:"header" locationName:"x-amz-server-side-encryption-aws-kms-key-id" type:"string"`

	// The Server-side encryption algorithm used when storing this object in S3
	// (e.g., AES256, aws:kms).
	ServerSideEncryption *string `location:"header" locationName:"x-amz-server-side-encryption" type:"string"`

	// The type of storage to use for the object. Defaults to 'STANDARD'.
	StorageClass *string `location:"header" locationName:"x-amz-storage-class" type:"string"`

	// If the bucket is configured as a website, redirects requests for this object
	// to another object in the same bucket or to an external URL. Amazon S3 stores
	// the value of this header in the object metadata.
	WebsiteRedirectLocation *string `location:"header" locationName:"x-amz-website-redirect-location" type:"string"`

	// The readable body payload to send to S3.
	Body io.Reader
}
// UploadOutput represents a response from the Upload() call.
type UploadOutput struct {
	// The URL where the object was uploaded to.
	Location string

	// The version of the object that was uploaded. Will only be populated if
	// the S3 Bucket is versioned. If the bucket is not versioned this field
	// will not be set.
	VersionID *string

	// The ID for a multipart upload to S3. In the case of an error the error
	// can be cast to the MultiUploadFailure interface to extract the upload ID.
	UploadID string
}
// The Uploader structure that calls Upload(). It is safe to call Upload()
// on this structure for multiple objects and across concurrent goroutines.
// Mutating the Uploader's properties is not safe to be done concurrently.
type Uploader struct {
	// The buffer size (in bytes) to use when buffering data into chunks and
	// sending them as parts to S3. The minimum allowed part size is 5MB, and
	// if this value is set to zero, the DefaultUploadPartSize value will be used.
	PartSize int64

	// The number of goroutines to spin up in parallel when sending parts.
	// If this is set to zero, the DefaultUploadConcurrency value will be used.
	Concurrency int

	// Setting this value to true will cause the SDK to avoid calling
	// AbortMultipartUpload on a failure, leaving all successfully uploaded
	// parts on S3 for manual recovery.
	//
	// Note that storing parts of an incomplete multipart upload counts towards
	// space usage on S3 and will add additional costs if not cleaned up.
	LeavePartsOnError bool

	// MaxUploadParts is the max number of parts which will be uploaded to S3.
	// Will be used to calculate the partsize of the object to be uploaded.
	// E.g: 5GB file, with MaxUploadParts set to 100, will upload the file
	// as 100, 50MB parts. Capped by the S3 limit of MaxUploadParts
	// (10,000 parts).
	MaxUploadParts int

	// The client to use when uploading to S3.
	S3 s3iface.S3API
}
  188. // NewUploader creates a new Uploader instance to upload objects to S3. Pass In
  189. // additional functional options to customize the uploader's behavior. Requires a
  190. // client.ConfigProvider in order to create a S3 service client. The session.Session
  191. // satisfies the client.ConfigProvider interface.
  192. //
  193. // Example:
  194. // // The session the S3 Uploader will use
  195. // sess, err := session.NewSession()
  196. //
  197. // // Create an uploader with the session and default options
  198. // uploader := s3manager.NewUploader(sess)
  199. //
  200. // // Create an uploader with the session and custom options
  201. // uploader := s3manager.NewUploader(session, func(u *s3manager.Uploader) {
  202. // u.PartSize = 64 * 1024 * 1024 // 64MB per part
  203. // })
  204. func NewUploader(c client.ConfigProvider, options ...func(*Uploader)) *Uploader {
  205. u := &Uploader{
  206. S3: s3.New(c),
  207. PartSize: DefaultUploadPartSize,
  208. Concurrency: DefaultUploadConcurrency,
  209. LeavePartsOnError: false,
  210. MaxUploadParts: MaxUploadParts,
  211. }
  212. for _, option := range options {
  213. option(u)
  214. }
  215. return u
  216. }
  217. // NewUploaderWithClient creates a new Uploader instance to upload objects to S3. Pass in
  218. // additional functional options to customize the uploader's behavior. Requires
  219. // a S3 service client to make S3 API calls.
  220. //
  221. // Example:
  222. // // The session the S3 Uploader will use
  223. // sess, err := session.NewSession()
  224. //
  225. // // S3 service client the Upload manager will use.
  226. // s3Svc := s3.New(sess)
  227. //
  228. // // Create an uploader with S3 client and default options
  229. // uploader := s3manager.NewUploaderWithClient(s3Svc)
  230. //
  231. // // Create an uploader with S3 client and custom options
  232. // uploader := s3manager.NewUploaderWithClient(s3Svc, func(u *s3manager.Uploader) {
  233. // u.PartSize = 64 * 1024 * 1024 // 64MB per part
  234. // })
  235. func NewUploaderWithClient(svc s3iface.S3API, options ...func(*Uploader)) *Uploader {
  236. u := &Uploader{
  237. S3: svc,
  238. PartSize: DefaultUploadPartSize,
  239. Concurrency: DefaultUploadConcurrency,
  240. LeavePartsOnError: false,
  241. MaxUploadParts: MaxUploadParts,
  242. }
  243. for _, option := range options {
  244. option(u)
  245. }
  246. return u
  247. }
  248. // Upload uploads an object to S3, intelligently buffering large files into
  249. // smaller chunks and sending them in parallel across multiple goroutines. You
  250. // can configure the buffer size and concurrency through the Uploader's parameters.
  251. //
  252. // Additional functional options can be provided to configure the individual
  253. // upload. These options are copies of the Uploader instance Upload is called from.
  254. // Modifying the options will not impact the original Uploader instance.
  255. //
  256. // It is safe to call this method concurrently across goroutines.
  257. //
  258. // Example:
  259. // // Upload input parameters
  260. // upParams := &s3manager.UploadInput{
  261. // Bucket: &bucketName,
  262. // Key: &keyName,
  263. // Body: file,
  264. // }
  265. //
  266. // // Perform an upload.
  267. // result, err := uploader.Upload(upParams)
  268. //
  269. // // Perform upload with options different than the those in the Uploader.
  270. // result, err := uploader.Upload(upParams, func(u *s3manager.Uploader) {
  271. // u.PartSize = 10 * 1024 * 1024 // 10MB part size
  272. // u.LeavePartsOnError = true // Don't delete the parts if the upload fails.
  273. // })
  274. func (u Uploader) Upload(input *UploadInput, options ...func(*Uploader)) (*UploadOutput, error) {
  275. i := uploader{in: input, ctx: u}
  276. for _, option := range options {
  277. option(&i.ctx)
  278. }
  279. return i.upload()
  280. }
// uploader is the internal structure used to manage a single upload to S3.
type uploader struct {
	// ctx is a private copy of the Uploader configuration for this upload.
	ctx Uploader
	// in holds the caller-supplied upload parameters and body.
	in *UploadInput

	readerPos int64 // current reader position
	totalSize int64 // set to -1 if the size is not known
}
  288. // internal logic for deciding whether to upload a single part or use a
  289. // multipart upload.
  290. func (u *uploader) upload() (*UploadOutput, error) {
  291. u.init()
  292. if u.ctx.PartSize < MinUploadPartSize {
  293. msg := fmt.Sprintf("part size must be at least %d bytes", MinUploadPartSize)
  294. return nil, awserr.New("ConfigError", msg, nil)
  295. }
  296. // Do one read to determine if we have more than one part
  297. buf, err := u.nextReader()
  298. if err == io.EOF || err == io.ErrUnexpectedEOF { // single part
  299. return u.singlePart(buf)
  300. } else if err != nil {
  301. return nil, awserr.New("ReadRequestBody", "read upload data failed", err)
  302. }
  303. mu := multiuploader{uploader: u}
  304. return mu.upload(buf)
  305. }
  306. // init will initialize all default options.
  307. func (u *uploader) init() {
  308. if u.ctx.Concurrency == 0 {
  309. u.ctx.Concurrency = DefaultUploadConcurrency
  310. }
  311. if u.ctx.PartSize == 0 {
  312. u.ctx.PartSize = DefaultUploadPartSize
  313. }
  314. // Try to get the total size for some optimizations
  315. u.initSize()
  316. }
// initSize tries to detect the total stream size, setting u.totalSize. If
// the size is not known, totalSize is set to -1.
func (u *uploader) initSize() {
	u.totalSize = -1

	switch r := u.in.Body.(type) {
	case io.Seeker:
		// Remember the current position (whence 1 = relative to current)
		// and restore it (whence 0 = from start) once the size has been
		// measured. Seek errors here are deliberately ignored; on failure
		// totalSize simply stays -1.
		pos, _ := r.Seek(0, 1)
		defer r.Seek(pos, 0)

		// Seeking to the end (whence 2) yields the stream length.
		n, err := r.Seek(0, 2)
		if err != nil {
			return
		}
		u.totalSize = n

		// Try to adjust partSize if it is too small and account for
		// integer division truncation.
		if u.totalSize/u.ctx.PartSize >= int64(u.ctx.MaxUploadParts) {
			// Add one to the part size to account for remainders
			// during the size calculation. e.g odd number of bytes.
			u.ctx.PartSize = (u.totalSize / int64(u.ctx.MaxUploadParts)) + 1
		}
	}
}
// nextReader returns a seekable reader representing the next packet of data.
// This operation increases the shared u.readerPos counter, but note that it
// does not need to be wrapped in a mutex because nextReader is only called
// from the main thread.
//
// Returns io.EOF when the body is exhausted exactly at a part boundary, and
// io.ErrUnexpectedEOF when the returned reader holds the final, short part.
func (u *uploader) nextReader() (io.ReadSeeker, error) {
	switch r := u.in.Body.(type) {
	case io.ReaderAt:
		// Random-access body: hand out a zero-copy window over the next
		// PartSize bytes rather than buffering them.
		var err error

		n := u.ctx.PartSize
		if u.totalSize >= 0 {
			bytesLeft := u.totalSize - u.readerPos

			if bytesLeft == 0 {
				err = io.EOF
				n = bytesLeft
			} else if bytesLeft <= u.ctx.PartSize {
				// Final (possibly short) part.
				err = io.ErrUnexpectedEOF
				n = bytesLeft
			}
		}
		// NOTE(review): if Body is an io.ReaderAt whose size could not be
		// determined (totalSize == -1), err is never set on this path —
		// confirm callers terminate via a short read elsewhere.

		buf := io.NewSectionReader(r, u.readerPos, n)
		u.readerPos += n

		return buf, err

	default:
		// Sequential body: copy the next part into a private buffer so the
		// part can be re-read (retried) independently of the source reader.
		packet := make([]byte, u.ctx.PartSize)
		n, err := io.ReadFull(u.in.Body, packet)
		u.readerPos += int64(n)

		return bytes.NewReader(packet[0:n]), err
	}
}
  368. // singlePart contains upload logic for uploading a single chunk via
  369. // a regular PutObject request. Multipart requests require at least two
  370. // parts, or at least 5MB of data.
  371. func (u *uploader) singlePart(buf io.ReadSeeker) (*UploadOutput, error) {
  372. params := &s3.PutObjectInput{}
  373. awsutil.Copy(params, u.in)
  374. params.Body = buf
  375. req, out := u.ctx.S3.PutObjectRequest(params)
  376. req.Handlers.Build.PushBack(request.MakeAddToUserAgentFreeFormHandler("S3Manager"))
  377. if err := req.Send(); err != nil {
  378. return nil, err
  379. }
  380. url := req.HTTPRequest.URL.String()
  381. return &UploadOutput{
  382. Location: url,
  383. VersionID: out.VersionId,
  384. }, nil
  385. }
// multiuploader is the internal structure used to manage a specific multipart
// upload to S3.
type multiuploader struct {
	*uploader
	// wg tracks the worker goroutines spawned by upload().
	wg sync.WaitGroup
	// m guards err and parts, which are written from multiple workers.
	m sync.Mutex
	// err holds the first error recorded by any worker or the producer.
	err error
	// uploadID identifies the in-progress S3 multipart upload.
	uploadID string
	// parts accumulates the completed-part records for CompleteMultipartUpload.
	parts completedParts
}
// chunk keeps track of a single part of data being sent to S3: its payload
// and its 1-based part number.
type chunk struct {
	buf io.ReadSeeker
	num int64
}
// completedParts is a wrapper to make parts sortable by their part number,
// since S3 requires this list to be sent in sorted order. Implements
// sort.Interface.
type completedParts []*s3.CompletedPart

func (a completedParts) Len() int           { return len(a) }
func (a completedParts) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a completedParts) Less(i, j int) bool { return *a[i].PartNumber < *a[j].PartNumber }
// upload will perform a multipart upload using the firstBuf buffer containing
// the first chunk of data. It starts the multipart upload, fans parts out to
// Concurrency worker goroutines over a channel, then completes (or aborts)
// the upload once the producer loop and all workers have finished.
func (u *multiuploader) upload(firstBuf io.ReadSeeker) (*UploadOutput, error) {
	params := &s3.CreateMultipartUploadInput{}
	awsutil.Copy(params, u.in)

	// Create the multipart upload; its ID is required for every part.
	req, resp := u.ctx.S3.CreateMultipartUploadRequest(params)
	req.Handlers.Build.PushBack(request.MakeAddToUserAgentFreeFormHandler("S3Manager"))
	if err := req.Send(); err != nil {
		return nil, err
	}
	u.uploadID = *resp.UploadId

	// Create the workers.
	ch := make(chan chunk, u.ctx.Concurrency)
	for i := 0; i < u.ctx.Concurrency; i++ {
		u.wg.Add(1)
		go u.readChunk(ch)
	}

	// Send part 1 (already read by the caller) to the workers.
	var num int64 = 1
	ch <- chunk{buf: firstBuf, num: num}

	// Read and queue the rest of the parts; stop as soon as any worker
	// records an error.
	for u.geterr() == nil {
		num++
		// This upload exceeded maximum number of supported parts, error now.
		if num > int64(u.ctx.MaxUploadParts) || num > int64(MaxUploadParts) {
			var msg string
			if num > int64(u.ctx.MaxUploadParts) {
				msg = fmt.Sprintf("exceeded total allowed configured MaxUploadParts (%d). Adjust PartSize to fit in this limit",
					u.ctx.MaxUploadParts)
			} else {
				msg = fmt.Sprintf("exceeded total allowed S3 limit MaxUploadParts (%d). Adjust PartSize to fit in this limit",
					MaxUploadParts)
			}
			u.seterr(awserr.New("TotalPartsExceeded", msg, nil))
			break
		}

		buf, err := u.nextReader()
		if err == io.EOF {
			// Body ended exactly on the previous part boundary.
			break
		}

		ch <- chunk{buf: buf, num: num}

		if err == io.ErrUnexpectedEOF {
			// The final, short part was just queued.
			break
		} else if err != nil {
			u.seterr(awserr.New(
				"ReadRequestBody",
				"read multipart upload data failed",
				err))
			break
		}
	}

	// Close the channel, wait for workers, and complete upload.
	close(ch)
	u.wg.Wait()
	complete := u.complete()

	if err := u.geterr(); err != nil {
		return nil, &multiUploadError{
			awsError: awserr.New(
				"MultipartUpload",
				"upload multipart failed",
				err),
			uploadID: u.uploadID,
		}
	}
	return &UploadOutput{
		Location:  aws.StringValue(complete.Location),
		VersionID: complete.VersionId,
		UploadID:  u.uploadID,
	}, nil
}
  477. // readChunk runs in worker goroutines to pull chunks off of the ch channel
  478. // and send() them as UploadPart requests.
  479. func (u *multiuploader) readChunk(ch chan chunk) {
  480. defer u.wg.Done()
  481. for {
  482. data, ok := <-ch
  483. if !ok {
  484. break
  485. }
  486. if u.geterr() == nil {
  487. if err := u.send(data); err != nil {
  488. u.seterr(err)
  489. }
  490. }
  491. }
  492. }
  493. // send performs an UploadPart request and keeps track of the completed
  494. // part information.
  495. func (u *multiuploader) send(c chunk) error {
  496. req, resp := u.ctx.S3.UploadPartRequest(&s3.UploadPartInput{
  497. Bucket: u.in.Bucket,
  498. Key: u.in.Key,
  499. Body: c.buf,
  500. UploadId: &u.uploadID,
  501. PartNumber: &c.num,
  502. })
  503. req.Handlers.Build.PushBack(request.MakeAddToUserAgentFreeFormHandler("S3Manager"))
  504. if err := req.Send(); err != nil {
  505. return err
  506. }
  507. n := c.num
  508. completed := &s3.CompletedPart{ETag: resp.ETag, PartNumber: &n}
  509. u.m.Lock()
  510. u.parts = append(u.parts, completed)
  511. u.m.Unlock()
  512. return nil
  513. }
  514. // geterr is a thread-safe getter for the error object
  515. func (u *multiuploader) geterr() error {
  516. u.m.Lock()
  517. defer u.m.Unlock()
  518. return u.err
  519. }
  520. // seterr is a thread-safe setter for the error object
  521. func (u *multiuploader) seterr(e error) {
  522. u.m.Lock()
  523. defer u.m.Unlock()
  524. u.err = e
  525. }
// fail will abort the multipart upload unless LeavePartsOnError is set to
// true, in which case the already-uploaded parts are left on S3 for manual
// recovery.
func (u *multiuploader) fail() {
	if u.ctx.LeavePartsOnError {
		return
	}

	req, _ := u.ctx.S3.AbortMultipartUploadRequest(&s3.AbortMultipartUploadInput{
		Bucket:   u.in.Bucket,
		Key:      u.in.Key,
		UploadId: &u.uploadID,
	})
	req.Handlers.Build.PushBack(request.MakeAddToUserAgentFreeFormHandler("S3Manager"))
	// Best-effort abort: the original upload error is already recorded in
	// u.err, so a failure to abort is deliberately ignored here.
	req.Send()
}
// complete successfully completes a multipart upload and returns the response.
// If an error was already recorded it aborts instead and returns nil; callers
// must consult geterr() before using the returned response.
func (u *multiuploader) complete() *s3.CompleteMultipartUploadOutput {
	if u.geterr() != nil {
		u.fail()
		return nil
	}

	// Parts must be sorted in PartNumber order.
	sort.Sort(u.parts)

	req, resp := u.ctx.S3.CompleteMultipartUploadRequest(&s3.CompleteMultipartUploadInput{
		Bucket:          u.in.Bucket,
		Key:             u.in.Key,
		UploadId:        &u.uploadID,
		MultipartUpload: &s3.CompletedMultipartUpload{Parts: u.parts},
	})
	req.Handlers.Build.PushBack(request.MakeAddToUserAgentFreeFormHandler("S3Manager"))
	if err := req.Send(); err != nil {
		// Record the failure and abort; the caller surfaces the error via
		// geterr(), so returning the (empty) resp here is harmless.
		u.seterr(err)
		u.fail()
	}

	return resp
}