timeout_read_closer.go 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. package request
  2. import (
  3. "io"
  4. "time"
  5. "github.com/aws/aws-sdk-go/aws/awserr"
  6. )
  7. var timeoutErr = awserr.New(
  8. ErrCodeResponseTimeout,
  9. "read on body has reached the timeout limit",
  10. nil,
  11. )
  12. type readResult struct {
  13. n int
  14. err error
  15. }
  16. // timeoutReadCloser will handle body reads that take too long.
  17. // We will return a ErrReadTimeout error if a timeout occurs.
  18. type timeoutReadCloser struct {
  19. reader io.ReadCloser
  20. duration time.Duration
  21. }
  22. // Read will spin off a goroutine to call the reader's Read method. We will
  23. // select on the timer's channel or the read's channel. Whoever completes first
  24. // will be returned.
  25. func (r *timeoutReadCloser) Read(b []byte) (int, error) {
  26. timer := time.NewTimer(r.duration)
  27. c := make(chan readResult, 1)
  28. go func() {
  29. n, err := r.reader.Read(b)
  30. timer.Stop()
  31. c <- readResult{n: n, err: err}
  32. }()
  33. select {
  34. case data := <-c:
  35. return data.n, data.err
  36. case <-timer.C:
  37. return 0, timeoutErr
  38. }
  39. }
  40. func (r *timeoutReadCloser) Close() error {
  41. return r.reader.Close()
  42. }
  43. const (
  44. // HandlerResponseTimeout is what we use to signify the name of the
  45. // response timeout handler.
  46. HandlerResponseTimeout = "ResponseTimeoutHandler"
  47. )
  48. // adaptToResponseTimeoutError is a handler that will replace any top level error
  49. // to a ErrCodeResponseTimeout, if its child is that.
  50. func adaptToResponseTimeoutError(req *Request) {
  51. if err, ok := req.Error.(awserr.Error); ok {
  52. aerr, ok := err.OrigErr().(awserr.Error)
  53. if ok && aerr.Code() == ErrCodeResponseTimeout {
  54. req.Error = aerr
  55. }
  56. }
  57. }
  58. // WithResponseReadTimeout is a request option that will wrap the body in a timeout read closer.
  59. // This will allow for per read timeouts. If a timeout occurred, we will return the
  60. // ErrCodeResponseTimeout.
  61. //
  62. // svc.PutObjectWithContext(ctx, params, request.WithTimeoutReadCloser(30 * time.Second)
  63. func WithResponseReadTimeout(duration time.Duration) Option {
  64. return func(r *Request) {
  65. var timeoutHandler = NamedHandler{
  66. HandlerResponseTimeout,
  67. func(req *Request) {
  68. req.HTTPResponse.Body = &timeoutReadCloser{
  69. reader: req.HTTPResponse.Body,
  70. duration: duration,
  71. }
  72. }}
  73. // remove the handler so we are not stomping over any new durations.
  74. r.Handlers.Send.RemoveByName(HandlerResponseTimeout)
  75. r.Handlers.Send.PushBackNamed(timeoutHandler)
  76. r.Handlers.Unmarshal.PushBack(adaptToResponseTimeoutError)
  77. r.Handlers.UnmarshalError.PushBack(adaptToResponseTimeoutError)
  78. }
  79. }