graphite.go 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389
  1. // Copyright 2016 The Prometheus Authors
  2. // Licensed under the Apache License, Version 2.0 (the "License");
  3. // you may not use this file except in compliance with the License.
  4. // You may obtain a copy of the License at
  5. //
  6. // http://www.apache.org/licenses/LICENSE-2.0
  7. //
  8. // Unless required by applicable law or agreed to in writing, software
  9. // distributed under the License is distributed on an "AS IS" BASIS,
  10. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. // See the License for the specific language governing permissions and
  12. // limitations under the License.
  13. // Package graphitebridge provides a bridge to push Prometheus metrics to a Graphite
  14. // server.
  15. package graphitebridge
  16. import (
  17. "bufio"
  18. "errors"
  19. "fmt"
  20. "io"
  21. "net"
  22. "sort"
  23. "strings"
  24. "time"
  25. "github.com/prometheus/common/expfmt"
  26. "github.com/prometheus/common/model"
  27. "golang.org/x/net/context"
  28. dto "github.com/prometheus/client_model/go"
  29. "github.com/prometheus/client_golang/prometheus"
  30. )
  31. const (
  32. defaultInterval = 15 * time.Second
  33. millisecondsPerSecond = 1000
  34. )
  35. // HandlerErrorHandling defines how a Handler serving metrics will handle
  36. // errors.
  37. type HandlerErrorHandling int
  38. // These constants cause handlers serving metrics to behave as described if
  39. // errors are encountered.
  40. const (
  41. // Ignore errors and try to push as many metrics to Graphite as possible.
  42. ContinueOnError HandlerErrorHandling = iota
  43. // Abort the push to Graphite upon the first error encountered.
  44. AbortOnError
  45. )
  46. var metricCategoryPrefix []string = []string{"proxy_", "api_", "page_", "alerting_", "aws_", "db_", "stat_", "go_", "process_"}
  47. // Config defines the Graphite bridge config.
  48. type Config struct {
  49. // The url to push data to. Required.
  50. URL string
  51. // The prefix for the pushed Graphite metrics. Defaults to empty string.
  52. Prefix string
  53. // The interval to use for pushing data to Graphite. Defaults to 15 seconds.
  54. Interval time.Duration
  55. // The timeout for pushing metrics to Graphite. Defaults to 15 seconds.
  56. Timeout time.Duration
  57. // The Gatherer to use for metrics. Defaults to prometheus.DefaultGatherer.
  58. Gatherer prometheus.Gatherer
  59. // The logger that messages are written to. Defaults to no logging.
  60. Logger Logger
  61. // ErrorHandling defines how errors are handled. Note that errors are
  62. // logged regardless of the configured ErrorHandling provided Logger
  63. // is not nil.
  64. ErrorHandling HandlerErrorHandling
  65. // Graphite does not support ever increasing counter the same way
  66. // prometheus does. Rollups and ingestion might cannot handle ever
  67. // increasing counters. This option allows enabled the caller to
  68. // calculate the delta by saving the last sent counter in memory
  69. // and subtraction it from the collected value before sending.
  70. CountersAsDelta bool
  71. }
// Bridge pushes metrics to the configured Graphite server.
//
// Create one with NewBridge. The zero value has a nil Gatherer and a nil
// lastValue map, so pushing with it (or computing counter deltas) panics.
type Bridge struct {
	// url is the TCP address dialed by Push (Config.URL).
	url string
	// prefix is written in front of every metric path (Config.Prefix).
	prefix string
	// countersAsDetlas mirrors Config.CountersAsDelta (the field name's
	// misspelling is historical; other code refers to it by this name).
	countersAsDetlas bool
	// interval is the ticker period used by Run.
	interval time.Duration
	// timeout is passed to net.DialTimeout by Push.
	timeout time.Duration
	// errorHandling decides what Push does when gathering fails.
	errorHandling HandlerErrorHandling
	// logger may be nil, in which case nothing is logged.
	logger Logger
	// g is the source of the metrics to push.
	g prometheus.Gatherer
	// lastValue holds the last value sent per series fingerprint; it is
	// used to compute deltas when countersAsDetlas is set.
	lastValue map[model.Fingerprint]float64
}
// Logger is the minimal interface Bridge needs for logging. Note that
// log.Logger from the standard library implements this interface, and it is
// easy to implement by custom loggers, if they don't do so already anyway.
type Logger interface {
	// Println is called with the values making up one log message, in the
	// style of log.Logger.Println.
	Println(v ...interface{})
}
  90. // NewBridge returns a pointer to a new Bridge struct.
  91. func NewBridge(c *Config) (*Bridge, error) {
  92. b := &Bridge{}
  93. if c.URL == "" {
  94. return nil, errors.New("missing URL")
  95. }
  96. b.url = c.URL
  97. if c.Gatherer == nil {
  98. b.g = prometheus.DefaultGatherer
  99. } else {
  100. b.g = c.Gatherer
  101. }
  102. if c.Logger != nil {
  103. b.logger = c.Logger
  104. }
  105. if c.Prefix != "" {
  106. b.prefix = c.Prefix
  107. }
  108. var z time.Duration
  109. if c.Interval == z {
  110. b.interval = defaultInterval
  111. } else {
  112. b.interval = c.Interval
  113. }
  114. if c.Timeout == z {
  115. b.timeout = defaultInterval
  116. } else {
  117. b.timeout = c.Timeout
  118. }
  119. b.errorHandling = c.ErrorHandling
  120. b.lastValue = map[model.Fingerprint]float64{}
  121. b.countersAsDetlas = c.CountersAsDelta
  122. return b, nil
  123. }
  124. // Run starts the event loop that pushes Prometheus metrics to Graphite at the
  125. // configured interval.
  126. func (b *Bridge) Run(ctx context.Context) {
  127. ticker := time.NewTicker(b.interval)
  128. defer ticker.Stop()
  129. for {
  130. select {
  131. case <-ticker.C:
  132. if err := b.Push(); err != nil && b.logger != nil {
  133. b.logger.Println("error pushing to Graphite:", err)
  134. }
  135. case <-ctx.Done():
  136. return
  137. }
  138. }
  139. }
  140. // Push pushes Prometheus metrics to the configured Graphite server.
  141. func (b *Bridge) Push() error {
  142. mfs, err := b.g.Gather()
  143. if err != nil || len(mfs) == 0 {
  144. switch b.errorHandling {
  145. case AbortOnError:
  146. return err
  147. case ContinueOnError:
  148. if b.logger != nil {
  149. b.logger.Println("continue on error:", err)
  150. }
  151. default:
  152. panic("unrecognized error handling value")
  153. }
  154. }
  155. conn, err := net.DialTimeout("tcp", b.url, b.timeout)
  156. if err != nil {
  157. return err
  158. }
  159. defer conn.Close()
  160. return b.writeMetrics(conn, mfs, b.prefix, model.Now())
  161. }
  162. func (b *Bridge) writeMetrics(w io.Writer, mfs []*dto.MetricFamily, prefix string, now model.Time) error {
  163. for _, mf := range mfs {
  164. vec, err := expfmt.ExtractSamples(&expfmt.DecodeOptions{
  165. Timestamp: now,
  166. }, mf)
  167. if err != nil {
  168. return err
  169. }
  170. buf := bufio.NewWriter(w)
  171. for _, s := range vec {
  172. if err := writePrefix(buf, prefix); err != nil {
  173. return err
  174. }
  175. if err := writeMetric(buf, s.Metric, mf); err != nil {
  176. return err
  177. }
  178. value := b.replaceCounterWithDelta(mf, s.Metric, s.Value)
  179. if _, err := fmt.Fprintf(buf, " %g %d\n", value, int64(s.Timestamp)/millisecondsPerSecond); err != nil {
  180. return err
  181. }
  182. if err := buf.Flush(); err != nil {
  183. return err
  184. }
  185. }
  186. }
  187. return nil
  188. }
  189. func writeMetric(buf *bufio.Writer, m model.Metric, mf *dto.MetricFamily) error {
  190. metricName, hasName := m[model.MetricNameLabel]
  191. numLabels := len(m) - 1
  192. if !hasName {
  193. numLabels = len(m)
  194. }
  195. for _, v := range metricCategoryPrefix {
  196. if strings.HasPrefix(string(metricName), v) {
  197. group := strings.Replace(v, "_", " ", 1)
  198. metricName = model.LabelValue(strings.Replace(string(metricName), v, group, -1))
  199. }
  200. }
  201. labelStrings := make([]string, 0, numLabels)
  202. for label, value := range m {
  203. if label != model.MetricNameLabel {
  204. labelStrings = append(labelStrings, fmt.Sprintf("%s %s", string(label), string(value)))
  205. }
  206. }
  207. var err error
  208. switch numLabels {
  209. case 0:
  210. if hasName {
  211. if err := writeSanitized(buf, string(metricName)); err != nil {
  212. return err
  213. }
  214. }
  215. default:
  216. sort.Strings(labelStrings)
  217. if err = writeSanitized(buf, string(metricName)); err != nil {
  218. return err
  219. }
  220. for _, s := range labelStrings {
  221. if err = buf.WriteByte('.'); err != nil {
  222. return err
  223. }
  224. if err = writeSanitized(buf, s); err != nil {
  225. return err
  226. }
  227. }
  228. }
  229. if err = addExtentionConventionForRollups(buf, mf, m); err != nil {
  230. return err
  231. }
  232. return nil
  233. }
  234. func addExtentionConventionForRollups(buf *bufio.Writer, mf *dto.MetricFamily, m model.Metric) error {
  235. // Adding `.count` `.sum` suffix makes it possible to configure
  236. // different rollup strategies based on metric type
  237. mfType := mf.GetType()
  238. var err error
  239. if mfType == dto.MetricType_COUNTER {
  240. if _, err = fmt.Fprint(buf, ".count"); err != nil {
  241. return err
  242. }
  243. }
  244. if mfType == dto.MetricType_SUMMARY || mfType == dto.MetricType_HISTOGRAM {
  245. if strings.HasSuffix(string(m[model.MetricNameLabel]), "_count") {
  246. if _, err = fmt.Fprint(buf, ".count"); err != nil {
  247. return err
  248. }
  249. }
  250. }
  251. if mfType == dto.MetricType_HISTOGRAM {
  252. if strings.HasSuffix(string(m[model.MetricNameLabel]), "_sum") {
  253. if _, err = fmt.Fprint(buf, ".sum"); err != nil {
  254. return err
  255. }
  256. }
  257. }
  258. return nil
  259. }
  260. func writePrefix(buf *bufio.Writer, s string) error {
  261. for _, c := range s {
  262. if _, err := buf.WriteRune(replaceInvalid(c)); err != nil {
  263. return err
  264. }
  265. }
  266. return nil
  267. }
  268. func writeSanitized(buf *bufio.Writer, s string) error {
  269. prevUnderscore := false
  270. for _, c := range s {
  271. c = replaceInvalidRune(c)
  272. if c == '_' {
  273. if prevUnderscore {
  274. continue
  275. }
  276. prevUnderscore = true
  277. } else {
  278. prevUnderscore = false
  279. }
  280. if _, err := buf.WriteRune(c); err != nil {
  281. return err
  282. }
  283. }
  284. return nil
  285. }
  286. func replaceInvalid(c rune) rune {
  287. if c == ' ' || c == '.' {
  288. return '.'
  289. }
  290. return replaceInvalidRune(c)
  291. }
  292. func replaceInvalidRune(c rune) rune {
  293. if c == ' ' {
  294. return '.'
  295. }
  296. if !((c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || c == '_' || c == ':' || (c >= '0' && c <= '9')) {
  297. return '_'
  298. }
  299. return c
  300. }
  301. func (b *Bridge) replaceCounterWithDelta(mf *dto.MetricFamily, metric model.Metric, value model.SampleValue) float64 {
  302. if !b.countersAsDetlas {
  303. return float64(value)
  304. }
  305. mfType := mf.GetType()
  306. if mfType == dto.MetricType_COUNTER {
  307. return b.returnDelta(metric, value)
  308. }
  309. if mfType == dto.MetricType_SUMMARY {
  310. if strings.HasSuffix(string(metric[model.MetricNameLabel]), "_count") {
  311. return b.returnDelta(metric, value)
  312. }
  313. }
  314. return float64(value)
  315. }
  316. func (b *Bridge) returnDelta(metric model.Metric, value model.SampleValue) float64 {
  317. key := metric.Fingerprint()
  318. _, exists := b.lastValue[key]
  319. if !exists {
  320. b.lastValue[key] = 0
  321. }
  322. delta := float64(value) - b.lastValue[key]
  323. b.lastValue[key] = float64(value)
  324. return delta
  325. }