zipkin_thrift_span.go 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331
  1. // Copyright (c) 2016 Uber Technologies, Inc.
  2. //
  3. // Permission is hereby granted, free of charge, to any person obtaining a copy
  4. // of this software and associated documentation files (the "Software"), to deal
  5. // in the Software without restriction, including without limitation the rights
  6. // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  7. // copies of the Software, and to permit persons to whom the Software is
  8. // furnished to do so, subject to the following conditions:
  9. //
  10. // The above copyright notice and this permission notice shall be included in
  11. // all copies or substantial portions of the Software.
  12. //
  13. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  14. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  15. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  16. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  17. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  18. // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  19. // THE SOFTWARE.
  20. package jaeger
  21. import (
  22. "encoding/binary"
  23. "fmt"
  24. "time"
  25. "github.com/opentracing/opentracing-go/ext"
  26. "github.com/uber/jaeger-client-go/internal/spanlog"
  27. z "github.com/uber/jaeger-client-go/thrift-gen/zipkincore"
  28. "github.com/uber/jaeger-client-go/utils"
  29. )
  30. const (
  31. // maxAnnotationLength is the max length of byte array or string allowed in the annotations
  32. maxAnnotationLength = 256
  33. // Zipkin UI does not work well with non-string tag values
  34. allowPackedNumbers = false
  35. )
  36. var specialTagHandlers = map[string]func(*zipkinSpan, interface{}){
  37. string(ext.SpanKind): setSpanKind,
  38. string(ext.PeerHostIPv4): setPeerIPv4,
  39. string(ext.PeerPort): setPeerPort,
  40. string(ext.PeerService): setPeerService,
  41. TracerIPTagKey: removeTag,
  42. }
  43. // BuildZipkinThrift builds thrift span based on internal span.
  44. func BuildZipkinThrift(s *Span) *z.Span {
  45. span := &zipkinSpan{Span: s}
  46. span.handleSpecialTags()
  47. parentID := int64(span.context.parentID)
  48. var ptrParentID *int64
  49. if parentID != 0 {
  50. ptrParentID = &parentID
  51. }
  52. timestamp := utils.TimeToMicrosecondsSinceEpochInt64(span.startTime)
  53. duration := span.duration.Nanoseconds() / int64(time.Microsecond)
  54. endpoint := &z.Endpoint{
  55. ServiceName: span.tracer.serviceName,
  56. Ipv4: int32(span.tracer.hostIPv4)}
  57. thriftSpan := &z.Span{
  58. TraceID: int64(span.context.traceID.Low), // TODO upgrade zipkin thrift and use TraceIdHigh
  59. ID: int64(span.context.spanID),
  60. ParentID: ptrParentID,
  61. Name: span.operationName,
  62. Timestamp: &timestamp,
  63. Duration: &duration,
  64. Debug: span.context.IsDebug(),
  65. Annotations: buildAnnotations(span, endpoint),
  66. BinaryAnnotations: buildBinaryAnnotations(span, endpoint)}
  67. return thriftSpan
  68. }
  69. func buildAnnotations(span *zipkinSpan, endpoint *z.Endpoint) []*z.Annotation {
  70. // automatically adding 2 Zipkin CoreAnnotations
  71. annotations := make([]*z.Annotation, 0, 2+len(span.logs))
  72. var startLabel, endLabel string
  73. if span.spanKind == string(ext.SpanKindRPCClientEnum) {
  74. startLabel, endLabel = z.CLIENT_SEND, z.CLIENT_RECV
  75. } else if span.spanKind == string(ext.SpanKindRPCServerEnum) {
  76. startLabel, endLabel = z.SERVER_RECV, z.SERVER_SEND
  77. }
  78. if !span.startTime.IsZero() && startLabel != "" {
  79. start := &z.Annotation{
  80. Timestamp: utils.TimeToMicrosecondsSinceEpochInt64(span.startTime),
  81. Value: startLabel,
  82. Host: endpoint}
  83. annotations = append(annotations, start)
  84. if span.duration != 0 {
  85. endTs := span.startTime.Add(span.duration)
  86. end := &z.Annotation{
  87. Timestamp: utils.TimeToMicrosecondsSinceEpochInt64(endTs),
  88. Value: endLabel,
  89. Host: endpoint}
  90. annotations = append(annotations, end)
  91. }
  92. }
  93. for _, log := range span.logs {
  94. anno := &z.Annotation{
  95. Timestamp: utils.TimeToMicrosecondsSinceEpochInt64(log.Timestamp),
  96. Host: endpoint}
  97. if content, err := spanlog.MaterializeWithJSON(log.Fields); err == nil {
  98. anno.Value = truncateString(string(content))
  99. } else {
  100. anno.Value = err.Error()
  101. }
  102. annotations = append(annotations, anno)
  103. }
  104. return annotations
  105. }
  106. func buildBinaryAnnotations(span *zipkinSpan, endpoint *z.Endpoint) []*z.BinaryAnnotation {
  107. // automatically adding local component or server/client address tag, and client version
  108. annotations := make([]*z.BinaryAnnotation, 0, 2+len(span.tags))
  109. if span.peerDefined() && span.isRPC() {
  110. peer := z.Endpoint{
  111. Ipv4: span.peer.Ipv4,
  112. Port: span.peer.Port,
  113. ServiceName: span.peer.ServiceName}
  114. label := z.CLIENT_ADDR
  115. if span.isRPCClient() {
  116. label = z.SERVER_ADDR
  117. }
  118. anno := &z.BinaryAnnotation{
  119. Key: label,
  120. Value: []byte{1},
  121. AnnotationType: z.AnnotationType_BOOL,
  122. Host: &peer}
  123. annotations = append(annotations, anno)
  124. }
  125. if !span.isRPC() {
  126. componentName := endpoint.ServiceName
  127. for _, tag := range span.tags {
  128. if tag.key == string(ext.Component) {
  129. componentName = stringify(tag.value)
  130. break
  131. }
  132. }
  133. local := &z.BinaryAnnotation{
  134. Key: z.LOCAL_COMPONENT,
  135. Value: []byte(componentName),
  136. AnnotationType: z.AnnotationType_STRING,
  137. Host: endpoint}
  138. annotations = append(annotations, local)
  139. }
  140. for _, tag := range span.tags {
  141. // "Special tags" are already handled by this point, we'd be double reporting the
  142. // tags if we don't skip here
  143. if _, ok := specialTagHandlers[tag.key]; ok {
  144. continue
  145. }
  146. if anno := buildBinaryAnnotation(tag.key, tag.value, nil); anno != nil {
  147. annotations = append(annotations, anno)
  148. }
  149. }
  150. return annotations
  151. }
  152. func buildBinaryAnnotation(key string, val interface{}, endpoint *z.Endpoint) *z.BinaryAnnotation {
  153. bann := &z.BinaryAnnotation{Key: key, Host: endpoint}
  154. if value, ok := val.(string); ok {
  155. bann.Value = []byte(truncateString(value))
  156. bann.AnnotationType = z.AnnotationType_STRING
  157. } else if value, ok := val.([]byte); ok {
  158. if len(value) > maxAnnotationLength {
  159. value = value[:maxAnnotationLength]
  160. }
  161. bann.Value = value
  162. bann.AnnotationType = z.AnnotationType_BYTES
  163. } else if value, ok := val.(int32); ok && allowPackedNumbers {
  164. bann.Value = int32ToBytes(value)
  165. bann.AnnotationType = z.AnnotationType_I32
  166. } else if value, ok := val.(int64); ok && allowPackedNumbers {
  167. bann.Value = int64ToBytes(value)
  168. bann.AnnotationType = z.AnnotationType_I64
  169. } else if value, ok := val.(int); ok && allowPackedNumbers {
  170. bann.Value = int64ToBytes(int64(value))
  171. bann.AnnotationType = z.AnnotationType_I64
  172. } else if value, ok := val.(bool); ok {
  173. bann.Value = []byte{boolToByte(value)}
  174. bann.AnnotationType = z.AnnotationType_BOOL
  175. } else {
  176. value := stringify(val)
  177. bann.Value = []byte(truncateString(value))
  178. bann.AnnotationType = z.AnnotationType_STRING
  179. }
  180. return bann
  181. }
  182. func stringify(value interface{}) string {
  183. if s, ok := value.(string); ok {
  184. return s
  185. }
  186. return fmt.Sprintf("%+v", value)
  187. }
  188. func truncateString(value string) string {
  189. // we ignore the problem of utf8 runes possibly being sliced in the middle,
  190. // as it is rather expensive to iterate through each tag just to find rune
  191. // boundaries.
  192. if len(value) > maxAnnotationLength {
  193. return value[:maxAnnotationLength]
  194. }
  195. return value
  196. }
  197. func boolToByte(b bool) byte {
  198. if b {
  199. return 1
  200. }
  201. return 0
  202. }
  203. // int32ToBytes converts int32 to bytes.
  204. func int32ToBytes(i int32) []byte {
  205. buf := make([]byte, 4)
  206. binary.BigEndian.PutUint32(buf, uint32(i))
  207. return buf
  208. }
  209. // int64ToBytes converts int64 to bytes.
  210. func int64ToBytes(i int64) []byte {
  211. buf := make([]byte, 8)
  212. binary.BigEndian.PutUint64(buf, uint64(i))
  213. return buf
  214. }
  215. type zipkinSpan struct {
  216. *Span
  217. // peer points to the peer service participating in this span,
  218. // e.g. the Client if this span is a server span,
  219. // or Server if this span is a client span
  220. peer struct {
  221. Ipv4 int32
  222. Port int16
  223. ServiceName string
  224. }
  225. // used to distinguish local vs. RPC Server vs. RPC Client spans
  226. spanKind string
  227. }
  228. func (s *zipkinSpan) handleSpecialTags() {
  229. s.Lock()
  230. defer s.Unlock()
  231. if s.firstInProcess {
  232. // append the process tags
  233. s.tags = append(s.tags, s.tracer.tags...)
  234. }
  235. filteredTags := make([]Tag, 0, len(s.tags))
  236. for _, tag := range s.tags {
  237. if handler, ok := specialTagHandlers[tag.key]; ok {
  238. handler(s, tag.value)
  239. } else {
  240. filteredTags = append(filteredTags, tag)
  241. }
  242. }
  243. s.tags = filteredTags
  244. }
  245. func setSpanKind(s *zipkinSpan, value interface{}) {
  246. if val, ok := value.(string); ok {
  247. s.spanKind = val
  248. return
  249. }
  250. if val, ok := value.(ext.SpanKindEnum); ok {
  251. s.spanKind = string(val)
  252. }
  253. }
  254. func setPeerIPv4(s *zipkinSpan, value interface{}) {
  255. if val, ok := value.(string); ok {
  256. if ip, err := utils.ParseIPToUint32(val); err == nil {
  257. s.peer.Ipv4 = int32(ip)
  258. return
  259. }
  260. }
  261. if val, ok := value.(uint32); ok {
  262. s.peer.Ipv4 = int32(val)
  263. return
  264. }
  265. if val, ok := value.(int32); ok {
  266. s.peer.Ipv4 = val
  267. }
  268. }
  269. func setPeerPort(s *zipkinSpan, value interface{}) {
  270. if val, ok := value.(string); ok {
  271. if port, err := utils.ParsePort(val); err == nil {
  272. s.peer.Port = int16(port)
  273. return
  274. }
  275. }
  276. if val, ok := value.(uint16); ok {
  277. s.peer.Port = int16(val)
  278. return
  279. }
  280. if val, ok := value.(int); ok {
  281. s.peer.Port = int16(val)
  282. }
  283. }
  284. func setPeerService(s *zipkinSpan, value interface{}) {
  285. if val, ok := value.(string); ok {
  286. s.peer.ServiceName = val
  287. }
  288. }
  289. func removeTag(s *zipkinSpan, value interface{}) {}
  290. func (s *zipkinSpan) peerDefined() bool {
  291. return s.peer.ServiceName != "" || s.peer.Ipv4 != 0 || s.peer.Port != 0
  292. }
  293. func (s *zipkinSpan) isRPC() bool {
  294. s.RLock()
  295. defer s.RUnlock()
  296. return s.spanKind == string(ext.SpanKindRPCClientEnum) || s.spanKind == string(ext.SpanKindRPCServerEnum)
  297. }
  298. func (s *zipkinSpan) isRPCClient() bool {
  299. s.RLock()
  300. defer s.RUnlock()
  301. return s.spanKind == string(ext.SpanKindRPCClientEnum)
  302. }