copy.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. // Copyright 2016 Google Inc. All Rights Reserved.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package storage
  15. import (
  16. "errors"
  17. "fmt"
  18. "golang.org/x/net/context"
  19. raw "google.golang.org/api/storage/v1"
  20. )
  21. // CopierFrom creates a Copier that can copy src to dst.
  22. // You can immediately call Run on the returned Copier, or
  23. // you can configure it first.
  24. //
  25. // For Requester Pays buckets, the user project of dst is billed, unless it is empty,
  26. // in which case the user project of src is billed.
  27. func (dst *ObjectHandle) CopierFrom(src *ObjectHandle) *Copier {
  28. return &Copier{dst: dst, src: src}
  29. }
  30. // A Copier copies a source object to a destination.
  31. type Copier struct {
  32. // ObjectAttrs are optional attributes to set on the destination object.
  33. // Any attributes must be initialized before any calls on the Copier. Nil
  34. // or zero-valued attributes are ignored.
  35. ObjectAttrs
  36. // RewriteToken can be set before calling Run to resume a copy
  37. // operation. After Run returns a non-nil error, RewriteToken will
  38. // have been updated to contain the value needed to resume the copy.
  39. RewriteToken string
  40. // ProgressFunc can be used to monitor the progress of a multi-RPC copy
  41. // operation. If ProgressFunc is not nil and copying requires multiple
  42. // calls to the underlying service (see
  43. // https://cloud.google.com/storage/docs/json_api/v1/objects/rewrite), then
  44. // ProgressFunc will be invoked after each call with the number of bytes of
  45. // content copied so far and the total size in bytes of the source object.
  46. //
  47. // ProgressFunc is intended to make upload progress available to the
  48. // application. For example, the implementation of ProgressFunc may update
  49. // a progress bar in the application's UI, or log the result of
  50. // float64(copiedBytes)/float64(totalBytes).
  51. //
  52. // ProgressFunc should return quickly without blocking.
  53. ProgressFunc func(copiedBytes, totalBytes uint64)
  54. dst, src *ObjectHandle
  55. }
  56. // Run performs the copy.
  57. func (c *Copier) Run(ctx context.Context) (*ObjectAttrs, error) {
  58. if err := c.src.validate(); err != nil {
  59. return nil, err
  60. }
  61. if err := c.dst.validate(); err != nil {
  62. return nil, err
  63. }
  64. // Convert destination attributes to raw form, omitting the bucket.
  65. // If the bucket is included but name or content-type aren't, the service
  66. // returns a 400 with "Required" as the only message. Omitting the bucket
  67. // does not cause any problems.
  68. rawObject := c.ObjectAttrs.toRawObject("")
  69. for {
  70. res, err := c.callRewrite(ctx, rawObject)
  71. if err != nil {
  72. return nil, err
  73. }
  74. if c.ProgressFunc != nil {
  75. c.ProgressFunc(uint64(res.TotalBytesRewritten), uint64(res.ObjectSize))
  76. }
  77. if res.Done { // Finished successfully.
  78. return newObject(res.Resource), nil
  79. }
  80. }
  81. }
  82. func (c *Copier) callRewrite(ctx context.Context, rawObj *raw.Object) (*raw.RewriteResponse, error) {
  83. call := c.dst.c.raw.Objects.Rewrite(c.src.bucket, c.src.object, c.dst.bucket, c.dst.object, rawObj)
  84. call.Context(ctx).Projection("full")
  85. if c.RewriteToken != "" {
  86. call.RewriteToken(c.RewriteToken)
  87. }
  88. if err := applyConds("Copy destination", c.dst.gen, c.dst.conds, call); err != nil {
  89. return nil, err
  90. }
  91. if c.dst.userProject != "" {
  92. call.UserProject(c.dst.userProject)
  93. } else if c.src.userProject != "" {
  94. call.UserProject(c.src.userProject)
  95. }
  96. if err := applySourceConds(c.src.gen, c.src.conds, call); err != nil {
  97. return nil, err
  98. }
  99. if err := setEncryptionHeaders(call.Header(), c.dst.encryptionKey, false); err != nil {
  100. return nil, err
  101. }
  102. if err := setEncryptionHeaders(call.Header(), c.src.encryptionKey, true); err != nil {
  103. return nil, err
  104. }
  105. var res *raw.RewriteResponse
  106. var err error
  107. setClientHeader(call.Header())
  108. err = runWithRetry(ctx, func() error { res, err = call.Do(); return err })
  109. if err != nil {
  110. return nil, err
  111. }
  112. c.RewriteToken = res.RewriteToken
  113. return res, nil
  114. }
  115. // ComposerFrom creates a Composer that can compose srcs into dst.
  116. // You can immediately call Run on the returned Composer, or you can
  117. // configure it first.
  118. //
  119. // The encryption key for the destination object will be used to decrypt all
  120. // source objects and encrypt the destination object. It is an error
  121. // to specify an encryption key for any of the source objects.
  122. func (dst *ObjectHandle) ComposerFrom(srcs ...*ObjectHandle) *Composer {
  123. return &Composer{dst: dst, srcs: srcs}
  124. }
  125. // A Composer composes source objects into a destination object.
  126. //
  127. // For Requester Pays buckets, the user project of dst is billed.
  128. type Composer struct {
  129. // ObjectAttrs are optional attributes to set on the destination object.
  130. // Any attributes must be initialized before any calls on the Composer. Nil
  131. // or zero-valued attributes are ignored.
  132. ObjectAttrs
  133. dst *ObjectHandle
  134. srcs []*ObjectHandle
  135. }
  136. // Run performs the compose operation.
  137. func (c *Composer) Run(ctx context.Context) (*ObjectAttrs, error) {
  138. if err := c.dst.validate(); err != nil {
  139. return nil, err
  140. }
  141. if len(c.srcs) == 0 {
  142. return nil, errors.New("storage: at least one source object must be specified")
  143. }
  144. req := &raw.ComposeRequest{}
  145. // Compose requires a non-empty Destination, so we always set it,
  146. // even if the caller-provided ObjectAttrs is the zero value.
  147. req.Destination = c.ObjectAttrs.toRawObject(c.dst.bucket)
  148. for _, src := range c.srcs {
  149. if err := src.validate(); err != nil {
  150. return nil, err
  151. }
  152. if src.bucket != c.dst.bucket {
  153. return nil, fmt.Errorf("storage: all source objects must be in bucket %q, found %q", c.dst.bucket, src.bucket)
  154. }
  155. if src.encryptionKey != nil {
  156. return nil, fmt.Errorf("storage: compose source %s.%s must not have encryption key", src.bucket, src.object)
  157. }
  158. srcObj := &raw.ComposeRequestSourceObjects{
  159. Name: src.object,
  160. }
  161. if err := applyConds("ComposeFrom source", src.gen, src.conds, composeSourceObj{srcObj}); err != nil {
  162. return nil, err
  163. }
  164. req.SourceObjects = append(req.SourceObjects, srcObj)
  165. }
  166. call := c.dst.c.raw.Objects.Compose(c.dst.bucket, c.dst.object, req).Context(ctx)
  167. if err := applyConds("ComposeFrom destination", c.dst.gen, c.dst.conds, call); err != nil {
  168. return nil, err
  169. }
  170. if c.dst.userProject != "" {
  171. call.UserProject(c.dst.userProject)
  172. }
  173. if err := setEncryptionHeaders(call.Header(), c.dst.encryptionKey, false); err != nil {
  174. return nil, err
  175. }
  176. var obj *raw.Object
  177. var err error
  178. setClientHeader(call.Header())
  179. err = runWithRetry(ctx, func() error { obj, err = call.Do(); return err })
  180. if err != nil {
  181. return nil, err
  182. }
  183. return newObject(obj), nil
  184. }