zipkin_thrift_span.go 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328
  1. // Copyright (c) 2017 Uber Technologies, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package jaeger
  15. import (
  16. "encoding/binary"
  17. "fmt"
  18. "time"
  19. "github.com/opentracing/opentracing-go/ext"
  20. "github.com/uber/jaeger-client-go/internal/spanlog"
  21. z "github.com/uber/jaeger-client-go/thrift-gen/zipkincore"
  22. "github.com/uber/jaeger-client-go/utils"
  23. )
  24. const (
  25. // Zipkin UI does not work well with non-string tag values
  26. allowPackedNumbers = false
  27. )
  28. var specialTagHandlers = map[string]func(*zipkinSpan, interface{}){
  29. string(ext.SpanKind): setSpanKind,
  30. string(ext.PeerHostIPv4): setPeerIPv4,
  31. string(ext.PeerPort): setPeerPort,
  32. string(ext.PeerService): setPeerService,
  33. TracerIPTagKey: removeTag,
  34. }
  35. // BuildZipkinThrift builds thrift span based on internal span.
  36. func BuildZipkinThrift(s *Span) *z.Span {
  37. span := &zipkinSpan{Span: s}
  38. span.handleSpecialTags()
  39. parentID := int64(span.context.parentID)
  40. var ptrParentID *int64
  41. if parentID != 0 {
  42. ptrParentID = &parentID
  43. }
  44. traceIDHigh := int64(span.context.traceID.High)
  45. var ptrTraceIDHigh *int64
  46. if traceIDHigh != 0 {
  47. ptrTraceIDHigh = &traceIDHigh
  48. }
  49. timestamp := utils.TimeToMicrosecondsSinceEpochInt64(span.startTime)
  50. duration := span.duration.Nanoseconds() / int64(time.Microsecond)
  51. endpoint := &z.Endpoint{
  52. ServiceName: span.tracer.serviceName,
  53. Ipv4: int32(span.tracer.hostIPv4)}
  54. thriftSpan := &z.Span{
  55. TraceID: int64(span.context.traceID.Low),
  56. TraceIDHigh: ptrTraceIDHigh,
  57. ID: int64(span.context.spanID),
  58. ParentID: ptrParentID,
  59. Name: span.operationName,
  60. Timestamp: &timestamp,
  61. Duration: &duration,
  62. Debug: span.context.IsDebug(),
  63. Annotations: buildAnnotations(span, endpoint),
  64. BinaryAnnotations: buildBinaryAnnotations(span, endpoint)}
  65. return thriftSpan
  66. }
  67. func buildAnnotations(span *zipkinSpan, endpoint *z.Endpoint) []*z.Annotation {
  68. // automatically adding 2 Zipkin CoreAnnotations
  69. annotations := make([]*z.Annotation, 0, 2+len(span.logs))
  70. var startLabel, endLabel string
  71. if span.spanKind == string(ext.SpanKindRPCClientEnum) {
  72. startLabel, endLabel = z.CLIENT_SEND, z.CLIENT_RECV
  73. } else if span.spanKind == string(ext.SpanKindRPCServerEnum) {
  74. startLabel, endLabel = z.SERVER_RECV, z.SERVER_SEND
  75. }
  76. if !span.startTime.IsZero() && startLabel != "" {
  77. start := &z.Annotation{
  78. Timestamp: utils.TimeToMicrosecondsSinceEpochInt64(span.startTime),
  79. Value: startLabel,
  80. Host: endpoint}
  81. annotations = append(annotations, start)
  82. if span.duration != 0 {
  83. endTs := span.startTime.Add(span.duration)
  84. end := &z.Annotation{
  85. Timestamp: utils.TimeToMicrosecondsSinceEpochInt64(endTs),
  86. Value: endLabel,
  87. Host: endpoint}
  88. annotations = append(annotations, end)
  89. }
  90. }
  91. for _, log := range span.logs {
  92. anno := &z.Annotation{
  93. Timestamp: utils.TimeToMicrosecondsSinceEpochInt64(log.Timestamp),
  94. Host: endpoint}
  95. if content, err := spanlog.MaterializeWithJSON(log.Fields); err == nil {
  96. anno.Value = truncateString(string(content), span.tracer.options.maxTagValueLength)
  97. } else {
  98. anno.Value = err.Error()
  99. }
  100. annotations = append(annotations, anno)
  101. }
  102. return annotations
  103. }
  104. func buildBinaryAnnotations(span *zipkinSpan, endpoint *z.Endpoint) []*z.BinaryAnnotation {
  105. // automatically adding local component or server/client address tag, and client version
  106. annotations := make([]*z.BinaryAnnotation, 0, 2+len(span.tags))
  107. if span.peerDefined() && span.isRPC() {
  108. peer := z.Endpoint{
  109. Ipv4: span.peer.Ipv4,
  110. Port: span.peer.Port,
  111. ServiceName: span.peer.ServiceName}
  112. label := z.CLIENT_ADDR
  113. if span.isRPCClient() {
  114. label = z.SERVER_ADDR
  115. }
  116. anno := &z.BinaryAnnotation{
  117. Key: label,
  118. Value: []byte{1},
  119. AnnotationType: z.AnnotationType_BOOL,
  120. Host: &peer}
  121. annotations = append(annotations, anno)
  122. }
  123. if !span.isRPC() {
  124. componentName := endpoint.ServiceName
  125. for _, tag := range span.tags {
  126. if tag.key == string(ext.Component) {
  127. componentName = stringify(tag.value)
  128. break
  129. }
  130. }
  131. local := &z.BinaryAnnotation{
  132. Key: z.LOCAL_COMPONENT,
  133. Value: []byte(componentName),
  134. AnnotationType: z.AnnotationType_STRING,
  135. Host: endpoint}
  136. annotations = append(annotations, local)
  137. }
  138. for _, tag := range span.tags {
  139. // "Special tags" are already handled by this point, we'd be double reporting the
  140. // tags if we don't skip here
  141. if _, ok := specialTagHandlers[tag.key]; ok {
  142. continue
  143. }
  144. if anno := buildBinaryAnnotation(tag.key, tag.value, span.tracer.options.maxTagValueLength, nil); anno != nil {
  145. annotations = append(annotations, anno)
  146. }
  147. }
  148. return annotations
  149. }
  150. func buildBinaryAnnotation(key string, val interface{}, maxTagValueLength int, endpoint *z.Endpoint) *z.BinaryAnnotation {
  151. bann := &z.BinaryAnnotation{Key: key, Host: endpoint}
  152. if value, ok := val.(string); ok {
  153. bann.Value = []byte(truncateString(value, maxTagValueLength))
  154. bann.AnnotationType = z.AnnotationType_STRING
  155. } else if value, ok := val.([]byte); ok {
  156. if len(value) > maxTagValueLength {
  157. value = value[:maxTagValueLength]
  158. }
  159. bann.Value = value
  160. bann.AnnotationType = z.AnnotationType_BYTES
  161. } else if value, ok := val.(int32); ok && allowPackedNumbers {
  162. bann.Value = int32ToBytes(value)
  163. bann.AnnotationType = z.AnnotationType_I32
  164. } else if value, ok := val.(int64); ok && allowPackedNumbers {
  165. bann.Value = int64ToBytes(value)
  166. bann.AnnotationType = z.AnnotationType_I64
  167. } else if value, ok := val.(int); ok && allowPackedNumbers {
  168. bann.Value = int64ToBytes(int64(value))
  169. bann.AnnotationType = z.AnnotationType_I64
  170. } else if value, ok := val.(bool); ok {
  171. bann.Value = []byte{boolToByte(value)}
  172. bann.AnnotationType = z.AnnotationType_BOOL
  173. } else {
  174. value := stringify(val)
  175. bann.Value = []byte(truncateString(value, maxTagValueLength))
  176. bann.AnnotationType = z.AnnotationType_STRING
  177. }
  178. return bann
  179. }
  180. func stringify(value interface{}) string {
  181. if s, ok := value.(string); ok {
  182. return s
  183. }
  184. return fmt.Sprintf("%+v", value)
  185. }
  186. func truncateString(value string, maxLength int) string {
  187. // we ignore the problem of utf8 runes possibly being sliced in the middle,
  188. // as it is rather expensive to iterate through each tag just to find rune
  189. // boundaries.
  190. if len(value) > maxLength {
  191. return value[:maxLength]
  192. }
  193. return value
  194. }
  195. func boolToByte(b bool) byte {
  196. if b {
  197. return 1
  198. }
  199. return 0
  200. }
  201. // int32ToBytes converts int32 to bytes.
  202. func int32ToBytes(i int32) []byte {
  203. buf := make([]byte, 4)
  204. binary.BigEndian.PutUint32(buf, uint32(i))
  205. return buf
  206. }
  207. // int64ToBytes converts int64 to bytes.
  208. func int64ToBytes(i int64) []byte {
  209. buf := make([]byte, 8)
  210. binary.BigEndian.PutUint64(buf, uint64(i))
  211. return buf
  212. }
  213. type zipkinSpan struct {
  214. *Span
  215. // peer points to the peer service participating in this span,
  216. // e.g. the Client if this span is a server span,
  217. // or Server if this span is a client span
  218. peer struct {
  219. Ipv4 int32
  220. Port int16
  221. ServiceName string
  222. }
  223. // used to distinguish local vs. RPC Server vs. RPC Client spans
  224. spanKind string
  225. }
  226. func (s *zipkinSpan) handleSpecialTags() {
  227. s.Lock()
  228. defer s.Unlock()
  229. if s.firstInProcess {
  230. // append the process tags
  231. s.tags = append(s.tags, s.tracer.tags...)
  232. }
  233. filteredTags := make([]Tag, 0, len(s.tags))
  234. for _, tag := range s.tags {
  235. if handler, ok := specialTagHandlers[tag.key]; ok {
  236. handler(s, tag.value)
  237. } else {
  238. filteredTags = append(filteredTags, tag)
  239. }
  240. }
  241. s.tags = filteredTags
  242. }
  243. func setSpanKind(s *zipkinSpan, value interface{}) {
  244. if val, ok := value.(string); ok {
  245. s.spanKind = val
  246. return
  247. }
  248. if val, ok := value.(ext.SpanKindEnum); ok {
  249. s.spanKind = string(val)
  250. }
  251. }
  252. func setPeerIPv4(s *zipkinSpan, value interface{}) {
  253. if val, ok := value.(string); ok {
  254. if ip, err := utils.ParseIPToUint32(val); err == nil {
  255. s.peer.Ipv4 = int32(ip)
  256. return
  257. }
  258. }
  259. if val, ok := value.(uint32); ok {
  260. s.peer.Ipv4 = int32(val)
  261. return
  262. }
  263. if val, ok := value.(int32); ok {
  264. s.peer.Ipv4 = val
  265. }
  266. }
  267. func setPeerPort(s *zipkinSpan, value interface{}) {
  268. if val, ok := value.(string); ok {
  269. if port, err := utils.ParsePort(val); err == nil {
  270. s.peer.Port = int16(port)
  271. return
  272. }
  273. }
  274. if val, ok := value.(uint16); ok {
  275. s.peer.Port = int16(val)
  276. return
  277. }
  278. if val, ok := value.(int); ok {
  279. s.peer.Port = int16(val)
  280. }
  281. }
  282. func setPeerService(s *zipkinSpan, value interface{}) {
  283. if val, ok := value.(string); ok {
  284. s.peer.ServiceName = val
  285. }
  286. }
  287. func removeTag(s *zipkinSpan, value interface{}) {}
  288. func (s *zipkinSpan) peerDefined() bool {
  289. return s.peer.ServiceName != "" || s.peer.Ipv4 != 0 || s.peer.Port != 0
  290. }
  291. func (s *zipkinSpan) isRPC() bool {
  292. s.RLock()
  293. defer s.RUnlock()
  294. return s.spanKind == string(ext.SpanKindRPCClientEnum) || s.spanKind == string(ext.SpanKindRPCServerEnum)
  295. }
  296. func (s *zipkinSpan) isRPCClient() bool {
  297. s.RLock()
  298. defer s.RUnlock()
  299. return s.spanKind == string(ext.SpanKindRPCClientEnum)
  300. }