tracer.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393
  1. // Copyright (c) 2016 Uber Technologies, Inc.
  2. // Permission is hereby granted, free of charge, to any person obtaining a copy
  3. // of this software and associated documentation files (the "Software"), to deal
  4. // in the Software without restriction, including without limitation the rights
  5. // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  6. // copies of the Software, and to permit persons to whom the Software is
  7. // furnished to do so, subject to the following conditions:
  8. //
  9. // The above copyright notice and this permission notice shall be included in
  10. // all copies or substantial portions of the Software.
  11. //
  12. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  13. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  14. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  15. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  16. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  17. // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  18. // THE SOFTWARE.
  19. package jaeger
  20. import (
  21. "fmt"
  22. "io"
  23. "os"
  24. "reflect"
  25. "sync"
  26. "time"
  27. "github.com/opentracing/opentracing-go"
  28. "github.com/opentracing/opentracing-go/ext"
  29. "github.com/uber/jaeger-client-go/internal/baggage"
  30. "github.com/uber/jaeger-client-go/log"
  31. "github.com/uber/jaeger-client-go/utils"
  32. )
  // Tracer implements opentracing.Tracer.
  //
  // Instances are built by NewTracer, which applies the supplied TracerOptions
  // and then fills in defaults for the logger, the clock and the random number
  // source when the options left them unset.
  type Tracer struct {
  	serviceName string // service name passed to NewTracer
  	hostIPv4    uint32 // this is for zipkin endpoint conversion

  	sampler  Sampler    // asked whether a newly started trace is sampled
  	reporter Reporter   // receives finished spans whose context is sampled
  	metrics  Metrics    // counters for started/finished spans and started/joined traces
  	logger   log.Logger // defaults to log.NullLogger

  	timeNow      func() time.Time // supplies the start time when none is given; defaults to time.Now
  	randomNumber func() uint64    // raw source for trace/span IDs, see randomID

  	options struct {
  		poolSpans           bool // take Span objects from spanPool and put them back after reporting
  		gen128Bit           bool // whether to generate 128bit trace IDs
  		zipkinSharedRPCSpan bool // an RPC server span reuses the span ID and parent ID of its parent
  		// more options to come
  	}

  	// pool for Span objects
  	spanPool sync.Pool

  	// Codecs keyed by propagation format; consulted by Inject and Extract.
  	injectors  map[interface{}]Injector
  	extractors map[interface{}]Extractor

  	observer compositeObserver // notified through OnStartSpan for every started span

  	tags []Tag // tracer-level tags, exposed through Tags()

  	// baggageRestrictionManager may be nil; NewTracer then builds the
  	// baggageSetter from a default restriction manager instead.
  	baggageRestrictionManager baggage.RestrictionManager
  	baggageSetter             *baggageSetter // performs the writes requested via setBaggage
  }
  58. // NewTracer creates Tracer implementation that reports tracing to Jaeger.
  59. // The returned io.Closer can be used in shutdown hooks to ensure that the internal
  60. // queue of the Reporter is drained and all buffered spans are submitted to collectors.
  61. func NewTracer(
  62. serviceName string,
  63. sampler Sampler,
  64. reporter Reporter,
  65. options ...TracerOption,
  66. ) (opentracing.Tracer, io.Closer) {
  67. t := &Tracer{
  68. serviceName: serviceName,
  69. sampler: sampler,
  70. reporter: reporter,
  71. injectors: make(map[interface{}]Injector),
  72. extractors: make(map[interface{}]Extractor),
  73. metrics: *NewNullMetrics(),
  74. spanPool: sync.Pool{New: func() interface{} {
  75. return &Span{}
  76. }},
  77. }
  78. for _, option := range options {
  79. option(t)
  80. }
  81. // register default injectors/extractors unless they are already provided via options
  82. textPropagator := newTextMapPropagator(getDefaultHeadersConfig(), t.metrics)
  83. t.addCodec(opentracing.TextMap, textPropagator, textPropagator)
  84. httpHeaderPropagator := newHTTPHeaderPropagator(getDefaultHeadersConfig(), t.metrics)
  85. t.addCodec(opentracing.HTTPHeaders, httpHeaderPropagator, httpHeaderPropagator)
  86. binaryPropagator := newBinaryPropagator(t)
  87. t.addCodec(opentracing.Binary, binaryPropagator, binaryPropagator)
  88. // TODO remove after TChannel supports OpenTracing
  89. interopPropagator := &jaegerTraceContextPropagator{tracer: t}
  90. t.addCodec(SpanContextFormat, interopPropagator, interopPropagator)
  91. zipkinPropagator := &zipkinPropagator{tracer: t}
  92. t.addCodec(ZipkinSpanFormat, zipkinPropagator, zipkinPropagator)
  93. if t.baggageRestrictionManager != nil {
  94. t.baggageSetter = newBaggageSetter(t.baggageRestrictionManager, &t.metrics)
  95. } else {
  96. t.baggageSetter = newBaggageSetter(baggage.NewDefaultRestrictionManager(0), &t.metrics)
  97. }
  98. if t.randomNumber == nil {
  99. rng := utils.NewRand(time.Now().UnixNano())
  100. t.randomNumber = func() uint64 {
  101. return uint64(rng.Int63())
  102. }
  103. }
  104. if t.timeNow == nil {
  105. t.timeNow = time.Now
  106. }
  107. if t.logger == nil {
  108. t.logger = log.NullLogger
  109. }
  110. // Set tracer-level tags
  111. t.tags = append(t.tags, Tag{key: JaegerClientVersionTagKey, value: JaegerClientVersion})
  112. if hostname, err := os.Hostname(); err == nil {
  113. t.tags = append(t.tags, Tag{key: TracerHostnameTagKey, value: hostname})
  114. }
  115. if ip, err := utils.HostIP(); err == nil {
  116. t.tags = append(t.tags, Tag{key: TracerIPTagKey, value: ip.String()})
  117. t.hostIPv4 = utils.PackIPAsUint32(ip)
  118. } else {
  119. t.logger.Error("Unable to determine this host's IP address: " + err.Error())
  120. }
  121. return t, t
  122. }
  123. // addCodec adds registers injector and extractor for given propagation format if not already defined.
  124. func (t *Tracer) addCodec(format interface{}, injector Injector, extractor Extractor) {
  125. if _, ok := t.injectors[format]; !ok {
  126. t.injectors[format] = injector
  127. }
  128. if _, ok := t.extractors[format]; !ok {
  129. t.extractors[format] = extractor
  130. }
  131. }
  132. // StartSpan implements StartSpan() method of opentracing.Tracer.
  133. func (t *Tracer) StartSpan(
  134. operationName string,
  135. options ...opentracing.StartSpanOption,
  136. ) opentracing.Span {
  137. sso := opentracing.StartSpanOptions{}
  138. for _, o := range options {
  139. o.Apply(&sso)
  140. }
  141. return t.startSpanWithOptions(operationName, sso)
  142. }
  143. func (t *Tracer) startSpanWithOptions(
  144. operationName string,
  145. options opentracing.StartSpanOptions,
  146. ) opentracing.Span {
  147. if options.StartTime.IsZero() {
  148. options.StartTime = t.timeNow()
  149. }
  150. var references []Reference
  151. var parent SpanContext
  152. var hasParent bool // need this because `parent` is a value, not reference
  153. for _, ref := range options.References {
  154. ctx, ok := ref.ReferencedContext.(SpanContext)
  155. if !ok {
  156. t.logger.Error(fmt.Sprintf(
  157. "Reference contains invalid type of SpanReference: %s",
  158. reflect.ValueOf(ref.ReferencedContext)))
  159. continue
  160. }
  161. if !(ctx.IsValid() || ctx.isDebugIDContainerOnly() || len(ctx.baggage) != 0) {
  162. continue
  163. }
  164. references = append(references, Reference{Type: ref.Type, Context: ctx})
  165. if !hasParent {
  166. parent = ctx
  167. hasParent = ref.Type == opentracing.ChildOfRef
  168. }
  169. }
  170. if !hasParent && parent.IsValid() {
  171. // If ChildOfRef wasn't found but a FollowFromRef exists, use the context from
  172. // the FollowFromRef as the parent
  173. hasParent = true
  174. }
  175. rpcServer := false
  176. if v, ok := options.Tags[ext.SpanKindRPCServer.Key]; ok {
  177. rpcServer = (v == ext.SpanKindRPCServerEnum || v == string(ext.SpanKindRPCServerEnum))
  178. }
  179. var samplerTags []Tag
  180. var ctx SpanContext
  181. newTrace := false
  182. if !hasParent || !parent.IsValid() {
  183. newTrace = true
  184. ctx.traceID.Low = t.randomID()
  185. if t.options.gen128Bit {
  186. ctx.traceID.High = t.randomID()
  187. }
  188. ctx.spanID = SpanID(ctx.traceID.Low)
  189. ctx.parentID = 0
  190. ctx.flags = byte(0)
  191. if hasParent && parent.isDebugIDContainerOnly() {
  192. ctx.flags |= (flagSampled | flagDebug)
  193. samplerTags = []Tag{{key: JaegerDebugHeader, value: parent.debugID}}
  194. } else if sampled, tags := t.sampler.IsSampled(ctx.traceID, operationName); sampled {
  195. ctx.flags |= flagSampled
  196. samplerTags = tags
  197. }
  198. } else {
  199. ctx.traceID = parent.traceID
  200. if rpcServer && t.options.zipkinSharedRPCSpan {
  201. // Support Zipkin's one-span-per-RPC model
  202. ctx.spanID = parent.spanID
  203. ctx.parentID = parent.parentID
  204. } else {
  205. ctx.spanID = SpanID(t.randomID())
  206. ctx.parentID = parent.spanID
  207. }
  208. ctx.flags = parent.flags
  209. }
  210. if hasParent {
  211. // copy baggage items
  212. if l := len(parent.baggage); l > 0 {
  213. ctx.baggage = make(map[string]string, len(parent.baggage))
  214. for k, v := range parent.baggage {
  215. ctx.baggage[k] = v
  216. }
  217. }
  218. }
  219. sp := t.newSpan()
  220. sp.context = ctx
  221. sp.observer = t.observer.OnStartSpan(sp, operationName, options)
  222. return t.startSpanInternal(
  223. sp,
  224. operationName,
  225. options.StartTime,
  226. samplerTags,
  227. options.Tags,
  228. newTrace,
  229. rpcServer,
  230. references,
  231. )
  232. }
  233. // Inject implements Inject() method of opentracing.Tracer
  234. func (t *Tracer) Inject(ctx opentracing.SpanContext, format interface{}, carrier interface{}) error {
  235. c, ok := ctx.(SpanContext)
  236. if !ok {
  237. return opentracing.ErrInvalidSpanContext
  238. }
  239. if injector, ok := t.injectors[format]; ok {
  240. return injector.Inject(c, carrier)
  241. }
  242. return opentracing.ErrUnsupportedFormat
  243. }
  244. // Extract implements Extract() method of opentracing.Tracer
  245. func (t *Tracer) Extract(
  246. format interface{},
  247. carrier interface{},
  248. ) (opentracing.SpanContext, error) {
  249. if extractor, ok := t.extractors[format]; ok {
  250. return extractor.Extract(carrier)
  251. }
  252. return nil, opentracing.ErrUnsupportedFormat
  253. }
  254. // Close releases all resources used by the Tracer and flushes any remaining buffered spans.
  255. func (t *Tracer) Close() error {
  256. t.reporter.Close()
  257. t.sampler.Close()
  258. if mgr, ok := t.baggageRestrictionManager.(io.Closer); ok {
  259. mgr.Close()
  260. }
  261. return nil
  262. }
  263. // Tags returns a slice of tracer-level tags.
  264. func (t *Tracer) Tags() []opentracing.Tag {
  265. tags := make([]opentracing.Tag, len(t.tags))
  266. for i, tag := range t.tags {
  267. tags[i] = opentracing.Tag{Key: tag.key, Value: tag.value}
  268. }
  269. return tags
  270. }
  271. // newSpan returns an instance of a clean Span object.
  272. // If options.PoolSpans is true, the spans are retrieved from an object pool.
  273. func (t *Tracer) newSpan() *Span {
  274. if !t.options.poolSpans {
  275. return &Span{}
  276. }
  277. sp := t.spanPool.Get().(*Span)
  278. sp.context = emptyContext
  279. sp.tracer = nil
  280. sp.tags = nil
  281. sp.logs = nil
  282. return sp
  283. }
  284. func (t *Tracer) startSpanInternal(
  285. sp *Span,
  286. operationName string,
  287. startTime time.Time,
  288. internalTags []Tag,
  289. tags opentracing.Tags,
  290. newTrace bool,
  291. rpcServer bool,
  292. references []Reference,
  293. ) *Span {
  294. sp.tracer = t
  295. sp.operationName = operationName
  296. sp.startTime = startTime
  297. sp.duration = 0
  298. sp.references = references
  299. sp.firstInProcess = rpcServer || sp.context.parentID == 0
  300. if len(tags) > 0 || len(internalTags) > 0 {
  301. sp.tags = make([]Tag, len(internalTags), len(tags)+len(internalTags))
  302. copy(sp.tags, internalTags)
  303. for k, v := range tags {
  304. sp.observer.OnSetTag(k, v)
  305. if k == string(ext.SamplingPriority) && setSamplingPriority(sp, v) {
  306. continue
  307. }
  308. sp.setTagNoLocking(k, v)
  309. }
  310. }
  311. // emit metrics
  312. t.metrics.SpansStarted.Inc(1)
  313. if sp.context.IsSampled() {
  314. t.metrics.SpansSampled.Inc(1)
  315. if newTrace {
  316. // We cannot simply check for parentID==0 because in Zipkin model the
  317. // server-side RPC span has the exact same trace/span/parent IDs as the
  318. // calling client-side span, but obviously the server side span is
  319. // no longer a root span of the trace.
  320. t.metrics.TracesStartedSampled.Inc(1)
  321. } else if sp.firstInProcess {
  322. t.metrics.TracesJoinedSampled.Inc(1)
  323. }
  324. } else {
  325. t.metrics.SpansNotSampled.Inc(1)
  326. if newTrace {
  327. t.metrics.TracesStartedNotSampled.Inc(1)
  328. } else if sp.firstInProcess {
  329. t.metrics.TracesJoinedNotSampled.Inc(1)
  330. }
  331. }
  332. return sp
  333. }
  334. func (t *Tracer) reportSpan(sp *Span) {
  335. t.metrics.SpansFinished.Inc(1)
  336. if sp.context.IsSampled() {
  337. t.reporter.Report(sp)
  338. }
  339. if t.options.poolSpans {
  340. t.spanPool.Put(sp)
  341. }
  342. }
  343. // randomID generates a random trace/span ID, using tracer.random() generator.
  344. // It never returns 0.
  345. func (t *Tracer) randomID() uint64 {
  346. val := t.randomNumber()
  347. for val == 0 {
  348. val = t.randomNumber()
  349. }
  350. return val
  351. }
  352. // (NB) span should hold the lock before making this call
  353. func (t *Tracer) setBaggage(sp *Span, key, value string) {
  354. t.baggageSetter.setBaggage(sp, key, value)
  355. }