transport_udp.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. // Copyright (c) 2016 Uber Technologies, Inc.
  2. //
  3. // Permission is hereby granted, free of charge, to any person obtaining a copy
  4. // of this software and associated documentation files (the "Software"), to deal
  5. // in the Software without restriction, including without limitation the rights
  6. // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  7. // copies of the Software, and to permit persons to whom the Software is
  8. // furnished to do so, subject to the following conditions:
  9. //
  10. // The above copyright notice and this permission notice shall be included in
  11. // all copies or substantial portions of the Software.
  12. //
  13. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  14. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  15. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  16. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  17. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  18. // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  19. // THE SOFTWARE.
  20. package jaeger
  21. import (
  22. "errors"
  23. "github.com/apache/thrift/lib/go/thrift"
  24. j "github.com/uber/jaeger-client-go/thrift-gen/jaeger"
  25. "github.com/uber/jaeger-client-go/utils"
  26. )
  const (
  	// emitBatchOverhead is an empirically obtained constant for how many bytes
  	// in the message are used for the envelope. The total datagram size is:
  	//
  	//	sizeof(Span) * numSpans + processByteSize + emitBatchOverhead <= maxPacketSize
  	//
  	// There is a unit test `TestEmitBatchOverhead` that validates this number.
  	// Note that due to the use of Compact Thrift protocol, the overhead grows
  	// with the number of spans in the batch, because the length of the list is
  	// encoded as varint32, as well as SeqId.
  	emitBatchOverhead = 30

  	// defaultUDPSpanServerHostPort is the agent address NewUDPTransport falls
  	// back to when it is given an empty hostPort.
  	defaultUDPSpanServerHostPort = "localhost:6831"
  )

  // errSpanTooLarge is returned by Append for a span that cannot fit into a
  // single datagram.
  var errSpanTooLarge = errors.New("Span is too large")
  36. type udpSender struct {
  37. client *utils.AgentClientUDP
  38. maxPacketSize int // max size of datagram in bytes
  39. maxSpanBytes int // max number of bytes to record spans (excluding envelope) in the datagram
  40. byteBufferSize int // current number of span bytes accumulated in the buffer
  41. spanBuffer []*j.Span // spans buffered before a flush
  42. thriftBuffer *thrift.TMemoryBuffer // buffer used to calculate byte size of a span
  43. thriftProtocol thrift.TProtocol
  44. process *j.Process
  45. processByteSize int
  46. }
  47. // NewUDPTransport creates a reporter that submits spans to jaeger-agent
  48. func NewUDPTransport(hostPort string, maxPacketSize int) (Transport, error) {
  49. if len(hostPort) == 0 {
  50. hostPort = defaultUDPSpanServerHostPort
  51. }
  52. if maxPacketSize == 0 {
  53. maxPacketSize = utils.UDPPacketMaxLength
  54. }
  55. protocolFactory := thrift.NewTCompactProtocolFactory()
  56. // Each span is first written to thriftBuffer to determine its size in bytes.
  57. thriftBuffer := thrift.NewTMemoryBufferLen(maxPacketSize)
  58. thriftProtocol := protocolFactory.GetProtocol(thriftBuffer)
  59. client, err := utils.NewAgentClientUDP(hostPort, maxPacketSize)
  60. if err != nil {
  61. return nil, err
  62. }
  63. sender := &udpSender{
  64. client: client,
  65. maxSpanBytes: maxPacketSize - emitBatchOverhead,
  66. thriftBuffer: thriftBuffer,
  67. thriftProtocol: thriftProtocol}
  68. return sender, nil
  69. }
  70. func (s *udpSender) calcSizeOfSerializedThrift(thriftStruct thrift.TStruct) int {
  71. s.thriftBuffer.Reset()
  72. thriftStruct.Write(s.thriftProtocol)
  73. return s.thriftBuffer.Len()
  74. }
  75. func (s *udpSender) Append(span *Span) (int, error) {
  76. if s.process == nil {
  77. s.process = BuildJaegerProcessThrift(span)
  78. s.processByteSize = s.calcSizeOfSerializedThrift(s.process)
  79. s.byteBufferSize += s.processByteSize
  80. }
  81. jSpan := BuildJaegerThrift(span)
  82. spanSize := s.calcSizeOfSerializedThrift(jSpan)
  83. if spanSize > s.maxSpanBytes {
  84. return 1, errSpanTooLarge
  85. }
  86. s.byteBufferSize += spanSize
  87. if s.byteBufferSize <= s.maxSpanBytes {
  88. s.spanBuffer = append(s.spanBuffer, jSpan)
  89. if s.byteBufferSize < s.maxSpanBytes {
  90. return 0, nil
  91. }
  92. return s.Flush()
  93. }
  94. // the latest span did not fit in the buffer
  95. n, err := s.Flush()
  96. s.spanBuffer = append(s.spanBuffer, jSpan)
  97. s.byteBufferSize = spanSize + s.processByteSize
  98. return n, err
  99. }
  100. func (s *udpSender) Flush() (int, error) {
  101. n := len(s.spanBuffer)
  102. if n == 0 {
  103. return 0, nil
  104. }
  105. err := s.client.EmitBatch(&j.Batch{Process: s.process, Spans: s.spanBuffer})
  106. s.resetBuffers()
  107. return n, err
  108. }
  109. func (s *udpSender) Close() error {
  110. return s.client.Close()
  111. }
  112. func (s *udpSender) resetBuffers() {
  113. for i := range s.spanBuffer {
  114. s.spanBuffer[i] = nil
  115. }
  116. s.spanBuffer = s.spanBuffer[:0]
  117. s.byteBufferSize = s.processByteSize
  118. }