tracer.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396
  1. // Copyright (c) 2017 Uber Technologies, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package jaeger
  15. import (
  16. "fmt"
  17. "io"
  18. "os"
  19. "reflect"
  20. "sync"
  21. "time"
  22. "github.com/opentracing/opentracing-go"
  23. "github.com/opentracing/opentracing-go/ext"
  24. "github.com/uber/jaeger-client-go/internal/baggage"
  25. "github.com/uber/jaeger-client-go/log"
  26. "github.com/uber/jaeger-client-go/utils"
  27. )
  28. // Tracer implements opentracing.Tracer.
  29. type Tracer struct {
  30. serviceName string
  31. hostIPv4 uint32 // this is for zipkin endpoint conversion
  32. sampler Sampler
  33. reporter Reporter
  34. metrics Metrics
  35. logger log.Logger
  36. timeNow func() time.Time
  37. randomNumber func() uint64
  38. options struct {
  39. poolSpans bool
  40. gen128Bit bool // whether to generate 128bit trace IDs
  41. zipkinSharedRPCSpan bool
  42. highTraceIDGenerator func() uint64 // custom high trace ID generator
  43. // more options to come
  44. }
  45. // pool for Span objects
  46. spanPool sync.Pool
  47. injectors map[interface{}]Injector
  48. extractors map[interface{}]Extractor
  49. observer compositeObserver
  50. tags []Tag
  51. baggageRestrictionManager baggage.RestrictionManager
  52. baggageSetter *baggageSetter
  53. }
  54. // NewTracer creates Tracer implementation that reports tracing to Jaeger.
  55. // The returned io.Closer can be used in shutdown hooks to ensure that the internal
  56. // queue of the Reporter is drained and all buffered spans are submitted to collectors.
  57. func NewTracer(
  58. serviceName string,
  59. sampler Sampler,
  60. reporter Reporter,
  61. options ...TracerOption,
  62. ) (opentracing.Tracer, io.Closer) {
  63. t := &Tracer{
  64. serviceName: serviceName,
  65. sampler: sampler,
  66. reporter: reporter,
  67. injectors: make(map[interface{}]Injector),
  68. extractors: make(map[interface{}]Extractor),
  69. metrics: *NewNullMetrics(),
  70. spanPool: sync.Pool{New: func() interface{} {
  71. return &Span{}
  72. }},
  73. }
  74. for _, option := range options {
  75. option(t)
  76. }
  77. // register default injectors/extractors unless they are already provided via options
  78. textPropagator := newTextMapPropagator(getDefaultHeadersConfig(), t.metrics)
  79. t.addCodec(opentracing.TextMap, textPropagator, textPropagator)
  80. httpHeaderPropagator := newHTTPHeaderPropagator(getDefaultHeadersConfig(), t.metrics)
  81. t.addCodec(opentracing.HTTPHeaders, httpHeaderPropagator, httpHeaderPropagator)
  82. binaryPropagator := newBinaryPropagator(t)
  83. t.addCodec(opentracing.Binary, binaryPropagator, binaryPropagator)
  84. // TODO remove after TChannel supports OpenTracing
  85. interopPropagator := &jaegerTraceContextPropagator{tracer: t}
  86. t.addCodec(SpanContextFormat, interopPropagator, interopPropagator)
  87. zipkinPropagator := &zipkinPropagator{tracer: t}
  88. t.addCodec(ZipkinSpanFormat, zipkinPropagator, zipkinPropagator)
  89. if t.baggageRestrictionManager != nil {
  90. t.baggageSetter = newBaggageSetter(t.baggageRestrictionManager, &t.metrics)
  91. } else {
  92. t.baggageSetter = newBaggageSetter(baggage.NewDefaultRestrictionManager(0), &t.metrics)
  93. }
  94. if t.randomNumber == nil {
  95. rng := utils.NewRand(time.Now().UnixNano())
  96. t.randomNumber = func() uint64 {
  97. return uint64(rng.Int63())
  98. }
  99. }
  100. if t.timeNow == nil {
  101. t.timeNow = time.Now
  102. }
  103. if t.logger == nil {
  104. t.logger = log.NullLogger
  105. }
  106. // Set tracer-level tags
  107. t.tags = append(t.tags, Tag{key: JaegerClientVersionTagKey, value: JaegerClientVersion})
  108. if hostname, err := os.Hostname(); err == nil {
  109. t.tags = append(t.tags, Tag{key: TracerHostnameTagKey, value: hostname})
  110. }
  111. if ip, err := utils.HostIP(); err == nil {
  112. t.tags = append(t.tags, Tag{key: TracerIPTagKey, value: ip.String()})
  113. t.hostIPv4 = utils.PackIPAsUint32(ip)
  114. } else {
  115. t.logger.Error("Unable to determine this host's IP address: " + err.Error())
  116. }
  117. if t.options.gen128Bit {
  118. if t.options.highTraceIDGenerator == nil {
  119. t.options.highTraceIDGenerator = t.randomNumber
  120. }
  121. } else if t.options.highTraceIDGenerator != nil {
  122. t.logger.Error("Overriding high trace ID generator but not generating " +
  123. "128 bit trace IDs, consider enabling the \"Gen128Bit\" option")
  124. }
  125. return t, t
  126. }
  127. // addCodec adds registers injector and extractor for given propagation format if not already defined.
  128. func (t *Tracer) addCodec(format interface{}, injector Injector, extractor Extractor) {
  129. if _, ok := t.injectors[format]; !ok {
  130. t.injectors[format] = injector
  131. }
  132. if _, ok := t.extractors[format]; !ok {
  133. t.extractors[format] = extractor
  134. }
  135. }
  136. // StartSpan implements StartSpan() method of opentracing.Tracer.
  137. func (t *Tracer) StartSpan(
  138. operationName string,
  139. options ...opentracing.StartSpanOption,
  140. ) opentracing.Span {
  141. sso := opentracing.StartSpanOptions{}
  142. for _, o := range options {
  143. o.Apply(&sso)
  144. }
  145. return t.startSpanWithOptions(operationName, sso)
  146. }
  147. func (t *Tracer) startSpanWithOptions(
  148. operationName string,
  149. options opentracing.StartSpanOptions,
  150. ) opentracing.Span {
  151. if options.StartTime.IsZero() {
  152. options.StartTime = t.timeNow()
  153. }
  154. var references []Reference
  155. var parent SpanContext
  156. var hasParent bool // need this because `parent` is a value, not reference
  157. for _, ref := range options.References {
  158. ctx, ok := ref.ReferencedContext.(SpanContext)
  159. if !ok {
  160. t.logger.Error(fmt.Sprintf(
  161. "Reference contains invalid type of SpanReference: %s",
  162. reflect.ValueOf(ref.ReferencedContext)))
  163. continue
  164. }
  165. if !(ctx.IsValid() || ctx.isDebugIDContainerOnly() || len(ctx.baggage) != 0) {
  166. continue
  167. }
  168. references = append(references, Reference{Type: ref.Type, Context: ctx})
  169. if !hasParent {
  170. parent = ctx
  171. hasParent = ref.Type == opentracing.ChildOfRef
  172. }
  173. }
  174. if !hasParent && parent.IsValid() {
  175. // If ChildOfRef wasn't found but a FollowFromRef exists, use the context from
  176. // the FollowFromRef as the parent
  177. hasParent = true
  178. }
  179. rpcServer := false
  180. if v, ok := options.Tags[ext.SpanKindRPCServer.Key]; ok {
  181. rpcServer = (v == ext.SpanKindRPCServerEnum || v == string(ext.SpanKindRPCServerEnum))
  182. }
  183. var samplerTags []Tag
  184. var ctx SpanContext
  185. newTrace := false
  186. if !hasParent || !parent.IsValid() {
  187. newTrace = true
  188. ctx.traceID.Low = t.randomID()
  189. if t.options.gen128Bit {
  190. ctx.traceID.High = t.options.highTraceIDGenerator()
  191. }
  192. ctx.spanID = SpanID(ctx.traceID.Low)
  193. ctx.parentID = 0
  194. ctx.flags = byte(0)
  195. if hasParent && parent.isDebugIDContainerOnly() {
  196. ctx.flags |= (flagSampled | flagDebug)
  197. samplerTags = []Tag{{key: JaegerDebugHeader, value: parent.debugID}}
  198. } else if sampled, tags := t.sampler.IsSampled(ctx.traceID, operationName); sampled {
  199. ctx.flags |= flagSampled
  200. samplerTags = tags
  201. }
  202. } else {
  203. ctx.traceID = parent.traceID
  204. if rpcServer && t.options.zipkinSharedRPCSpan {
  205. // Support Zipkin's one-span-per-RPC model
  206. ctx.spanID = parent.spanID
  207. ctx.parentID = parent.parentID
  208. } else {
  209. ctx.spanID = SpanID(t.randomID())
  210. ctx.parentID = parent.spanID
  211. }
  212. ctx.flags = parent.flags
  213. }
  214. if hasParent {
  215. // copy baggage items
  216. if l := len(parent.baggage); l > 0 {
  217. ctx.baggage = make(map[string]string, len(parent.baggage))
  218. for k, v := range parent.baggage {
  219. ctx.baggage[k] = v
  220. }
  221. }
  222. }
  223. sp := t.newSpan()
  224. sp.context = ctx
  225. sp.observer = t.observer.OnStartSpan(sp, operationName, options)
  226. return t.startSpanInternal(
  227. sp,
  228. operationName,
  229. options.StartTime,
  230. samplerTags,
  231. options.Tags,
  232. newTrace,
  233. rpcServer,
  234. references,
  235. )
  236. }
  237. // Inject implements Inject() method of opentracing.Tracer
  238. func (t *Tracer) Inject(ctx opentracing.SpanContext, format interface{}, carrier interface{}) error {
  239. c, ok := ctx.(SpanContext)
  240. if !ok {
  241. return opentracing.ErrInvalidSpanContext
  242. }
  243. if injector, ok := t.injectors[format]; ok {
  244. return injector.Inject(c, carrier)
  245. }
  246. return opentracing.ErrUnsupportedFormat
  247. }
  248. // Extract implements Extract() method of opentracing.Tracer
  249. func (t *Tracer) Extract(
  250. format interface{},
  251. carrier interface{},
  252. ) (opentracing.SpanContext, error) {
  253. if extractor, ok := t.extractors[format]; ok {
  254. return extractor.Extract(carrier)
  255. }
  256. return nil, opentracing.ErrUnsupportedFormat
  257. }
  258. // Close releases all resources used by the Tracer and flushes any remaining buffered spans.
  259. func (t *Tracer) Close() error {
  260. t.reporter.Close()
  261. t.sampler.Close()
  262. if mgr, ok := t.baggageRestrictionManager.(io.Closer); ok {
  263. mgr.Close()
  264. }
  265. return nil
  266. }
  267. // Tags returns a slice of tracer-level tags.
  268. func (t *Tracer) Tags() []opentracing.Tag {
  269. tags := make([]opentracing.Tag, len(t.tags))
  270. for i, tag := range t.tags {
  271. tags[i] = opentracing.Tag{Key: tag.key, Value: tag.value}
  272. }
  273. return tags
  274. }
  275. // newSpan returns an instance of a clean Span object.
  276. // If options.PoolSpans is true, the spans are retrieved from an object pool.
  277. func (t *Tracer) newSpan() *Span {
  278. if !t.options.poolSpans {
  279. return &Span{}
  280. }
  281. sp := t.spanPool.Get().(*Span)
  282. sp.context = emptyContext
  283. sp.tracer = nil
  284. sp.tags = nil
  285. sp.logs = nil
  286. return sp
  287. }
  288. func (t *Tracer) startSpanInternal(
  289. sp *Span,
  290. operationName string,
  291. startTime time.Time,
  292. internalTags []Tag,
  293. tags opentracing.Tags,
  294. newTrace bool,
  295. rpcServer bool,
  296. references []Reference,
  297. ) *Span {
  298. sp.tracer = t
  299. sp.operationName = operationName
  300. sp.startTime = startTime
  301. sp.duration = 0
  302. sp.references = references
  303. sp.firstInProcess = rpcServer || sp.context.parentID == 0
  304. if len(tags) > 0 || len(internalTags) > 0 {
  305. sp.tags = make([]Tag, len(internalTags), len(tags)+len(internalTags))
  306. copy(sp.tags, internalTags)
  307. for k, v := range tags {
  308. sp.observer.OnSetTag(k, v)
  309. if k == string(ext.SamplingPriority) && setSamplingPriority(sp, v) {
  310. continue
  311. }
  312. sp.setTagNoLocking(k, v)
  313. }
  314. }
  315. // emit metrics
  316. if sp.context.IsSampled() {
  317. t.metrics.SpansStartedSampled.Inc(1)
  318. if newTrace {
  319. // We cannot simply check for parentID==0 because in Zipkin model the
  320. // server-side RPC span has the exact same trace/span/parent IDs as the
  321. // calling client-side span, but obviously the server side span is
  322. // no longer a root span of the trace.
  323. t.metrics.TracesStartedSampled.Inc(1)
  324. } else if sp.firstInProcess {
  325. t.metrics.TracesJoinedSampled.Inc(1)
  326. }
  327. } else {
  328. t.metrics.SpansStartedNotSampled.Inc(1)
  329. if newTrace {
  330. t.metrics.TracesStartedNotSampled.Inc(1)
  331. } else if sp.firstInProcess {
  332. t.metrics.TracesJoinedNotSampled.Inc(1)
  333. }
  334. }
  335. return sp
  336. }
  337. func (t *Tracer) reportSpan(sp *Span) {
  338. t.metrics.SpansFinished.Inc(1)
  339. if sp.context.IsSampled() {
  340. t.reporter.Report(sp)
  341. }
  342. if t.options.poolSpans {
  343. t.spanPool.Put(sp)
  344. }
  345. }
  346. // randomID generates a random trace/span ID, using tracer.random() generator.
  347. // It never returns 0.
  348. func (t *Tracer) randomID() uint64 {
  349. val := t.randomNumber()
  350. for val == 0 {
  351. val = t.randomNumber()
  352. }
  353. return val
  354. }
  355. // (NB) span should hold the lock before making this call
  356. func (t *Tracer) setBaggage(sp *Span, key, value string) {
  357. t.baggageSetter.setBaggage(sp, key, value)
  358. }