restriction_manager.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. // Copyright (c) 2017 Uber Technologies, Inc.
  2. //
  3. // Permission is hereby granted, free of charge, to any person obtaining a copy
  4. // of this software and associated documentation files (the "Software"), to deal
  5. // in the Software without restriction, including without limitation the rights
  6. // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  7. // copies of the Software, and to permit persons to whom the Software is
  8. // furnished to do so, subject to the following conditions:
  9. //
  10. // The above copyright notice and this permission notice shall be included in
  11. // all copies or substantial portions of the Software.
  12. //
  13. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  14. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  15. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  16. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  17. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  18. // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  19. // THE SOFTWARE.
  20. package remote
  21. import (
  22. "fmt"
  23. "net/url"
  24. "sync"
  25. "time"
  26. "github.com/uber/jaeger-client-go/internal/baggage"
  27. thrift "github.com/uber/jaeger-client-go/thrift-gen/baggage"
  28. "github.com/uber/jaeger-client-go/utils"
  29. )
  30. type httpBaggageRestrictionManagerProxy struct {
  31. url string
  32. }
  33. func newHTTPBaggageRestrictionManagerProxy(hostPort, serviceName string) *httpBaggageRestrictionManagerProxy {
  34. v := url.Values{}
  35. v.Set("service", serviceName)
  36. return &httpBaggageRestrictionManagerProxy{
  37. url: fmt.Sprintf("http://%s/baggageRestrictions?%s", hostPort, v.Encode()),
  38. }
  39. }
  40. func (s *httpBaggageRestrictionManagerProxy) GetBaggageRestrictions(serviceName string) ([]*thrift.BaggageRestriction, error) {
  41. var out []*thrift.BaggageRestriction
  42. if err := utils.GetJSON(s.url, &out); err != nil {
  43. return nil, err
  44. }
  45. return out, nil
  46. }
  47. // RestrictionManager manages baggage restrictions by retrieving baggage restrictions from agent
  48. type RestrictionManager struct {
  49. options
  50. mux sync.RWMutex
  51. serviceName string
  52. restrictions map[string]*baggage.Restriction
  53. thriftProxy thrift.BaggageRestrictionManager
  54. pollStopped sync.WaitGroup
  55. stopPoll chan struct{}
  56. invalidRestriction *baggage.Restriction
  57. validRestriction *baggage.Restriction
  58. // Determines if the manager has successfully retrieved baggage restrictions from agent
  59. initialized bool
  60. }
  61. // NewRestrictionManager returns a BaggageRestrictionManager that polls the agent for the latest
  62. // baggage restrictions.
  63. func NewRestrictionManager(serviceName string, options ...Option) *RestrictionManager {
  64. // TODO there is a developing use case where a single tracer can generate traces on behalf of many services.
  65. // restrictionsMap will need to exist per service
  66. opts := applyOptions(options...)
  67. m := &RestrictionManager{
  68. serviceName: serviceName,
  69. options: opts,
  70. restrictions: make(map[string]*baggage.Restriction),
  71. thriftProxy: newHTTPBaggageRestrictionManagerProxy(opts.hostPort, serviceName),
  72. stopPoll: make(chan struct{}),
  73. invalidRestriction: baggage.NewRestriction(false, 0),
  74. validRestriction: baggage.NewRestriction(true, defaultMaxValueLength),
  75. }
  76. m.pollStopped.Add(1)
  77. go m.pollManager()
  78. return m
  79. }
  80. // isReady returns true if the manager has retrieved baggage restrictions from the remote source.
  81. func (m *RestrictionManager) isReady() bool {
  82. m.mux.RLock()
  83. defer m.mux.RUnlock()
  84. return m.initialized
  85. }
  86. // GetRestriction implements RestrictionManager#GetRestriction.
  87. func (m *RestrictionManager) GetRestriction(key string) *baggage.Restriction {
  88. m.mux.RLock()
  89. defer m.mux.RUnlock()
  90. if !m.initialized {
  91. if m.denyBaggageOnInitializationFailure {
  92. return m.invalidRestriction
  93. }
  94. return m.validRestriction
  95. }
  96. if restriction, ok := m.restrictions[key]; ok {
  97. return restriction
  98. }
  99. return m.invalidRestriction
  100. }
  101. // Close stops remote polling and closes the RemoteRestrictionManager.
  102. func (m *RestrictionManager) Close() error {
  103. close(m.stopPoll)
  104. m.pollStopped.Wait()
  105. return nil
  106. }
  107. func (m *RestrictionManager) pollManager() {
  108. defer m.pollStopped.Done()
  109. // attempt to initialize baggage restrictions
  110. if err := m.updateRestrictions(); err != nil {
  111. m.logger.Error(fmt.Sprintf("Failed to initialize baggage restrictions: %s", err.Error()))
  112. }
  113. ticker := time.NewTicker(m.refreshInterval)
  114. defer ticker.Stop()
  115. for {
  116. select {
  117. case <-ticker.C:
  118. if err := m.updateRestrictions(); err != nil {
  119. m.logger.Error(fmt.Sprintf("Failed to update baggage restrictions: %s", err.Error()))
  120. }
  121. case <-m.stopPoll:
  122. return
  123. }
  124. }
  125. }
  126. func (m *RestrictionManager) updateRestrictions() error {
  127. restrictions, err := m.thriftProxy.GetBaggageRestrictions(m.serviceName)
  128. if err != nil {
  129. m.metrics.BaggageRestrictionsUpdateFailure.Inc(1)
  130. return err
  131. }
  132. newRestrictions := m.parseRestrictions(restrictions)
  133. m.metrics.BaggageRestrictionsUpdateSuccess.Inc(1)
  134. m.mux.Lock()
  135. defer m.mux.Unlock()
  136. m.initialized = true
  137. m.restrictions = newRestrictions
  138. return nil
  139. }
  140. func (m *RestrictionManager) parseRestrictions(restrictions []*thrift.BaggageRestriction) map[string]*baggage.Restriction {
  141. setters := make(map[string]*baggage.Restriction, len(restrictions))
  142. for _, restriction := range restrictions {
  143. setters[restriction.BaggageKey] = baggage.NewRestriction(true, int(restriction.MaxValueLength))
  144. }
  145. return setters
  146. }