graphite.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396
  1. // Copyright 2016 The Prometheus Authors
  2. // Licensed under the Apache License, Version 2.0 (the "License");
  3. // you may not use this file except in compliance with the License.
  4. // You may obtain a copy of the License at
  5. //
  6. // http://www.apache.org/licenses/LICENSE-2.0
  7. //
  8. // Unless required by applicable law or agreed to in writing, software
  9. // distributed under the License is distributed on an "AS IS" BASIS,
  10. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. // See the License for the specific language governing permissions and
  12. // limitations under the License.
  13. // Package graphitebridge provides a bridge to push Prometheus metrics to a
  14. // Graphite server.
  15. package graphitebridge
  16. import (
  17. "bufio"
  18. "errors"
  19. "fmt"
  20. "io"
  21. "net"
  22. "sort"
  23. "strings"
  24. "time"
  25. "github.com/prometheus/common/expfmt"
  26. "github.com/prometheus/common/model"
  27. "golang.org/x/net/context"
  28. dto "github.com/prometheus/client_model/go"
  29. "github.com/prometheus/client_golang/prometheus"
  30. )
  // defaultInterval is the duration NewBridge falls back to when the Config
  // does not set Interval or Timeout.
  const defaultInterval = 15 * time.Second

  // millisecondsPerSecond is the divisor writeMetrics applies to a sample
  // timestamp before writing it to Graphite.
  const millisecondsPerSecond = 1000
  // HandlerErrorHandling selects how Bridge.Push reacts when gathering
  // metrics reports a problem.
  type HandlerErrorHandling int

  // The supported HandlerErrorHandling values. The zero value is
  // ContinueOnError.
  const (
  	// ContinueOnError ignores errors and tries to push as many metrics to
  	// Graphite as possible.
  	ContinueOnError = HandlerErrorHandling(iota)
  	// AbortOnError aborts the push to Graphite upon the first error
  	// encountered.
  	AbortOnError
  )
  var (
  	// metricCategoryPrefix lists metric-name prefixes that writeMetric
  	// rewrites by replacing the prefix's underscore with a space, which
  	// the sanitizer then emits as a Graphite path separator.
  	metricCategoryPrefix = []string{
  		"proxy_",
  		"api_",
  		"page_",
  		"alerting_",
  		"aws_",
  		"db_",
  		"stat_",
  		"go_",
  		"process_",
  	}

  	// trimMetricPrefix lists prefixes that writeMetric removes from the
  	// start of a metric name.
  	trimMetricPrefix = []string{"grafana_"}
  )
  48. // Config defines the Graphite bridge config.
  49. type Config struct {
  50. // The url to push data to. Required.
  51. URL string
  52. // The prefix for the pushed Graphite metrics. Defaults to empty string.
  53. Prefix string
  54. // The interval to use for pushing data to Graphite. Defaults to 15 seconds.
  55. Interval time.Duration
  56. // The timeout for pushing metrics to Graphite. Defaults to 15 seconds.
  57. Timeout time.Duration
  58. // The Gatherer to use for metrics. Defaults to prometheus.DefaultGatherer.
  59. Gatherer prometheus.Gatherer
  60. // The logger that messages are written to. Defaults to no logging.
  61. Logger Logger
  62. // ErrorHandling defines how errors are handled. Note that errors are
  63. // logged regardless of the configured ErrorHandling provided Logger
  64. // is not nil.
  65. ErrorHandling HandlerErrorHandling
  66. // Graphite does not support ever increasing counter the same way
  67. // prometheus does. Rollups and ingestion might cannot handle ever
  68. // increasing counters. This option allows enabled the caller to
  69. // calculate the delta by saving the last sent counter in memory
  70. // and subtraction it from the collected value before sending.
  71. CountersAsDelta bool
  72. }
  73. // Bridge pushes metrics to the configured Graphite server.
  74. type Bridge struct {
  75. url string
  76. prefix string
  77. countersAsDetlas bool
  78. interval time.Duration
  79. timeout time.Duration
  80. errorHandling HandlerErrorHandling
  81. logger Logger
  82. g prometheus.Gatherer
  83. lastValue map[model.Fingerprint]float64
  84. }
  // Logger is the minimal logging interface Bridge depends on. The standard
  // library's *log.Logger already satisfies it, and custom loggers only need
  // to provide a Println method with the same signature.
  type Logger interface {
  	// Println logs its arguments as a single message.
  	Println(args ...interface{})
  }
  91. // NewBridge returns a pointer to a new Bridge struct.
  92. func NewBridge(c *Config) (*Bridge, error) {
  93. b := &Bridge{}
  94. if c.URL == "" {
  95. return nil, errors.New("missing URL")
  96. }
  97. b.url = c.URL
  98. if c.Gatherer == nil {
  99. b.g = prometheus.DefaultGatherer
  100. } else {
  101. b.g = c.Gatherer
  102. }
  103. if c.Logger != nil {
  104. b.logger = c.Logger
  105. }
  106. if c.Prefix != "" {
  107. b.prefix = c.Prefix
  108. }
  109. var z time.Duration
  110. if c.Interval == z {
  111. b.interval = defaultInterval
  112. } else {
  113. b.interval = c.Interval
  114. }
  115. if c.Timeout == z {
  116. b.timeout = defaultInterval
  117. } else {
  118. b.timeout = c.Timeout
  119. }
  120. b.errorHandling = c.ErrorHandling
  121. b.lastValue = map[model.Fingerprint]float64{}
  122. b.countersAsDetlas = c.CountersAsDelta
  123. return b, nil
  124. }
  125. // Run starts the event loop that pushes Prometheus metrics to Graphite at the
  126. // configured interval.
  127. func (b *Bridge) Run(ctx context.Context) {
  128. ticker := time.NewTicker(b.interval)
  129. defer ticker.Stop()
  130. for {
  131. select {
  132. case <-ticker.C:
  133. if err := b.Push(); err != nil && b.logger != nil {
  134. b.logger.Println("error pushing to Graphite:", err)
  135. }
  136. case <-ctx.Done():
  137. return
  138. }
  139. }
  140. }
  141. // Push pushes Prometheus metrics to the configured Graphite server.
  142. func (b *Bridge) Push() error {
  143. mfs, err := b.g.Gather()
  144. if err != nil || len(mfs) == 0 {
  145. switch b.errorHandling {
  146. case AbortOnError:
  147. return err
  148. case ContinueOnError:
  149. if b.logger != nil {
  150. b.logger.Println("continue on error:", err)
  151. }
  152. default:
  153. panic("unrecognized error handling value")
  154. }
  155. }
  156. conn, err := net.DialTimeout("tcp", b.url, b.timeout)
  157. if err != nil {
  158. return err
  159. }
  160. defer conn.Close()
  161. return b.writeMetrics(conn, mfs, b.prefix, model.Now())
  162. }
  163. func (b *Bridge) writeMetrics(w io.Writer, mfs []*dto.MetricFamily, prefix string, now model.Time) error {
  164. for _, mf := range mfs {
  165. vec, err := expfmt.ExtractSamples(&expfmt.DecodeOptions{
  166. Timestamp: now,
  167. }, mf)
  168. if err != nil {
  169. return err
  170. }
  171. buf := bufio.NewWriter(w)
  172. for _, s := range vec {
  173. if err := writePrefix(buf, prefix); err != nil {
  174. return err
  175. }
  176. if err := writeMetric(buf, s.Metric, mf); err != nil {
  177. return err
  178. }
  179. value := b.replaceCounterWithDelta(mf, s.Metric, s.Value)
  180. if _, err := fmt.Fprintf(buf, " %g %d\n", value, int64(s.Timestamp)/millisecondsPerSecond); err != nil {
  181. return err
  182. }
  183. if err := buf.Flush(); err != nil {
  184. return err
  185. }
  186. }
  187. }
  188. return nil
  189. }
  190. func writeMetric(buf *bufio.Writer, m model.Metric, mf *dto.MetricFamily) error {
  191. metricName, hasName := m[model.MetricNameLabel]
  192. numLabels := len(m) - 1
  193. if !hasName {
  194. numLabels = len(m)
  195. }
  196. for _, v := range metricCategoryPrefix {
  197. if strings.HasPrefix(string(metricName), v) {
  198. group := strings.Replace(v, "_", " ", 1)
  199. metricName = model.LabelValue(strings.Replace(string(metricName), v, group, 1))
  200. }
  201. }
  202. for _, v := range trimMetricPrefix {
  203. if strings.HasPrefix(string(metricName), v) {
  204. metricName = model.LabelValue(strings.Replace(string(metricName), v, "", 1))
  205. }
  206. }
  207. labelStrings := make([]string, 0, numLabels)
  208. for label, value := range m {
  209. if label != model.MetricNameLabel {
  210. labelStrings = append(labelStrings, fmt.Sprintf("%s %s", string(label), string(value)))
  211. }
  212. }
  213. var err error
  214. switch numLabels {
  215. case 0:
  216. if hasName {
  217. if err := writeSanitized(buf, string(metricName)); err != nil {
  218. return err
  219. }
  220. }
  221. default:
  222. sort.Strings(labelStrings)
  223. if err = writeSanitized(buf, string(metricName)); err != nil {
  224. return err
  225. }
  226. for _, s := range labelStrings {
  227. if err = buf.WriteByte('.'); err != nil {
  228. return err
  229. }
  230. if err = writeSanitized(buf, s); err != nil {
  231. return err
  232. }
  233. }
  234. }
  235. if err = addExtentionConventionForRollups(buf, mf, m); err != nil {
  236. return err
  237. }
  238. return nil
  239. }
  240. func addExtentionConventionForRollups(buf *bufio.Writer, mf *dto.MetricFamily, m model.Metric) error {
  241. // Adding `.count` `.sum` suffix makes it possible to configure
  242. // different rollup strategies based on metric type
  243. mfType := mf.GetType()
  244. var err error
  245. if mfType == dto.MetricType_COUNTER {
  246. if _, err = fmt.Fprint(buf, ".count"); err != nil {
  247. return err
  248. }
  249. }
  250. if mfType == dto.MetricType_SUMMARY || mfType == dto.MetricType_HISTOGRAM {
  251. if strings.HasSuffix(string(m[model.MetricNameLabel]), "_count") {
  252. if _, err = fmt.Fprint(buf, ".count"); err != nil {
  253. return err
  254. }
  255. }
  256. }
  257. if mfType == dto.MetricType_HISTOGRAM {
  258. if strings.HasSuffix(string(m[model.MetricNameLabel]), "_sum") {
  259. if _, err = fmt.Fprint(buf, ".sum"); err != nil {
  260. return err
  261. }
  262. }
  263. }
  264. return nil
  265. }
  266. func writePrefix(buf *bufio.Writer, s string) error {
  267. for _, c := range s {
  268. if _, err := buf.WriteRune(replaceInvalid(c)); err != nil {
  269. return err
  270. }
  271. }
  272. return nil
  273. }
  274. func writeSanitized(buf *bufio.Writer, s string) error {
  275. prevUnderscore := false
  276. for _, c := range s {
  277. c = replaceInvalidRune(c)
  278. if c == '_' {
  279. if prevUnderscore {
  280. continue
  281. }
  282. prevUnderscore = true
  283. } else {
  284. prevUnderscore = false
  285. }
  286. if _, err := buf.WriteRune(c); err != nil {
  287. return err
  288. }
  289. }
  290. return nil
  291. }
  292. func replaceInvalid(c rune) rune {
  293. if c == ' ' || c == '.' {
  294. return '.'
  295. }
  296. return replaceInvalidRune(c)
  297. }
  // replaceInvalidRune maps a rune of a metric name or label to the rune that
  // is written to Graphite: a space becomes '.', ASCII letters, digits, '_'
  // and ':' are kept, and every other rune becomes '_'.
  func replaceInvalidRune(c rune) rune {
  	switch {
  	case c == ' ':
  		return '.'
  	case 'a' <= c && c <= 'z',
  		'A' <= c && c <= 'Z',
  		'0' <= c && c <= '9',
  		c == '_',
  		c == ':':
  		return c
  	default:
  		return '_'
  	}
  }
  307. func (b *Bridge) replaceCounterWithDelta(mf *dto.MetricFamily, metric model.Metric, value model.SampleValue) float64 {
  308. if !b.countersAsDetlas {
  309. return float64(value)
  310. }
  311. mfType := mf.GetType()
  312. if mfType == dto.MetricType_COUNTER {
  313. return b.returnDelta(metric, value)
  314. }
  315. if mfType == dto.MetricType_SUMMARY {
  316. if strings.HasSuffix(string(metric[model.MetricNameLabel]), "_count") {
  317. return b.returnDelta(metric, value)
  318. }
  319. }
  320. return float64(value)
  321. }
  322. func (b *Bridge) returnDelta(metric model.Metric, value model.SampleValue) float64 {
  323. key := metric.Fingerprint()
  324. _, exists := b.lastValue[key]
  325. if !exists {
  326. b.lastValue[key] = 0
  327. }
  328. delta := float64(value) - b.lastValue[key]
  329. b.lastValue[key] = float64(value)
  330. return delta
  331. }