graphite.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409
  1. // Copyright 2016 The Prometheus Authors
  2. // Licensed under the Apache License, Version 2.0 (the "License");
  3. // you may not use this file except in compliance with the License.
  4. // You may obtain a copy of the License at
  5. //
  6. // http://www.apache.org/licenses/LICENSE-2.0
  7. //
  8. // Unless required by applicable law or agreed to in writing, software
  9. // distributed under the License is distributed on an "AS IS" BASIS,
  10. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. // See the License for the specific language governing permissions and
  12. // limitations under the License.
// Package graphitebridge provides a bridge to push Prometheus metrics to a
// Graphite server.
  15. package graphitebridge
  16. import (
  17. "bufio"
  18. "errors"
  19. "fmt"
  20. "io"
  21. "math"
  22. "net"
  23. "sort"
  24. "strings"
  25. "time"
  26. "context"
  27. "github.com/prometheus/common/expfmt"
  28. "github.com/prometheus/common/model"
  29. dto "github.com/prometheus/client_model/go"
  30. "github.com/prometheus/client_golang/prometheus"
  31. )
const (
	// defaultInterval is the value NewBridge falls back to for both the push
	// interval and the timeout when the Config leaves them at zero.
	defaultInterval = 15 * time.Second
	// millisecondsPerSecond is the divisor writeMetrics applies to a sample
	// timestamp before writing it to Graphite.
	millisecondsPerSecond = 1000
)
// HandlerErrorHandling defines how a Bridge reacts when gathering metrics for
// a push reports an error.
type HandlerErrorHandling int
// These constants select what Push does when the Gatherer returns an error
// (or returns no metric families at all).
const (
	// ContinueOnError ignores the error (it is logged if a Logger is set) and
	// tries to push as many metrics to Graphite as possible.
	ContinueOnError HandlerErrorHandling = iota
	// AbortOnError aborts the push to Graphite upon the first error
	// encountered.
	AbortOnError
)
  47. var metricCategoryPrefix []string = []string{
  48. "proxy_",
  49. "api_",
  50. "page_",
  51. "alerting_",
  52. "aws_",
  53. "db_",
  54. "stat_",
  55. "go_",
  56. "process_"}
  57. var trimMetricPrefix []string = []string{"grafana_"}
// Config defines the Graphite bridge config. It is consumed by NewBridge.
type Config struct {
	// The address to push data to. Required. Despite the name, the value is
	// handed to net.DialTimeout as a TCP address, so it is expected to look
	// like "host:port" rather than a full URL.
	URL string
	// The prefix for the pushed Graphite metrics. Defaults to empty string.
	// No separator is inserted between the prefix and the metric name, so
	// include a trailing '.' if one is wanted.
	Prefix string
	// The interval to use for pushing data to Graphite. Defaults to 15 seconds.
	Interval time.Duration
	// The timeout for pushing metrics to Graphite. Defaults to 15 seconds.
	// In this file it is applied when dialing the Graphite server.
	Timeout time.Duration
	// The Gatherer to use for metrics. Defaults to prometheus.DefaultGatherer.
	Gatherer prometheus.Gatherer
	// The logger that messages are written to. Defaults to no logging.
	Logger Logger
	// ErrorHandling defines how Push reacts to an error from the Gatherer.
	// Errors returned by Push are logged by Run whenever Logger is not nil.
	ErrorHandling HandlerErrorHandling
	// Graphite does not support ever-increasing counters the same way
	// Prometheus does; rollups and ingestion might not be able to handle
	// them. This option makes the bridge calculate a delta instead, by
	// keeping the last sent counter value in memory and subtracting it
	// from the collected value before sending.
	CountersAsDelta bool
}
// Bridge pushes metrics to the configured Graphite server. Create one with
// NewBridge.
type Bridge struct {
	// url is the TCP address dialed on every Push.
	url string
	// prefix is written in front of every metric path.
	prefix string
	// countersAsDetlas (sic) mirrors Config.CountersAsDelta.
	countersAsDetlas bool
	// interval is the period between pushes performed by Run.
	interval time.Duration
	// timeout is the dial timeout used by Push.
	timeout time.Duration
	// errorHandling selects how Push reacts to Gatherer errors.
	errorHandling HandlerErrorHandling
	// logger receives error messages; nil disables logging.
	logger Logger
	// g is the source of the metric families that get pushed.
	g prometheus.Gatherer
	// lastValue remembers, per metric fingerprint, the value most recently
	// passed through returnDelta so the next delta can be computed.
	lastValue map[model.Fingerprint]float64
}
// Logger is the minimal interface Bridge needs for logging. Note that
// log.Logger from the standard library implements this interface, and it is
// easy to implement by custom loggers, if they don't do so already anyway.
// A nil Logger disables logging in the Bridge.
type Logger interface {
	// Println logs its arguments as one message.
	Println(v ...interface{})
}
  101. // NewBridge returns a pointer to a new Bridge struct.
  102. func NewBridge(c *Config) (*Bridge, error) {
  103. b := &Bridge{}
  104. if c.URL == "" {
  105. return nil, errors.New("missing URL")
  106. }
  107. b.url = c.URL
  108. if c.Gatherer == nil {
  109. b.g = prometheus.DefaultGatherer
  110. } else {
  111. b.g = c.Gatherer
  112. }
  113. if c.Logger != nil {
  114. b.logger = c.Logger
  115. }
  116. if c.Prefix != "" {
  117. b.prefix = c.Prefix
  118. }
  119. var z time.Duration
  120. if c.Interval == z {
  121. b.interval = defaultInterval
  122. } else {
  123. b.interval = c.Interval
  124. }
  125. if c.Timeout == z {
  126. b.timeout = defaultInterval
  127. } else {
  128. b.timeout = c.Timeout
  129. }
  130. b.errorHandling = c.ErrorHandling
  131. b.lastValue = map[model.Fingerprint]float64{}
  132. b.countersAsDetlas = c.CountersAsDelta
  133. return b, nil
  134. }
  135. // Run starts the event loop that pushes Prometheus metrics to Graphite at the
  136. // configured interval.
  137. func (b *Bridge) Run(ctx context.Context) {
  138. ticker := time.NewTicker(b.interval)
  139. defer ticker.Stop()
  140. for {
  141. select {
  142. case <-ticker.C:
  143. if err := b.Push(); err != nil && b.logger != nil {
  144. b.logger.Println("error pushing to Graphite:", err)
  145. }
  146. case <-ctx.Done():
  147. return
  148. }
  149. }
  150. }
  151. // Push pushes Prometheus metrics to the configured Graphite server.
  152. func (b *Bridge) Push() error {
  153. mfs, err := b.g.Gather()
  154. if err != nil || len(mfs) == 0 {
  155. switch b.errorHandling {
  156. case AbortOnError:
  157. return err
  158. case ContinueOnError:
  159. if b.logger != nil {
  160. b.logger.Println("continue on error:", err)
  161. }
  162. default:
  163. panic("unrecognized error handling value")
  164. }
  165. }
  166. conn, err := net.DialTimeout("tcp", b.url, b.timeout)
  167. if err != nil {
  168. return err
  169. }
  170. defer conn.Close()
  171. return b.writeMetrics(conn, mfs, b.prefix, model.Now())
  172. }
  173. func (b *Bridge) writeMetrics(w io.Writer, mfs []*dto.MetricFamily, prefix string, now model.Time) error {
  174. for _, mf := range mfs {
  175. vec, err := expfmt.ExtractSamples(&expfmt.DecodeOptions{
  176. Timestamp: now,
  177. }, mf)
  178. if err != nil {
  179. return err
  180. }
  181. buf := bufio.NewWriter(w)
  182. for _, s := range vec {
  183. if math.IsNaN(float64(s.Value)) {
  184. continue
  185. }
  186. if err := writePrefix(buf, prefix); err != nil {
  187. return err
  188. }
  189. if err := writeMetric(buf, s.Metric, mf); err != nil {
  190. return err
  191. }
  192. value := b.replaceCounterWithDelta(mf, s.Metric, s.Value)
  193. if _, err := fmt.Fprintf(buf, " %g %d\n", value, int64(s.Timestamp)/millisecondsPerSecond); err != nil {
  194. return err
  195. }
  196. if err := buf.Flush(); err != nil {
  197. return err
  198. }
  199. }
  200. }
  201. return nil
  202. }
  203. func writeMetric(buf *bufio.Writer, m model.Metric, mf *dto.MetricFamily) error {
  204. metricName, hasName := m[model.MetricNameLabel]
  205. numLabels := len(m) - 1
  206. if !hasName {
  207. numLabels = len(m)
  208. }
  209. for _, v := range trimMetricPrefix {
  210. if strings.HasPrefix(string(metricName), v) {
  211. metricName = model.LabelValue(strings.Replace(string(metricName), v, "", 1))
  212. }
  213. }
  214. for _, v := range metricCategoryPrefix {
  215. if strings.HasPrefix(string(metricName), v) {
  216. group := strings.Replace(v, "_", " ", 1)
  217. metricName = model.LabelValue(strings.Replace(string(metricName), v, group, 1))
  218. }
  219. }
  220. labelStrings := make([]string, 0, numLabels)
  221. for label, value := range m {
  222. if label != model.MetricNameLabel {
  223. labelStrings = append(labelStrings, fmt.Sprintf("%s %s", string(label), string(value)))
  224. }
  225. }
  226. var err error
  227. switch numLabels {
  228. case 0:
  229. if hasName {
  230. if err := writeSanitized(buf, string(metricName)); err != nil {
  231. return err
  232. }
  233. }
  234. default:
  235. sort.Strings(labelStrings)
  236. if err = writeSanitized(buf, string(metricName)); err != nil {
  237. return err
  238. }
  239. for _, s := range labelStrings {
  240. if err = buf.WriteByte('.'); err != nil {
  241. return err
  242. }
  243. if err = writeSanitized(buf, s); err != nil {
  244. return err
  245. }
  246. }
  247. }
  248. return addExtentionConventionForRollups(buf, mf, m)
  249. }
  250. func addExtentionConventionForRollups(buf *bufio.Writer, mf *dto.MetricFamily, m model.Metric) error {
  251. // Adding `.count` `.sum` suffix makes it possible to configure
  252. // different rollup strategies based on metric type
  253. mfType := mf.GetType()
  254. var err error
  255. if mfType == dto.MetricType_COUNTER {
  256. if _, err = fmt.Fprint(buf, ".count"); err != nil {
  257. return err
  258. }
  259. }
  260. if mfType == dto.MetricType_SUMMARY || mfType == dto.MetricType_HISTOGRAM {
  261. if strings.HasSuffix(string(m[model.MetricNameLabel]), "_count") {
  262. if _, err = fmt.Fprint(buf, ".count"); err != nil {
  263. return err
  264. }
  265. }
  266. }
  267. if mfType == dto.MetricType_HISTOGRAM {
  268. if strings.HasSuffix(string(m[model.MetricNameLabel]), "_sum") {
  269. if _, err = fmt.Fprint(buf, ".sum"); err != nil {
  270. return err
  271. }
  272. }
  273. }
  274. return nil
  275. }
  276. func writePrefix(buf *bufio.Writer, s string) error {
  277. for _, c := range s {
  278. if _, err := buf.WriteRune(replaceInvalid(c)); err != nil {
  279. return err
  280. }
  281. }
  282. return nil
  283. }
  284. func writeSanitized(buf *bufio.Writer, s string) error {
  285. prevUnderscore := false
  286. for _, c := range s {
  287. c = replaceInvalidRune(c)
  288. if c == '_' {
  289. if prevUnderscore {
  290. continue
  291. }
  292. prevUnderscore = true
  293. } else {
  294. prevUnderscore = false
  295. }
  296. if _, err := buf.WriteRune(c); err != nil {
  297. return err
  298. }
  299. }
  300. return nil
  301. }
  302. func replaceInvalid(c rune) rune {
  303. if c == ' ' || c == '.' {
  304. return '.'
  305. }
  306. return replaceInvalidRune(c)
  307. }
  308. func replaceInvalidRune(c rune) rune {
  309. if c == ' ' {
  310. return '.'
  311. }
  312. if !((c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || c == '-' || c == '_' || c == ':' || (c >= '0' && c <= '9')) {
  313. return '_'
  314. }
  315. return c
  316. }
  317. func (b *Bridge) replaceCounterWithDelta(mf *dto.MetricFamily, metric model.Metric, value model.SampleValue) float64 {
  318. if !b.countersAsDetlas {
  319. return float64(value)
  320. }
  321. mfType := mf.GetType()
  322. if mfType == dto.MetricType_COUNTER {
  323. return b.returnDelta(metric, value)
  324. }
  325. if mfType == dto.MetricType_SUMMARY {
  326. if strings.HasSuffix(string(metric[model.MetricNameLabel]), "_count") {
  327. return b.returnDelta(metric, value)
  328. }
  329. }
  330. return float64(value)
  331. }
  332. func (b *Bridge) returnDelta(metric model.Metric, value model.SampleValue) float64 {
  333. key := metric.Fingerprint()
  334. _, exists := b.lastValue[key]
  335. if !exists {
  336. b.lastValue[key] = 0
  337. }
  338. delta := float64(value) - b.lastValue[key]
  339. b.lastValue[key] = float64(value)
  340. return delta
  341. }