local.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338
  1. // Copyright (c) 2017 Uber Technologies, Inc.
  2. //
  3. // Permission is hereby granted, free of charge, to any person obtaining a copy
  4. // of this software and associated documentation files (the "Software"), to deal
  5. // in the Software without restriction, including without limitation the rights
  6. // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  7. // copies of the Software, and to permit persons to whom the Software is
  8. // furnished to do so, subject to the following conditions:
  9. //
  10. // The above copyright notice and this permission notice shall be included in
  11. // all copies or substantial portions of the Software.
  12. //
  13. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  14. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  15. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  16. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  17. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  18. // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  19. // THE SOFTWARE.
  20. package metrics
  21. import (
  22. "sort"
  23. "sync"
  24. "sync/atomic"
  25. "time"
  26. "github.com/codahale/hdrhistogram"
  27. )
  28. // This is intentionally very similar to github.com/codahale/metrics, the
  29. // main difference being that counters/gauges are scoped to the provider
  30. // rather than being global (to facilitate testing).
// LocalBackend is a metrics provider which aggregates data in-vm, and
// allows exporting snapshots to shove the data into a remote collector.
//
// Counters, gauges and timers live in three separate maps, each guarded by
// its own mutex. Map keys are the strings produced by GetKey from a metric
// name and its tags.
type LocalBackend struct {
	cm sync.Mutex // guards counters
	gm sync.Mutex // guards gauges
	tm sync.Mutex // guards timers

	counters map[string]*int64
	gauges   map[string]*int64
	timers   map[string]*localBackendTimer

	stop chan struct{}  // closed by Stop to make runLoop return
	wg   sync.WaitGroup // tracks the runLoop goroutine started by NewLocalBackend

	// TagsSep separates the metric name and each tag pair in a key;
	// NewLocalBackend sets it to "|".
	TagsSep string
	// TagKVSep separates a tag's name from its value in a key;
	// NewLocalBackend sets it to "=".
	TagKVSep string
}
  45. // NewLocalBackend returns a new LocalBackend. The collectionInterval is the histogram
  46. // time window for each timer.
  47. func NewLocalBackend(collectionInterval time.Duration) *LocalBackend {
  48. b := &LocalBackend{
  49. counters: make(map[string]*int64),
  50. gauges: make(map[string]*int64),
  51. timers: make(map[string]*localBackendTimer),
  52. stop: make(chan struct{}),
  53. TagsSep: "|",
  54. TagKVSep: "=",
  55. }
  56. if collectionInterval == 0 {
  57. // Use one histogram time window for all timers
  58. return b
  59. }
  60. b.wg.Add(1)
  61. go b.runLoop(collectionInterval)
  62. return b
  63. }
  64. // Clear discards accumulated stats
  65. func (b *LocalBackend) Clear() {
  66. b.cm.Lock()
  67. defer b.cm.Unlock()
  68. b.gm.Lock()
  69. defer b.gm.Unlock()
  70. b.tm.Lock()
  71. defer b.tm.Unlock()
  72. b.counters = make(map[string]*int64)
  73. b.gauges = make(map[string]*int64)
  74. b.timers = make(map[string]*localBackendTimer)
  75. }
  76. func (b *LocalBackend) runLoop(collectionInterval time.Duration) {
  77. defer b.wg.Done()
  78. ticker := time.NewTicker(collectionInterval)
  79. for {
  80. select {
  81. case <-ticker.C:
  82. b.tm.Lock()
  83. timers := make(map[string]*localBackendTimer, len(b.timers))
  84. for timerName, timer := range b.timers {
  85. timers[timerName] = timer
  86. }
  87. b.tm.Unlock()
  88. for _, t := range timers {
  89. t.Lock()
  90. t.hist.Rotate()
  91. t.Unlock()
  92. }
  93. case <-b.stop:
  94. ticker.Stop()
  95. return
  96. }
  97. }
  98. }
  99. // IncCounter increments a counter value
  100. func (b *LocalBackend) IncCounter(name string, tags map[string]string, delta int64) {
  101. name = GetKey(name, tags, b.TagsSep, b.TagKVSep)
  102. b.cm.Lock()
  103. defer b.cm.Unlock()
  104. counter := b.counters[name]
  105. if counter == nil {
  106. b.counters[name] = new(int64)
  107. *b.counters[name] = delta
  108. return
  109. }
  110. atomic.AddInt64(counter, delta)
  111. }
  112. // UpdateGauge updates the value of a gauge
  113. func (b *LocalBackend) UpdateGauge(name string, tags map[string]string, value int64) {
  114. name = GetKey(name, tags, b.TagsSep, b.TagKVSep)
  115. b.gm.Lock()
  116. defer b.gm.Unlock()
  117. gauge := b.gauges[name]
  118. if gauge == nil {
  119. b.gauges[name] = new(int64)
  120. *b.gauges[name] = value
  121. return
  122. }
  123. atomic.StoreInt64(gauge, value)
  124. }
  125. // RecordTimer records a timing duration
  126. func (b *LocalBackend) RecordTimer(name string, tags map[string]string, d time.Duration) {
  127. name = GetKey(name, tags, b.TagsSep, b.TagKVSep)
  128. timer := b.findOrCreateTimer(name)
  129. timer.Lock()
  130. timer.hist.Current.RecordValue(int64(d / time.Millisecond))
  131. timer.Unlock()
  132. }
  133. func (b *LocalBackend) findOrCreateTimer(name string) *localBackendTimer {
  134. b.tm.Lock()
  135. defer b.tm.Unlock()
  136. if t, ok := b.timers[name]; ok {
  137. return t
  138. }
  139. t := &localBackendTimer{
  140. hist: hdrhistogram.NewWindowed(5, 0, int64((5*time.Minute)/time.Millisecond), 1),
  141. }
  142. b.timers[name] = t
  143. return t
  144. }
// localBackendTimer pairs a windowed histogram with the mutex that guards it.
// Callers lock the embedded mutex around every access to hist.
type localBackendTimer struct {
	sync.Mutex
	hist *hdrhistogram.WindowedHistogram
}
var (
	// percentiles maps the suffix Snapshot appends to a timer's key
	// ("<timer>.<suffix>") to the quantile it reads from that timer's merged
	// histogram.
	percentiles = map[string]float64{
		"P50":  50,
		"P75":  75,
		"P90":  90,
		"P95":  95,
		"P99":  99,
		"P999": 99.9,
	}
)
  159. // Snapshot captures a snapshot of the current counter and gauge values
  160. func (b *LocalBackend) Snapshot() (counters, gauges map[string]int64) {
  161. b.cm.Lock()
  162. defer b.cm.Unlock()
  163. counters = make(map[string]int64, len(b.counters))
  164. for name, value := range b.counters {
  165. counters[name] = atomic.LoadInt64(value)
  166. }
  167. b.gm.Lock()
  168. defer b.gm.Unlock()
  169. gauges = make(map[string]int64, len(b.gauges))
  170. for name, value := range b.gauges {
  171. gauges[name] = atomic.LoadInt64(value)
  172. }
  173. b.tm.Lock()
  174. timers := make(map[string]*localBackendTimer)
  175. for timerName, timer := range b.timers {
  176. timers[timerName] = timer
  177. }
  178. b.tm.Unlock()
  179. for timerName, timer := range timers {
  180. timer.Lock()
  181. hist := timer.hist.Merge()
  182. timer.Unlock()
  183. for name, q := range percentiles {
  184. gauges[timerName+"."+name] = hist.ValueAtQuantile(q)
  185. }
  186. }
  187. return
  188. }
  189. // Stop cleanly closes the background goroutine spawned by NewLocalBackend.
  190. func (b *LocalBackend) Stop() {
  191. close(b.stop)
  192. b.wg.Wait()
  193. }
  194. // GetKey converts name+tags into a single string of the form
  195. // "name|tag1=value1|...|tagN=valueN", where tag names are
  196. // sorted alphabetically.
  197. func GetKey(name string, tags map[string]string, tagsSep string, tagKVSep string) string {
  198. keys := make([]string, 0, len(tags))
  199. for k := range tags {
  200. keys = append(keys, k)
  201. }
  202. sort.Strings(keys)
  203. key := name
  204. for _, k := range keys {
  205. key = key + tagsSep + k + tagKVSep + tags[k]
  206. }
  207. return key
  208. }
// stats holds what localTimer, localCounter and localGauge share: the metric
// name, its tags, and the backend that receives the recorded values.
type stats struct {
	name         string
	tags         map[string]string
	localBackend *LocalBackend
}
// localTimer is the value LocalFactory.Timer returns; it embeds the stats
// naming the backend timer it records into.
type localTimer struct {
	stats
}

// Record forwards the duration d to the backend's RecordTimer under the
// timer's stored name and tags.
func (l *localTimer) Record(d time.Duration) {
	l.localBackend.RecordTimer(l.name, l.tags, d)
}
// localCounter is the value LocalFactory.Counter returns; it embeds the stats
// naming the backend counter it increments.
type localCounter struct {
	stats
}

// Inc forwards delta to the backend's IncCounter under the counter's stored
// name and tags.
func (l *localCounter) Inc(delta int64) {
	l.localBackend.IncCounter(l.name, l.tags, delta)
}
// localGauge is the value LocalFactory.Gauge returns; it embeds the stats
// naming the backend gauge it updates.
type localGauge struct {
	stats
}

// Update forwards value to the backend's UpdateGauge under the gauge's stored
// name and tags.
func (l *localGauge) Update(value int64) {
	l.localBackend.UpdateGauge(l.name, l.tags, value)
}
// LocalFactory is a stats factory that creates metrics that are stored
// locally in the embedded LocalBackend.
type LocalFactory struct {
	*LocalBackend
	// namespace, when non-empty, is prefixed with a "." separator to the name
	// of every metric and sub-namespace created by this factory.
	namespace string
	// tags are merged into the tags of every metric and sub-namespace created
	// by this factory; tags given at creation time win on duplicate keys.
	tags map[string]string
}
  238. // NewLocalFactory returns a new LocalMetricsFactory
  239. func NewLocalFactory(collectionInterval time.Duration) *LocalFactory {
  240. return &LocalFactory{
  241. LocalBackend: NewLocalBackend(collectionInterval),
  242. }
  243. }
  244. // appendTags adds the tags to the namespace tags and returns a combined map.
  245. func (l *LocalFactory) appendTags(tags map[string]string) map[string]string {
  246. newTags := make(map[string]string)
  247. for k, v := range l.tags {
  248. newTags[k] = v
  249. }
  250. for k, v := range tags {
  251. newTags[k] = v
  252. }
  253. return newTags
  254. }
  255. func (l *LocalFactory) newNamespace(name string) string {
  256. if l.namespace == "" {
  257. return name
  258. }
  259. return l.namespace + "." + name
  260. }
  261. // Counter returns a local stats counter
  262. func (l *LocalFactory) Counter(name string, tags map[string]string) Counter {
  263. return &localCounter{
  264. stats{
  265. name: l.newNamespace(name),
  266. tags: l.appendTags(tags),
  267. localBackend: l.LocalBackend,
  268. },
  269. }
  270. }
  271. // Timer returns a local stats timer.
  272. func (l *LocalFactory) Timer(name string, tags map[string]string) Timer {
  273. return &localTimer{
  274. stats{
  275. name: l.newNamespace(name),
  276. tags: l.appendTags(tags),
  277. localBackend: l.LocalBackend,
  278. },
  279. }
  280. }
  281. // Gauge returns a local stats gauge.
  282. func (l *LocalFactory) Gauge(name string, tags map[string]string) Gauge {
  283. return &localGauge{
  284. stats{
  285. name: l.newNamespace(name),
  286. tags: l.appendTags(tags),
  287. localBackend: l.LocalBackend,
  288. },
  289. }
  290. }
  291. // Namespace returns a new namespace.
  292. func (l *LocalFactory) Namespace(name string, tags map[string]string) Factory {
  293. return &LocalFactory{
  294. namespace: l.newNamespace(name),
  295. tags: l.appendTags(tags),
  296. LocalBackend: l.LocalBackend,
  297. }
  298. }