reporter.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. // Copyright (c) 2016 Uber Technologies, Inc.
  2. //
  3. // Permission is hereby granted, free of charge, to any person obtaining a copy
  4. // of this software and associated documentation files (the "Software"), to deal
  5. // in the Software without restriction, including without limitation the rights
  6. // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  7. // copies of the Software, and to permit persons to whom the Software is
  8. // furnished to do so, subject to the following conditions:
  9. //
  10. // The above copyright notice and this permission notice shall be included in
  11. // all copies or substantial portions of the Software.
  12. //
  13. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  14. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  15. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  16. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  17. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  18. // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  19. // THE SOFTWARE.
  20. package jaeger
  21. import (
  22. "sync"
  23. "sync/atomic"
  24. "time"
  25. "github.com/opentracing/opentracing-go"
  26. "github.com/uber/jaeger-client-go/log"
  27. )
  28. // Reporter is called by the tracer when a span is completed to report the span to the tracing collector.
  29. type Reporter interface {
  30. // Report submits a new span to collectors, possibly asynchronously and/or with buffering.
  31. Report(span *Span)
  32. // Close does a clean shutdown of the reporter, flushing any traces that may be buffered in memory.
  33. Close()
  34. }
  35. // ------------------------------
  36. type nullReporter struct{}
  37. // NewNullReporter creates a no-op reporter that ignores all reported spans.
  38. func NewNullReporter() Reporter {
  39. return &nullReporter{}
  40. }
  41. // Report implements Report() method of Reporter by doing nothing.
  42. func (r *nullReporter) Report(span *Span) {
  43. // no-op
  44. }
  45. // Close implements Close() method of Reporter by doing nothing.
  46. func (r *nullReporter) Close() {
  47. // no-op
  48. }
  49. // ------------------------------
  50. type loggingReporter struct {
  51. logger Logger
  52. }
  53. // NewLoggingReporter creates a reporter that logs all reported spans to provided logger.
  54. func NewLoggingReporter(logger Logger) Reporter {
  55. return &loggingReporter{logger}
  56. }
  57. // Report implements Report() method of Reporter by logging the span to the logger.
  58. func (r *loggingReporter) Report(span *Span) {
  59. r.logger.Infof("Reporting span %+v", span)
  60. }
  61. // Close implements Close() method of Reporter by doing nothing.
  62. func (r *loggingReporter) Close() {
  63. // no-op
  64. }
  65. // ------------------------------
  66. // InMemoryReporter is used for testing, and simply collects spans in memory.
  67. type InMemoryReporter struct {
  68. spans []opentracing.Span
  69. lock sync.Mutex
  70. }
  71. // NewInMemoryReporter creates a reporter that stores spans in memory.
  72. // NOTE: the Tracer should be created with options.PoolSpans = false.
  73. func NewInMemoryReporter() *InMemoryReporter {
  74. return &InMemoryReporter{
  75. spans: make([]opentracing.Span, 0, 10),
  76. }
  77. }
  78. // Report implements Report() method of Reporter by storing the span in the buffer.
  79. func (r *InMemoryReporter) Report(span *Span) {
  80. r.lock.Lock()
  81. r.spans = append(r.spans, span)
  82. r.lock.Unlock()
  83. }
  84. // Close implements Close() method of Reporter by doing nothing.
  85. func (r *InMemoryReporter) Close() {
  86. // no-op
  87. }
  88. // SpansSubmitted returns the number of spans accumulated in the buffer.
  89. func (r *InMemoryReporter) SpansSubmitted() int {
  90. r.lock.Lock()
  91. defer r.lock.Unlock()
  92. return len(r.spans)
  93. }
  94. // GetSpans returns accumulated spans as a copy of the buffer.
  95. func (r *InMemoryReporter) GetSpans() []opentracing.Span {
  96. r.lock.Lock()
  97. defer r.lock.Unlock()
  98. copied := make([]opentracing.Span, len(r.spans))
  99. copy(copied, r.spans)
  100. return copied
  101. }
  102. // Reset clears all accumulated spans.
  103. func (r *InMemoryReporter) Reset() {
  104. r.lock.Lock()
  105. defer r.lock.Unlock()
  106. r.spans = nil
  107. }
  108. // ------------------------------
  109. type compositeReporter struct {
  110. reporters []Reporter
  111. }
  112. // NewCompositeReporter creates a reporter that ignores all reported spans.
  113. func NewCompositeReporter(reporters ...Reporter) Reporter {
  114. return &compositeReporter{reporters: reporters}
  115. }
  116. // Report implements Report() method of Reporter by delegating to each underlying reporter.
  117. func (r *compositeReporter) Report(span *Span) {
  118. for _, reporter := range r.reporters {
  119. reporter.Report(span)
  120. }
  121. }
  122. // Close implements Close() method of Reporter by closing each underlying reporter.
  123. func (r *compositeReporter) Close() {
  124. for _, reporter := range r.reporters {
  125. reporter.Close()
  126. }
  127. }
  128. // ------------------------------
  129. const (
  130. defaultQueueSize = 100
  131. defaultBufferFlushInterval = 10 * time.Second
  132. )
  133. type remoteReporter struct {
  134. // must be first in the struct because `sync/atomic` expects 64-bit alignment.
  135. // Cf. https://github.com/uber/jaeger-client-go/issues/155, https://goo.gl/zW7dgq
  136. queueLength int64
  137. reporterOptions
  138. sender Transport
  139. queue chan *Span
  140. queueDrained sync.WaitGroup
  141. flushSignal chan *sync.WaitGroup
  142. }
  143. // NewRemoteReporter creates a new reporter that sends spans out of process by means of Sender
  144. func NewRemoteReporter(sender Transport, opts ...ReporterOption) Reporter {
  145. options := reporterOptions{}
  146. for _, option := range opts {
  147. option(&options)
  148. }
  149. if options.bufferFlushInterval <= 0 {
  150. options.bufferFlushInterval = defaultBufferFlushInterval
  151. }
  152. if options.logger == nil {
  153. options.logger = log.NullLogger
  154. }
  155. if options.metrics == nil {
  156. options.metrics = NewNullMetrics()
  157. }
  158. if options.queueSize <= 0 {
  159. options.queueSize = defaultQueueSize
  160. }
  161. reporter := &remoteReporter{
  162. reporterOptions: options,
  163. sender: sender,
  164. flushSignal: make(chan *sync.WaitGroup),
  165. queue: make(chan *Span, options.queueSize),
  166. }
  167. go reporter.processQueue()
  168. return reporter
  169. }
  170. // Report implements Report() method of Reporter.
  171. // It passes the span to a background go-routine for submission to Jaeger.
  172. func (r *remoteReporter) Report(span *Span) {
  173. select {
  174. case r.queue <- span:
  175. atomic.AddInt64(&r.queueLength, 1)
  176. default:
  177. r.metrics.ReporterDropped.Inc(1)
  178. }
  179. }
  180. // Close implements Close() method of Reporter by waiting for the queue to be drained.
  181. func (r *remoteReporter) Close() {
  182. r.queueDrained.Add(1)
  183. close(r.queue)
  184. r.queueDrained.Wait()
  185. r.sender.Close()
  186. }
  187. // processQueue reads spans from the queue, converts them to Thrift, and stores them in an internal buffer.
  188. // When the buffer length reaches batchSize, it is flushed by submitting the accumulated spans to Jaeger.
  189. // Buffer also gets flushed automatically every batchFlushInterval seconds, just in case the tracer stopped
  190. // reporting new spans.
  191. func (r *remoteReporter) processQueue() {
  192. timer := time.NewTicker(r.bufferFlushInterval)
  193. for {
  194. select {
  195. case span, ok := <-r.queue:
  196. if ok {
  197. atomic.AddInt64(&r.queueLength, -1)
  198. if flushed, err := r.sender.Append(span); err != nil {
  199. r.metrics.ReporterFailure.Inc(int64(flushed))
  200. r.logger.Error(err.Error())
  201. } else if flushed > 0 {
  202. r.metrics.ReporterSuccess.Inc(int64(flushed))
  203. // to reduce the number of gauge stats, we only emit queue length on flush
  204. r.metrics.ReporterQueueLength.Update(atomic.LoadInt64(&r.queueLength))
  205. }
  206. } else {
  207. // queue closed
  208. timer.Stop()
  209. r.flush()
  210. r.queueDrained.Done()
  211. return
  212. }
  213. case <-timer.C:
  214. r.flush()
  215. case wg := <-r.flushSignal: // for testing
  216. r.flush()
  217. wg.Done()
  218. }
  219. }
  220. }
  221. // flush causes the Sender to flush its accumulated spans and clear the buffer
  222. func (r *remoteReporter) flush() {
  223. if flushed, err := r.sender.Flush(); err != nil {
  224. r.metrics.ReporterFailure.Inc(int64(flushed))
  225. r.logger.Error(err.Error())
  226. } else if flushed > 0 {
  227. r.metrics.ReporterSuccess.Inc(int64(flushed))
  228. }
  229. }