graphite.go 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401
  1. // Copyright 2016 The Prometheus Authors
  2. // Licensed under the Apache License, Version 2.0 (the "License");
  3. // you may not use this file except in compliance with the License.
  4. // You may obtain a copy of the License at
  5. //
  6. // http://www.apache.org/licenses/LICENSE-2.0
  7. //
  8. // Unless required by applicable law or agreed to in writing, software
  9. // distributed under the License is distributed on an "AS IS" BASIS,
  10. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. // See the License for the specific language governing permissions and
  12. // limitations under the License.
// Package graphitebridge provides a bridge to push Prometheus metrics to a
// Graphite server.
  15. package graphitebridge
  16. import (
  17. "bufio"
  18. "errors"
  19. "fmt"
  20. "io"
  21. "math"
  22. "net"
  23. "sort"
  24. "strings"
  25. "time"
  26. "github.com/prometheus/common/expfmt"
  27. "github.com/prometheus/common/model"
  28. "golang.org/x/net/context"
  29. dto "github.com/prometheus/client_model/go"
  30. "github.com/prometheus/client_golang/prometheus"
  31. )
  32. const (
  33. defaultInterval = 15 * time.Second
  34. millisecondsPerSecond = 1000
  35. )
  36. // HandlerErrorHandling defines how a Handler serving metrics will handle
  37. // errors.
  38. type HandlerErrorHandling int
  39. // These constants cause handlers serving metrics to behave as described if
  40. // errors are encountered.
  41. const (
  42. // Ignore errors and try to push as many metrics to Graphite as possible.
  43. ContinueOnError HandlerErrorHandling = iota
  44. // Abort the push to Graphite upon the first error encountered.
  45. AbortOnError
  46. )
  47. var metricCategoryPrefix []string = []string{"proxy_", "api_", "page_", "alerting_", "aws_", "db_", "stat_", "go_", "process_"}
  48. var trimMetricPrefix []string = []string{"grafana_"}
// Config defines the Graphite bridge config. It is passed to NewBridge.
type Config struct {
	// The address to push data to. It is dialed over TCP, so it must be a
	// host:port address rather than an http:// URL. Required.
	URL string

	// The prefix prepended to every pushed Graphite metric path. Defaults to
	// the empty string.
	Prefix string

	// The interval between pushes performed by Bridge.Run. Defaults to 15
	// seconds.
	Interval time.Duration

	// The timeout used when pushing metrics to Graphite (it bounds at least
	// establishing the TCP connection). Defaults to 15 seconds.
	Timeout time.Duration

	// The Gatherer to collect metrics from. Defaults to
	// prometheus.DefaultGatherer.
	Gatherer prometheus.Gatherer

	// The logger that messages are written to. Defaults to no logging.
	Logger Logger

	// ErrorHandling defines how errors from gathering metrics are handled.
	// Note that errors are logged regardless of the configured ErrorHandling,
	// provided Logger is not nil.
	ErrorHandling HandlerErrorHandling

	// Graphite does not support ever-increasing counters the way Prometheus
	// does: rollups and ingestion may not handle them well. When
	// CountersAsDelta is true, the bridge keeps the last sent value of each
	// counter series (and summary "_count" series) in memory and sends the
	// difference between the collected value and that last value instead of
	// the raw cumulative value.
	CountersAsDelta bool
}
// Bridge pushes metrics to the configured Graphite server. Create one with
// NewBridge; the zero value is not usable (lastValue would be a nil map).
type Bridge struct {
	url    string
	prefix string
	// countersAsDetlas mirrors Config.CountersAsDelta. The misspelled name
	// is kept because other code in this file refers to it.
	countersAsDetlas bool
	interval         time.Duration
	timeout          time.Duration

	errorHandling HandlerErrorHandling
	logger        Logger

	g prometheus.Gatherer

	// lastValue holds the last value sent for each series, keyed by the
	// series fingerprint; it is only consulted when countersAsDetlas is set.
	// NOTE(review): it is not guarded by a mutex, so concurrent Push calls
	// would race on it.
	lastValue map[model.Fingerprint]float64
}
// Logger is the minimal interface Bridge needs for logging. Note that
// log.Logger from the standard library implements this interface, and it is
// easy to implement by custom loggers, if they don't do so already anyway.
// Bridge only calls it when a non-nil Logger was configured.
type Logger interface {
	// Println is expected to log its arguments in the manner of
	// fmt.Println, as log.Logger.Println does.
	Println(v ...interface{})
}
  92. // NewBridge returns a pointer to a new Bridge struct.
  93. func NewBridge(c *Config) (*Bridge, error) {
  94. b := &Bridge{}
  95. if c.URL == "" {
  96. return nil, errors.New("missing URL")
  97. }
  98. b.url = c.URL
  99. if c.Gatherer == nil {
  100. b.g = prometheus.DefaultGatherer
  101. } else {
  102. b.g = c.Gatherer
  103. }
  104. if c.Logger != nil {
  105. b.logger = c.Logger
  106. }
  107. if c.Prefix != "" {
  108. b.prefix = c.Prefix
  109. }
  110. var z time.Duration
  111. if c.Interval == z {
  112. b.interval = defaultInterval
  113. } else {
  114. b.interval = c.Interval
  115. }
  116. if c.Timeout == z {
  117. b.timeout = defaultInterval
  118. } else {
  119. b.timeout = c.Timeout
  120. }
  121. b.errorHandling = c.ErrorHandling
  122. b.lastValue = map[model.Fingerprint]float64{}
  123. b.countersAsDetlas = c.CountersAsDelta
  124. return b, nil
  125. }
  126. // Run starts the event loop that pushes Prometheus metrics to Graphite at the
  127. // configured interval.
  128. func (b *Bridge) Run(ctx context.Context) {
  129. ticker := time.NewTicker(b.interval)
  130. defer ticker.Stop()
  131. for {
  132. select {
  133. case <-ticker.C:
  134. if err := b.Push(); err != nil && b.logger != nil {
  135. b.logger.Println("error pushing to Graphite:", err)
  136. }
  137. case <-ctx.Done():
  138. return
  139. }
  140. }
  141. }
  142. // Push pushes Prometheus metrics to the configured Graphite server.
  143. func (b *Bridge) Push() error {
  144. mfs, err := b.g.Gather()
  145. if err != nil || len(mfs) == 0 {
  146. switch b.errorHandling {
  147. case AbortOnError:
  148. return err
  149. case ContinueOnError:
  150. if b.logger != nil {
  151. b.logger.Println("continue on error:", err)
  152. }
  153. default:
  154. panic("unrecognized error handling value")
  155. }
  156. }
  157. conn, err := net.DialTimeout("tcp", b.url, b.timeout)
  158. if err != nil {
  159. return err
  160. }
  161. defer conn.Close()
  162. return b.writeMetrics(conn, mfs, b.prefix, model.Now())
  163. }
  164. func (b *Bridge) writeMetrics(w io.Writer, mfs []*dto.MetricFamily, prefix string, now model.Time) error {
  165. for _, mf := range mfs {
  166. vec, err := expfmt.ExtractSamples(&expfmt.DecodeOptions{
  167. Timestamp: now,
  168. }, mf)
  169. if err != nil {
  170. return err
  171. }
  172. buf := bufio.NewWriter(w)
  173. for _, s := range vec {
  174. if math.IsNaN(float64(s.Value)) {
  175. continue
  176. }
  177. if err := writePrefix(buf, prefix); err != nil {
  178. return err
  179. }
  180. if err := writeMetric(buf, s.Metric, mf); err != nil {
  181. return err
  182. }
  183. value := b.replaceCounterWithDelta(mf, s.Metric, s.Value)
  184. if _, err := fmt.Fprintf(buf, " %g %d\n", value, int64(s.Timestamp)/millisecondsPerSecond); err != nil {
  185. return err
  186. }
  187. if err := buf.Flush(); err != nil {
  188. return err
  189. }
  190. }
  191. }
  192. return nil
  193. }
  194. func writeMetric(buf *bufio.Writer, m model.Metric, mf *dto.MetricFamily) error {
  195. metricName, hasName := m[model.MetricNameLabel]
  196. numLabels := len(m) - 1
  197. if !hasName {
  198. numLabels = len(m)
  199. }
  200. for _, v := range metricCategoryPrefix {
  201. if strings.HasPrefix(string(metricName), v) {
  202. group := strings.Replace(v, "_", " ", 1)
  203. metricName = model.LabelValue(strings.Replace(string(metricName), v, group, 1))
  204. }
  205. }
  206. for _, v := range trimMetricPrefix {
  207. if strings.HasPrefix(string(metricName), v) {
  208. metricName = model.LabelValue(strings.Replace(string(metricName), v, "", 1))
  209. }
  210. }
  211. labelStrings := make([]string, 0, numLabels)
  212. for label, value := range m {
  213. if label != model.MetricNameLabel {
  214. labelStrings = append(labelStrings, fmt.Sprintf("%s %s", string(label), string(value)))
  215. }
  216. }
  217. var err error
  218. switch numLabels {
  219. case 0:
  220. if hasName {
  221. if err := writeSanitized(buf, string(metricName)); err != nil {
  222. return err
  223. }
  224. }
  225. default:
  226. sort.Strings(labelStrings)
  227. if err = writeSanitized(buf, string(metricName)); err != nil {
  228. return err
  229. }
  230. for _, s := range labelStrings {
  231. if err = buf.WriteByte('.'); err != nil {
  232. return err
  233. }
  234. if err = writeSanitized(buf, s); err != nil {
  235. return err
  236. }
  237. }
  238. }
  239. if err = addExtentionConventionForRollups(buf, mf, m); err != nil {
  240. return err
  241. }
  242. return nil
  243. }
  244. func addExtentionConventionForRollups(buf *bufio.Writer, mf *dto.MetricFamily, m model.Metric) error {
  245. // Adding `.count` `.sum` suffix makes it possible to configure
  246. // different rollup strategies based on metric type
  247. mfType := mf.GetType()
  248. var err error
  249. if mfType == dto.MetricType_COUNTER {
  250. if _, err = fmt.Fprint(buf, ".count"); err != nil {
  251. return err
  252. }
  253. }
  254. if mfType == dto.MetricType_SUMMARY || mfType == dto.MetricType_HISTOGRAM {
  255. if strings.HasSuffix(string(m[model.MetricNameLabel]), "_count") {
  256. if _, err = fmt.Fprint(buf, ".count"); err != nil {
  257. return err
  258. }
  259. }
  260. }
  261. if mfType == dto.MetricType_HISTOGRAM {
  262. if strings.HasSuffix(string(m[model.MetricNameLabel]), "_sum") {
  263. if _, err = fmt.Fprint(buf, ".sum"); err != nil {
  264. return err
  265. }
  266. }
  267. }
  268. return nil
  269. }
  270. func writePrefix(buf *bufio.Writer, s string) error {
  271. for _, c := range s {
  272. if _, err := buf.WriteRune(replaceInvalid(c)); err != nil {
  273. return err
  274. }
  275. }
  276. return nil
  277. }
  278. func writeSanitized(buf *bufio.Writer, s string) error {
  279. prevUnderscore := false
  280. for _, c := range s {
  281. c = replaceInvalidRune(c)
  282. if c == '_' {
  283. if prevUnderscore {
  284. continue
  285. }
  286. prevUnderscore = true
  287. } else {
  288. prevUnderscore = false
  289. }
  290. if _, err := buf.WriteRune(c); err != nil {
  291. return err
  292. }
  293. }
  294. return nil
  295. }
  296. func replaceInvalid(c rune) rune {
  297. if c == ' ' || c == '.' {
  298. return '.'
  299. }
  300. return replaceInvalidRune(c)
  301. }
  302. func replaceInvalidRune(c rune) rune {
  303. if c == ' ' {
  304. return '.'
  305. }
  306. if !((c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || c == '-' || c == '_' || c == ':' || (c >= '0' && c <= '9')) {
  307. return '_'
  308. }
  309. return c
  310. }
  311. func (b *Bridge) replaceCounterWithDelta(mf *dto.MetricFamily, metric model.Metric, value model.SampleValue) float64 {
  312. if !b.countersAsDetlas {
  313. return float64(value)
  314. }
  315. mfType := mf.GetType()
  316. if mfType == dto.MetricType_COUNTER {
  317. return b.returnDelta(metric, value)
  318. }
  319. if mfType == dto.MetricType_SUMMARY {
  320. if strings.HasSuffix(string(metric[model.MetricNameLabel]), "_count") {
  321. return b.returnDelta(metric, value)
  322. }
  323. }
  324. return float64(value)
  325. }
  326. func (b *Bridge) returnDelta(metric model.Metric, value model.SampleValue) float64 {
  327. key := metric.Fingerprint()
  328. _, exists := b.lastValue[key]
  329. if !exists {
  330. b.lastValue[key] = 0
  331. }
  332. delta := float64(value) - b.lastValue[key]
  333. b.lastValue[key] = float64(value)
  334. return delta
  335. }