observer.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. // Copyright (c) 2017 Uber Technologies, Inc.
  2. //
  3. // Permission is hereby granted, free of charge, to any person obtaining a copy
  4. // of this software and associated documentation files (the "Software"), to deal
  5. // in the Software without restriction, including without limitation the rights
  6. // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  7. // copies of the Software, and to permit persons to whom the Software is
  8. // furnished to do so, subject to the following conditions:
  9. //
  10. // The above copyright notice and this permission notice shall be included in
  11. // all copies or substantial portions of the Software.
  12. //
  13. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  14. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  15. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  16. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  17. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  18. // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  19. // THE SOFTWARE.
  20. package rpcmetrics
  21. import (
  22. "strconv"
  23. "sync"
  24. "time"
  25. "github.com/opentracing/opentracing-go"
  26. "github.com/opentracing/opentracing-go/ext"
  27. "github.com/uber/jaeger-lib/metrics"
  28. jaeger "github.com/uber/jaeger-client-go"
  29. )
  30. const defaultMaxNumberOfEndpoints = 200
  31. // Observer is an observer that can emit RPC metrics.
  32. type Observer struct {
  33. metricsByEndpoint *MetricsByEndpoint
  34. }
  35. // NewObserver creates a new observer that can emit RPC metrics.
  36. func NewObserver(metricsFactory metrics.Factory, normalizer NameNormalizer) *Observer {
  37. return &Observer{
  38. metricsByEndpoint: newMetricsByEndpoint(
  39. metricsFactory,
  40. normalizer,
  41. defaultMaxNumberOfEndpoints,
  42. ),
  43. }
  44. }
  45. // OnStartSpan creates a new Observer for the span.
  46. func (o *Observer) OnStartSpan(
  47. operationName string,
  48. options opentracing.StartSpanOptions,
  49. ) jaeger.SpanObserver {
  50. return NewSpanObserver(o.metricsByEndpoint, operationName, options)
  51. }
  52. // SpanKind identifies the span as inboud, outbound, or internal
  53. type SpanKind int
  54. const (
  55. // Local span kind
  56. Local SpanKind = iota
  57. // Inbound span kind
  58. Inbound
  59. // Outbound span kind
  60. Outbound
  61. )
  62. // SpanObserver collects RPC metrics
  63. type SpanObserver struct {
  64. metricsByEndpoint *MetricsByEndpoint
  65. operationName string
  66. startTime time.Time
  67. mux sync.Mutex
  68. kind SpanKind
  69. httpStatusCode uint16
  70. err bool
  71. }
  72. // NewSpanObserver creates a new SpanObserver that can emit RPC metrics.
  73. func NewSpanObserver(
  74. metricsByEndpoint *MetricsByEndpoint,
  75. operationName string,
  76. options opentracing.StartSpanOptions,
  77. ) *SpanObserver {
  78. so := &SpanObserver{
  79. metricsByEndpoint: metricsByEndpoint,
  80. operationName: operationName,
  81. startTime: options.StartTime,
  82. }
  83. for k, v := range options.Tags {
  84. so.handleTagInLock(k, v)
  85. }
  86. return so
  87. }
  88. // handleTags watches for special tags
  89. // - SpanKind
  90. // - HttpStatusCode
  91. // - Error
  92. func (so *SpanObserver) handleTagInLock(key string, value interface{}) {
  93. if key == string(ext.SpanKind) {
  94. if v, ok := value.(ext.SpanKindEnum); ok {
  95. value = string(v)
  96. }
  97. if v, ok := value.(string); ok {
  98. if v == string(ext.SpanKindRPCClientEnum) {
  99. so.kind = Outbound
  100. } else if v == string(ext.SpanKindRPCServerEnum) {
  101. so.kind = Inbound
  102. }
  103. }
  104. return
  105. }
  106. if key == string(ext.HTTPStatusCode) {
  107. if v, ok := value.(uint16); ok {
  108. so.httpStatusCode = v
  109. } else if v, ok := value.(int); ok {
  110. so.httpStatusCode = uint16(v)
  111. } else if v, ok := value.(string); ok {
  112. if vv, err := strconv.Atoi(v); err == nil {
  113. so.httpStatusCode = uint16(vv)
  114. }
  115. }
  116. return
  117. }
  118. if key == string(ext.Error) {
  119. if v, ok := value.(bool); ok {
  120. so.err = v
  121. } else if v, ok := value.(string); ok {
  122. if vv, err := strconv.ParseBool(v); err == nil {
  123. so.err = vv
  124. }
  125. }
  126. return
  127. }
  128. }
  129. // OnFinish emits the RPC metrics. It only has an effect when operation name
  130. // is not blank, and the span kind is an RPC server.
  131. func (so *SpanObserver) OnFinish(options opentracing.FinishOptions) {
  132. so.mux.Lock()
  133. defer so.mux.Unlock()
  134. if so.operationName == "" || so.kind != Inbound {
  135. return
  136. }
  137. mets := so.metricsByEndpoint.get(so.operationName)
  138. latency := options.FinishTime.Sub(so.startTime)
  139. if so.err {
  140. mets.RequestCountFailures.Inc(1)
  141. mets.RequestLatencyFailures.Record(latency)
  142. } else {
  143. mets.RequestCountSuccess.Inc(1)
  144. mets.RequestLatencySuccess.Record(latency)
  145. }
  146. mets.recordHTTPStatusCode(so.httpStatusCode)
  147. }
  148. // OnSetOperationName records new operation name.
  149. func (so *SpanObserver) OnSetOperationName(operationName string) {
  150. so.mux.Lock()
  151. so.operationName = operationName
  152. so.mux.Unlock()
  153. }
  154. // OnSetTag implements SpanObserver
  155. func (so *SpanObserver) OnSetTag(key string, value interface{}) {
  156. so.mux.Lock()
  157. so.handleTagInLock(key, value)
  158. so.mux.Unlock()
  159. }