http_client.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. /*
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing,
  13. * software distributed under the License is distributed on an
  14. * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. * KIND, either express or implied. See the License for the
  16. * specific language governing permissions and limitations
  17. * under the License.
  18. */
  19. package thrift
  20. import (
  21. "bytes"
  22. "io"
  23. "io/ioutil"
  24. "net/http"
  25. "net/url"
  26. "strconv"
  27. )
  // DefaultHttpClient is the HTTP client used by transports that are created
  // without an explicit THttpClientOptions.Client. It starts out as the shared
  // http.DefaultClient; library users are free to replace this package-level
  // variable or to supply a per-transport client through THttpClientOptions.
  var DefaultHttpClient = http.DefaultClient
  // THttpClient is a transport that exchanges Thrift payloads over HTTP.
  // Outgoing bytes are collected in requestBuffer and incoming bytes are read
  // from the body of response.
  type THttpClient struct {
  	// client executes the HTTP requests issued by this transport.
  	client *http.Client
  	// response is the most recent HTTP response; nil when there is none to read.
  	response *http.Response
  	// url is the parsed endpoint the transport talks to.
  	url *url.URL
  	// requestBuffer accumulates written bytes; it is nil for transports that
  	// were created without one and after Close.
  	requestBuffer *bytes.Buffer
  	// header holds the HTTP headers attached to outgoing requests.
  	header http.Header
  	// NOTE(review): the timeouts below are not referenced anywhere in the
  	// visible part of this file - confirm whether other files use them.
  	nsecConnectTimeout, nsecReadTimeout int64
  }
  41. type THttpClientTransportFactory struct {
  42. options THttpClientOptions
  43. url string
  44. isPost bool
  45. }
  46. func (p *THttpClientTransportFactory) GetTransport(trans TTransport) TTransport {
  47. if trans != nil {
  48. t, ok := trans.(*THttpClient)
  49. if ok && t.url != nil {
  50. if t.requestBuffer != nil {
  51. t2, _ := NewTHttpPostClientWithOptions(t.url.String(), p.options)
  52. return t2
  53. }
  54. t2, _ := NewTHttpClientWithOptions(t.url.String(), p.options)
  55. return t2
  56. }
  57. }
  58. if p.isPost {
  59. s, _ := NewTHttpPostClientWithOptions(p.url, p.options)
  60. return s
  61. }
  62. s, _ := NewTHttpClientWithOptions(p.url, p.options)
  63. return s
  64. }
  // THttpClientOptions carries the optional settings accepted by the HTTP
  // transport constructors and transport factories.
  type THttpClientOptions struct {
  	// Client is the HTTP client the transport should use.
  	// When it is nil, DefaultHttpClient is used instead.
  	Client *http.Client
  }
  69. func NewTHttpClientTransportFactory(url string) *THttpClientTransportFactory {
  70. return NewTHttpClientTransportFactoryWithOptions(url, THttpClientOptions{})
  71. }
  72. func NewTHttpClientTransportFactoryWithOptions(url string, options THttpClientOptions) *THttpClientTransportFactory {
  73. return &THttpClientTransportFactory{url: url, isPost: false, options: options}
  74. }
  75. func NewTHttpPostClientTransportFactory(url string) *THttpClientTransportFactory {
  76. return NewTHttpPostClientTransportFactoryWithOptions(url, THttpClientOptions{})
  77. }
  78. func NewTHttpPostClientTransportFactoryWithOptions(url string, options THttpClientOptions) *THttpClientTransportFactory {
  79. return &THttpClientTransportFactory{url: url, isPost: true, options: options}
  80. }
  81. func NewTHttpClientWithOptions(urlstr string, options THttpClientOptions) (TTransport, error) {
  82. parsedURL, err := url.Parse(urlstr)
  83. if err != nil {
  84. return nil, err
  85. }
  86. response, err := http.Get(urlstr)
  87. if err != nil {
  88. return nil, err
  89. }
  90. client := options.Client
  91. if client == nil {
  92. client = DefaultHttpClient
  93. }
  94. httpHeader := map[string][]string{"Content-Type": []string{"application/x-thrift"}}
  95. return &THttpClient{client: client, response: response, url: parsedURL, header: httpHeader}, nil
  96. }
  97. func NewTHttpClient(urlstr string) (TTransport, error) {
  98. return NewTHttpClientWithOptions(urlstr, THttpClientOptions{})
  99. }
  100. func NewTHttpPostClientWithOptions(urlstr string, options THttpClientOptions) (TTransport, error) {
  101. parsedURL, err := url.Parse(urlstr)
  102. if err != nil {
  103. return nil, err
  104. }
  105. buf := make([]byte, 0, 1024)
  106. client := options.Client
  107. if client == nil {
  108. client = DefaultHttpClient
  109. }
  110. httpHeader := map[string][]string{"Content-Type": []string{"application/x-thrift"}}
  111. return &THttpClient{client: client, url: parsedURL, requestBuffer: bytes.NewBuffer(buf), header: httpHeader}, nil
  112. }
  113. func NewTHttpPostClient(urlstr string) (TTransport, error) {
  114. return NewTHttpPostClientWithOptions(urlstr, THttpClientOptions{})
  115. }
  116. // Set the HTTP Header for this specific Thrift Transport
  117. // It is important that you first assert the TTransport as a THttpClient type
  118. // like so:
  119. //
  120. // httpTrans := trans.(THttpClient)
  121. // httpTrans.SetHeader("User-Agent","Thrift Client 1.0")
  122. func (p *THttpClient) SetHeader(key string, value string) {
  123. p.header.Add(key, value)
  124. }
  125. // Get the HTTP Header represented by the supplied Header Key for this specific Thrift Transport
  126. // It is important that you first assert the TTransport as a THttpClient type
  127. // like so:
  128. //
  129. // httpTrans := trans.(THttpClient)
  130. // hdrValue := httpTrans.GetHeader("User-Agent")
  131. func (p *THttpClient) GetHeader(key string) string {
  132. return p.header.Get(key)
  133. }
  134. // Deletes the HTTP Header given a Header Key for this specific Thrift Transport
  135. // It is important that you first assert the TTransport as a THttpClient type
  136. // like so:
  137. //
  138. // httpTrans := trans.(THttpClient)
  139. // httpTrans.DelHeader("User-Agent")
  140. func (p *THttpClient) DelHeader(key string) {
  141. p.header.Del(key)
  142. }
  143. func (p *THttpClient) Open() error {
  144. // do nothing
  145. return nil
  146. }
  147. func (p *THttpClient) IsOpen() bool {
  148. return p.response != nil || p.requestBuffer != nil
  149. }
  150. func (p *THttpClient) closeResponse() error {
  151. var err error
  152. if p.response != nil && p.response.Body != nil {
  153. // The docs specify that if keepalive is enabled and the response body is not
  154. // read to completion the connection will never be returned to the pool and
  155. // reused. Errors are being ignored here because if the connection is invalid
  156. // and this fails for some reason, the Close() method will do any remaining
  157. // cleanup.
  158. io.Copy(ioutil.Discard, p.response.Body)
  159. err = p.response.Body.Close()
  160. }
  161. p.response = nil
  162. return err
  163. }
  164. func (p *THttpClient) Close() error {
  165. if p.requestBuffer != nil {
  166. p.requestBuffer.Reset()
  167. p.requestBuffer = nil
  168. }
  169. return p.closeResponse()
  170. }
  171. func (p *THttpClient) Read(buf []byte) (int, error) {
  172. if p.response == nil {
  173. return 0, NewTTransportException(NOT_OPEN, "Response buffer is empty, no request.")
  174. }
  175. n, err := p.response.Body.Read(buf)
  176. if n > 0 && (err == nil || err == io.EOF) {
  177. return n, nil
  178. }
  179. return n, NewTTransportExceptionFromError(err)
  180. }
  181. func (p *THttpClient) ReadByte() (c byte, err error) {
  182. return readByte(p.response.Body)
  183. }
  184. func (p *THttpClient) Write(buf []byte) (int, error) {
  185. n, err := p.requestBuffer.Write(buf)
  186. return n, err
  187. }
  188. func (p *THttpClient) WriteByte(c byte) error {
  189. return p.requestBuffer.WriteByte(c)
  190. }
  191. func (p *THttpClient) WriteString(s string) (n int, err error) {
  192. return p.requestBuffer.WriteString(s)
  193. }
  194. func (p *THttpClient) Flush() error {
  195. // Close any previous response body to avoid leaking connections.
  196. p.closeResponse()
  197. req, err := http.NewRequest("POST", p.url.String(), p.requestBuffer)
  198. if err != nil {
  199. return NewTTransportExceptionFromError(err)
  200. }
  201. req.Header = p.header
  202. response, err := p.client.Do(req)
  203. if err != nil {
  204. return NewTTransportExceptionFromError(err)
  205. }
  206. if response.StatusCode != http.StatusOK {
  207. // Close the response to avoid leaking file descriptors. closeResponse does
  208. // more than just call Close(), so temporarily assign it and reuse the logic.
  209. p.response = response
  210. p.closeResponse()
  211. // TODO(pomack) log bad response
  212. return NewTTransportException(UNKNOWN_TRANSPORT_EXCEPTION, "HTTP Response code: "+strconv.Itoa(response.StatusCode))
  213. }
  214. p.response = response
  215. return nil
  216. }
  217. func (p *THttpClient) RemainingBytes() (num_bytes uint64) {
  218. len := p.response.ContentLength
  219. if len >= 0 {
  220. return uint64(len)
  221. }
  222. const maxSize = ^uint64(0)
  223. return maxSize // the thruth is, we just don't know unless framed is used
  224. }