propagation.go 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. // Copyright (c) 2016 Uber Technologies, Inc.
  2. //
  3. // Permission is hereby granted, free of charge, to any person obtaining a copy
  4. // of this software and associated documentation files (the "Software"), to deal
  5. // in the Software without restriction, including without limitation the rights
  6. // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  7. // copies of the Software, and to permit persons to whom the Software is
  8. // furnished to do so, subject to the following conditions:
  9. //
  10. // The above copyright notice and this permission notice shall be included in
  11. // all copies or substantial portions of the Software.
  12. //
  13. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  14. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  15. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  16. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  17. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  18. // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  19. // THE SOFTWARE.
  20. package jaeger
  21. import (
  22. "bytes"
  23. "encoding/binary"
  24. "fmt"
  25. "io"
  26. "log"
  27. "net/url"
  28. "strings"
  29. "sync"
  30. opentracing "github.com/opentracing/opentracing-go"
  31. )
  32. // Injector is responsible for injecting SpanContext instances in a manner suitable
  33. // for propagation via a format-specific "carrier" object. Typically the
  34. // injection will take place across an RPC boundary, but message queues and
  35. // other IPC mechanisms are also reasonable places to use an Injector.
  36. type Injector interface {
  37. // Inject takes `SpanContext` and injects it into `carrier`. The actual type
  38. // of `carrier` depends on the `format` passed to `Tracer.Inject()`.
  39. //
  40. // Implementations may return opentracing.ErrInvalidCarrier or any other
  41. // implementation-specific error if injection fails.
  42. Inject(ctx SpanContext, carrier interface{}) error
  43. }
  44. // Extractor is responsible for extracting SpanContext instances from a
  45. // format-specific "carrier" object. Typically the extraction will take place
  46. // on the server side of an RPC boundary, but message queues and other IPC
  47. // mechanisms are also reasonable places to use an Extractor.
  48. type Extractor interface {
  49. // Extract decodes a SpanContext instance from the given `carrier`,
  50. // or (nil, opentracing.ErrSpanContextNotFound) if no context could
  51. // be found in the `carrier`.
  52. Extract(carrier interface{}) (SpanContext, error)
  53. }
  54. type textMapPropagator struct {
  55. headerKeys *HeadersConfig
  56. metrics Metrics
  57. encodeValue func(string) string
  58. decodeValue func(string) string
  59. }
  60. func newTextMapPropagator(headerKeys *HeadersConfig, metrics Metrics) *textMapPropagator {
  61. return &textMapPropagator{
  62. headerKeys: headerKeys,
  63. metrics: metrics,
  64. encodeValue: func(val string) string {
  65. return val
  66. },
  67. decodeValue: func(val string) string {
  68. return val
  69. },
  70. }
  71. }
  72. func newHTTPHeaderPropagator(headerKeys *HeadersConfig, metrics Metrics) *textMapPropagator {
  73. return &textMapPropagator{
  74. headerKeys: headerKeys,
  75. metrics: metrics,
  76. encodeValue: func(val string) string {
  77. return url.QueryEscape(val)
  78. },
  79. decodeValue: func(val string) string {
  80. // ignore decoding errors, cannot do anything about them
  81. if v, err := url.QueryUnescape(val); err == nil {
  82. return v
  83. }
  84. return val
  85. },
  86. }
  87. }
  88. type binaryPropagator struct {
  89. tracer *Tracer
  90. buffers sync.Pool
  91. }
  92. func newBinaryPropagator(tracer *Tracer) *binaryPropagator {
  93. return &binaryPropagator{
  94. tracer: tracer,
  95. buffers: sync.Pool{New: func() interface{} { return &bytes.Buffer{} }},
  96. }
  97. }
  98. func (p *textMapPropagator) Inject(
  99. sc SpanContext,
  100. abstractCarrier interface{},
  101. ) error {
  102. textMapWriter, ok := abstractCarrier.(opentracing.TextMapWriter)
  103. if !ok {
  104. return opentracing.ErrInvalidCarrier
  105. }
  106. // Do not encode the string with trace context to avoid accidental double-encoding
  107. // if people are using opentracing < 0.10.0. Our colon-separated representation
  108. // of the trace context is already safe for HTTP headers.
  109. textMapWriter.Set(p.headerKeys.TraceContextHeaderName, sc.String())
  110. for k, v := range sc.baggage {
  111. safeKey := p.addBaggageKeyPrefix(k)
  112. safeVal := p.encodeValue(v)
  113. textMapWriter.Set(safeKey, safeVal)
  114. }
  115. return nil
  116. }
  117. func (p *textMapPropagator) Extract(abstractCarrier interface{}) (SpanContext, error) {
  118. textMapReader, ok := abstractCarrier.(opentracing.TextMapReader)
  119. if !ok {
  120. return emptyContext, opentracing.ErrInvalidCarrier
  121. }
  122. var ctx SpanContext
  123. var baggage map[string]string
  124. err := textMapReader.ForeachKey(func(rawKey, value string) error {
  125. key := strings.ToLower(rawKey) // TODO not necessary for plain TextMap
  126. if key == p.headerKeys.TraceContextHeaderName {
  127. var err error
  128. safeVal := p.decodeValue(value)
  129. if ctx, err = ContextFromString(safeVal); err != nil {
  130. return err
  131. }
  132. } else if key == p.headerKeys.JaegerDebugHeader {
  133. ctx.debugID = p.decodeValue(value)
  134. } else if key == p.headerKeys.JaegerBaggageHeader {
  135. if baggage == nil {
  136. baggage = make(map[string]string)
  137. }
  138. for k, v := range p.parseCommaSeparatedMap(value) {
  139. baggage[k] = v
  140. }
  141. } else if strings.HasPrefix(key, p.headerKeys.TraceBaggageHeaderPrefix) {
  142. if baggage == nil {
  143. baggage = make(map[string]string)
  144. }
  145. safeKey := p.removeBaggageKeyPrefix(key)
  146. safeVal := p.decodeValue(value)
  147. baggage[safeKey] = safeVal
  148. }
  149. return nil
  150. })
  151. if err != nil {
  152. p.metrics.DecodingErrors.Inc(1)
  153. return emptyContext, err
  154. }
  155. if !ctx.traceID.IsValid() && ctx.debugID == "" && len(baggage) == 0 {
  156. return emptyContext, opentracing.ErrSpanContextNotFound
  157. }
  158. ctx.baggage = baggage
  159. return ctx, nil
  160. }
  161. func (p *binaryPropagator) Inject(
  162. sc SpanContext,
  163. abstractCarrier interface{},
  164. ) error {
  165. carrier, ok := abstractCarrier.(io.Writer)
  166. if !ok {
  167. return opentracing.ErrInvalidCarrier
  168. }
  169. // Handle the tracer context
  170. if err := binary.Write(carrier, binary.BigEndian, sc.traceID); err != nil {
  171. return err
  172. }
  173. if err := binary.Write(carrier, binary.BigEndian, sc.spanID); err != nil {
  174. return err
  175. }
  176. if err := binary.Write(carrier, binary.BigEndian, sc.parentID); err != nil {
  177. return err
  178. }
  179. if err := binary.Write(carrier, binary.BigEndian, sc.flags); err != nil {
  180. return err
  181. }
  182. // Handle the baggage items
  183. if err := binary.Write(carrier, binary.BigEndian, int32(len(sc.baggage))); err != nil {
  184. return err
  185. }
  186. for k, v := range sc.baggage {
  187. if err := binary.Write(carrier, binary.BigEndian, int32(len(k))); err != nil {
  188. return err
  189. }
  190. io.WriteString(carrier, k)
  191. if err := binary.Write(carrier, binary.BigEndian, int32(len(v))); err != nil {
  192. return err
  193. }
  194. io.WriteString(carrier, v)
  195. }
  196. return nil
  197. }
  198. func (p *binaryPropagator) Extract(abstractCarrier interface{}) (SpanContext, error) {
  199. carrier, ok := abstractCarrier.(io.Reader)
  200. if !ok {
  201. return emptyContext, opentracing.ErrInvalidCarrier
  202. }
  203. var ctx SpanContext
  204. if err := binary.Read(carrier, binary.BigEndian, &ctx.traceID); err != nil {
  205. return emptyContext, opentracing.ErrSpanContextCorrupted
  206. }
  207. if err := binary.Read(carrier, binary.BigEndian, &ctx.spanID); err != nil {
  208. return emptyContext, opentracing.ErrSpanContextCorrupted
  209. }
  210. if err := binary.Read(carrier, binary.BigEndian, &ctx.parentID); err != nil {
  211. return emptyContext, opentracing.ErrSpanContextCorrupted
  212. }
  213. if err := binary.Read(carrier, binary.BigEndian, &ctx.flags); err != nil {
  214. return emptyContext, opentracing.ErrSpanContextCorrupted
  215. }
  216. // Handle the baggage items
  217. var numBaggage int32
  218. if err := binary.Read(carrier, binary.BigEndian, &numBaggage); err != nil {
  219. return emptyContext, opentracing.ErrSpanContextCorrupted
  220. }
  221. if iNumBaggage := int(numBaggage); iNumBaggage > 0 {
  222. ctx.baggage = make(map[string]string, iNumBaggage)
  223. buf := p.buffers.Get().(*bytes.Buffer)
  224. defer p.buffers.Put(buf)
  225. var keyLen, valLen int32
  226. for i := 0; i < iNumBaggage; i++ {
  227. if err := binary.Read(carrier, binary.BigEndian, &keyLen); err != nil {
  228. return emptyContext, opentracing.ErrSpanContextCorrupted
  229. }
  230. buf.Reset()
  231. buf.Grow(int(keyLen))
  232. if n, err := io.CopyN(buf, carrier, int64(keyLen)); err != nil || int32(n) != keyLen {
  233. return emptyContext, opentracing.ErrSpanContextCorrupted
  234. }
  235. key := buf.String()
  236. if err := binary.Read(carrier, binary.BigEndian, &valLen); err != nil {
  237. return emptyContext, opentracing.ErrSpanContextCorrupted
  238. }
  239. buf.Reset()
  240. buf.Grow(int(valLen))
  241. if n, err := io.CopyN(buf, carrier, int64(valLen)); err != nil || int32(n) != valLen {
  242. return emptyContext, opentracing.ErrSpanContextCorrupted
  243. }
  244. ctx.baggage[key] = buf.String()
  245. }
  246. }
  247. return ctx, nil
  248. }
  249. // Converts a comma separated key value pair list into a map
  250. // e.g. key1=value1, key2=value2, key3 = value3
  251. // is converted to map[string]string { "key1" : "value1",
  252. // "key2" : "value2",
  253. // "key3" : "value3" }
  254. func (p *textMapPropagator) parseCommaSeparatedMap(value string) map[string]string {
  255. baggage := make(map[string]string)
  256. value, err := url.QueryUnescape(value)
  257. if err != nil {
  258. log.Printf("Unable to unescape %s, %v", value, err)
  259. return baggage
  260. }
  261. for _, kvpair := range strings.Split(value, ",") {
  262. kv := strings.Split(strings.TrimSpace(kvpair), "=")
  263. if len(kv) == 2 {
  264. baggage[kv[0]] = kv[1]
  265. } else {
  266. log.Printf("Malformed value passed in for %s", p.headerKeys.JaegerBaggageHeader)
  267. }
  268. }
  269. return baggage
  270. }
  271. // Converts a baggage item key into an http header format,
  272. // by prepending TraceBaggageHeaderPrefix and encoding the key string
  273. func (p *textMapPropagator) addBaggageKeyPrefix(key string) string {
  274. // TODO encodeBaggageKeyAsHeader add caching and escaping
  275. return fmt.Sprintf("%v%v", p.headerKeys.TraceBaggageHeaderPrefix, key)
  276. }
  277. func (p *textMapPropagator) removeBaggageKeyPrefix(key string) string {
  278. // TODO decodeBaggageHeaderKey add caching and escaping
  279. return key[len(p.headerKeys.TraceBaggageHeaderPrefix):]
  280. }