zipkin_thrift_span.go 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325
  1. // Copyright (c) 2017 Uber Technologies, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package jaeger
  15. import (
  16. "encoding/binary"
  17. "fmt"
  18. "time"
  19. "github.com/opentracing/opentracing-go/ext"
  20. "github.com/uber/jaeger-client-go/internal/spanlog"
  21. z "github.com/uber/jaeger-client-go/thrift-gen/zipkincore"
  22. "github.com/uber/jaeger-client-go/utils"
  23. )
const (
	// maxAnnotationLength is the max length of byte array or string allowed in the annotations.
	// Longer string and []byte values are cut down to this many bytes before
	// being placed into an annotation.
	maxAnnotationLength = 256

	// allowPackedNumbers gates the packed I32/I64 encoding of integer tag
	// values in buildBinaryAnnotation. It is off because the Zipkin UI does not
	// work well with non-string tag values, so integers are reported through
	// the string fallback instead.
	allowPackedNumbers = false
)
// specialTagHandlers maps the keys of tags that are not reported as ordinary
// binary annotations to the function that consumes the tag's value.
// handleSpecialTags calls the handler for each matching tag and removes the
// tag from the span's tag list; buildBinaryAnnotations also skips any tag
// whose key is present in this map.
var specialTagHandlers = map[string]func(*zipkinSpan, interface{}){
	string(ext.SpanKind):     setSpanKind,    // recorded in zipkinSpan.spanKind
	string(ext.PeerHostIPv4): setPeerIPv4,    // recorded in zipkinSpan.peer.Ipv4
	string(ext.PeerPort):     setPeerPort,    // recorded in zipkinSpan.peer.Port
	string(ext.PeerService):  setPeerService, // recorded in zipkinSpan.peer.ServiceName
	TracerIPTagKey:           removeTag,      // dropped; the handler is a no-op
}
  37. // BuildZipkinThrift builds thrift span based on internal span.
  38. func BuildZipkinThrift(s *Span) *z.Span {
  39. span := &zipkinSpan{Span: s}
  40. span.handleSpecialTags()
  41. parentID := int64(span.context.parentID)
  42. var ptrParentID *int64
  43. if parentID != 0 {
  44. ptrParentID = &parentID
  45. }
  46. timestamp := utils.TimeToMicrosecondsSinceEpochInt64(span.startTime)
  47. duration := span.duration.Nanoseconds() / int64(time.Microsecond)
  48. endpoint := &z.Endpoint{
  49. ServiceName: span.tracer.serviceName,
  50. Ipv4: int32(span.tracer.hostIPv4)}
  51. thriftSpan := &z.Span{
  52. TraceID: int64(span.context.traceID.Low), // TODO upgrade zipkin thrift and use TraceIdHigh
  53. ID: int64(span.context.spanID),
  54. ParentID: ptrParentID,
  55. Name: span.operationName,
  56. Timestamp: &timestamp,
  57. Duration: &duration,
  58. Debug: span.context.IsDebug(),
  59. Annotations: buildAnnotations(span, endpoint),
  60. BinaryAnnotations: buildBinaryAnnotations(span, endpoint)}
  61. return thriftSpan
  62. }
  63. func buildAnnotations(span *zipkinSpan, endpoint *z.Endpoint) []*z.Annotation {
  64. // automatically adding 2 Zipkin CoreAnnotations
  65. annotations := make([]*z.Annotation, 0, 2+len(span.logs))
  66. var startLabel, endLabel string
  67. if span.spanKind == string(ext.SpanKindRPCClientEnum) {
  68. startLabel, endLabel = z.CLIENT_SEND, z.CLIENT_RECV
  69. } else if span.spanKind == string(ext.SpanKindRPCServerEnum) {
  70. startLabel, endLabel = z.SERVER_RECV, z.SERVER_SEND
  71. }
  72. if !span.startTime.IsZero() && startLabel != "" {
  73. start := &z.Annotation{
  74. Timestamp: utils.TimeToMicrosecondsSinceEpochInt64(span.startTime),
  75. Value: startLabel,
  76. Host: endpoint}
  77. annotations = append(annotations, start)
  78. if span.duration != 0 {
  79. endTs := span.startTime.Add(span.duration)
  80. end := &z.Annotation{
  81. Timestamp: utils.TimeToMicrosecondsSinceEpochInt64(endTs),
  82. Value: endLabel,
  83. Host: endpoint}
  84. annotations = append(annotations, end)
  85. }
  86. }
  87. for _, log := range span.logs {
  88. anno := &z.Annotation{
  89. Timestamp: utils.TimeToMicrosecondsSinceEpochInt64(log.Timestamp),
  90. Host: endpoint}
  91. if content, err := spanlog.MaterializeWithJSON(log.Fields); err == nil {
  92. anno.Value = truncateString(string(content))
  93. } else {
  94. anno.Value = err.Error()
  95. }
  96. annotations = append(annotations, anno)
  97. }
  98. return annotations
  99. }
  100. func buildBinaryAnnotations(span *zipkinSpan, endpoint *z.Endpoint) []*z.BinaryAnnotation {
  101. // automatically adding local component or server/client address tag, and client version
  102. annotations := make([]*z.BinaryAnnotation, 0, 2+len(span.tags))
  103. if span.peerDefined() && span.isRPC() {
  104. peer := z.Endpoint{
  105. Ipv4: span.peer.Ipv4,
  106. Port: span.peer.Port,
  107. ServiceName: span.peer.ServiceName}
  108. label := z.CLIENT_ADDR
  109. if span.isRPCClient() {
  110. label = z.SERVER_ADDR
  111. }
  112. anno := &z.BinaryAnnotation{
  113. Key: label,
  114. Value: []byte{1},
  115. AnnotationType: z.AnnotationType_BOOL,
  116. Host: &peer}
  117. annotations = append(annotations, anno)
  118. }
  119. if !span.isRPC() {
  120. componentName := endpoint.ServiceName
  121. for _, tag := range span.tags {
  122. if tag.key == string(ext.Component) {
  123. componentName = stringify(tag.value)
  124. break
  125. }
  126. }
  127. local := &z.BinaryAnnotation{
  128. Key: z.LOCAL_COMPONENT,
  129. Value: []byte(componentName),
  130. AnnotationType: z.AnnotationType_STRING,
  131. Host: endpoint}
  132. annotations = append(annotations, local)
  133. }
  134. for _, tag := range span.tags {
  135. // "Special tags" are already handled by this point, we'd be double reporting the
  136. // tags if we don't skip here
  137. if _, ok := specialTagHandlers[tag.key]; ok {
  138. continue
  139. }
  140. if anno := buildBinaryAnnotation(tag.key, tag.value, nil); anno != nil {
  141. annotations = append(annotations, anno)
  142. }
  143. }
  144. return annotations
  145. }
  146. func buildBinaryAnnotation(key string, val interface{}, endpoint *z.Endpoint) *z.BinaryAnnotation {
  147. bann := &z.BinaryAnnotation{Key: key, Host: endpoint}
  148. if value, ok := val.(string); ok {
  149. bann.Value = []byte(truncateString(value))
  150. bann.AnnotationType = z.AnnotationType_STRING
  151. } else if value, ok := val.([]byte); ok {
  152. if len(value) > maxAnnotationLength {
  153. value = value[:maxAnnotationLength]
  154. }
  155. bann.Value = value
  156. bann.AnnotationType = z.AnnotationType_BYTES
  157. } else if value, ok := val.(int32); ok && allowPackedNumbers {
  158. bann.Value = int32ToBytes(value)
  159. bann.AnnotationType = z.AnnotationType_I32
  160. } else if value, ok := val.(int64); ok && allowPackedNumbers {
  161. bann.Value = int64ToBytes(value)
  162. bann.AnnotationType = z.AnnotationType_I64
  163. } else if value, ok := val.(int); ok && allowPackedNumbers {
  164. bann.Value = int64ToBytes(int64(value))
  165. bann.AnnotationType = z.AnnotationType_I64
  166. } else if value, ok := val.(bool); ok {
  167. bann.Value = []byte{boolToByte(value)}
  168. bann.AnnotationType = z.AnnotationType_BOOL
  169. } else {
  170. value := stringify(val)
  171. bann.Value = []byte(truncateString(value))
  172. bann.AnnotationType = z.AnnotationType_STRING
  173. }
  174. return bann
  175. }
  176. func stringify(value interface{}) string {
  177. if s, ok := value.(string); ok {
  178. return s
  179. }
  180. return fmt.Sprintf("%+v", value)
  181. }
  182. func truncateString(value string) string {
  183. // we ignore the problem of utf8 runes possibly being sliced in the middle,
  184. // as it is rather expensive to iterate through each tag just to find rune
  185. // boundaries.
  186. if len(value) > maxAnnotationLength {
  187. return value[:maxAnnotationLength]
  188. }
  189. return value
  190. }
  191. func boolToByte(b bool) byte {
  192. if b {
  193. return 1
  194. }
  195. return 0
  196. }
  197. // int32ToBytes converts int32 to bytes.
  198. func int32ToBytes(i int32) []byte {
  199. buf := make([]byte, 4)
  200. binary.BigEndian.PutUint32(buf, uint32(i))
  201. return buf
  202. }
  203. // int64ToBytes converts int64 to bytes.
  204. func int64ToBytes(i int64) []byte {
  205. buf := make([]byte, 8)
  206. binary.BigEndian.PutUint64(buf, uint64(i))
  207. return buf
  208. }
// zipkinSpan wraps a Span together with the state extracted from its special
// tags (see specialTagHandlers) while the span is converted to Zipkin Thrift.
type zipkinSpan struct {
	*Span

	// peer points to the peer service participating in this span,
	// e.g. the Client if this span is a server span,
	// or Server if this span is a client span.
	// Its fields are filled in by the setPeerIPv4, setPeerPort and
	// setPeerService handlers; peerDefined reports whether any of them is
	// non-zero.
	peer struct {
		Ipv4        int32
		Port        int16
		ServiceName string
	}

	// used to distinguish local vs. RPC Server vs. RPC Client spans;
	// set by setSpanKind and left empty when the span has no span-kind tag.
	spanKind string
}
  222. func (s *zipkinSpan) handleSpecialTags() {
  223. s.Lock()
  224. defer s.Unlock()
  225. if s.firstInProcess {
  226. // append the process tags
  227. s.tags = append(s.tags, s.tracer.tags...)
  228. }
  229. filteredTags := make([]Tag, 0, len(s.tags))
  230. for _, tag := range s.tags {
  231. if handler, ok := specialTagHandlers[tag.key]; ok {
  232. handler(s, tag.value)
  233. } else {
  234. filteredTags = append(filteredTags, tag)
  235. }
  236. }
  237. s.tags = filteredTags
  238. }
  239. func setSpanKind(s *zipkinSpan, value interface{}) {
  240. if val, ok := value.(string); ok {
  241. s.spanKind = val
  242. return
  243. }
  244. if val, ok := value.(ext.SpanKindEnum); ok {
  245. s.spanKind = string(val)
  246. }
  247. }
  248. func setPeerIPv4(s *zipkinSpan, value interface{}) {
  249. if val, ok := value.(string); ok {
  250. if ip, err := utils.ParseIPToUint32(val); err == nil {
  251. s.peer.Ipv4 = int32(ip)
  252. return
  253. }
  254. }
  255. if val, ok := value.(uint32); ok {
  256. s.peer.Ipv4 = int32(val)
  257. return
  258. }
  259. if val, ok := value.(int32); ok {
  260. s.peer.Ipv4 = val
  261. }
  262. }
  263. func setPeerPort(s *zipkinSpan, value interface{}) {
  264. if val, ok := value.(string); ok {
  265. if port, err := utils.ParsePort(val); err == nil {
  266. s.peer.Port = int16(port)
  267. return
  268. }
  269. }
  270. if val, ok := value.(uint16); ok {
  271. s.peer.Port = int16(val)
  272. return
  273. }
  274. if val, ok := value.(int); ok {
  275. s.peer.Port = int16(val)
  276. }
  277. }
  278. func setPeerService(s *zipkinSpan, value interface{}) {
  279. if val, ok := value.(string); ok {
  280. s.peer.ServiceName = val
  281. }
  282. }
// removeTag is a no-op handler. Registering it in specialTagHandlers causes
// handleSpecialTags to drop the tag from the span without recording anything.
func removeTag(s *zipkinSpan, value interface{}) {}
  284. func (s *zipkinSpan) peerDefined() bool {
  285. return s.peer.ServiceName != "" || s.peer.Ipv4 != 0 || s.peer.Port != 0
  286. }
  287. func (s *zipkinSpan) isRPC() bool {
  288. s.RLock()
  289. defer s.RUnlock()
  290. return s.spanKind == string(ext.SpanKindRPCClientEnum) || s.spanKind == string(ext.SpanKindRPCServerEnum)
  291. }
  292. func (s *zipkinSpan) isRPCClient() bool {
  293. s.RLock()
  294. defer s.RUnlock()
  295. return s.spanKind == string(ext.SpanKindRPCClientEnum)
  296. }