prometheus.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. package prometheus
  2. import (
  3. "context"
  4. "fmt"
  5. "regexp"
  6. "strings"
  7. "time"
  8. "net/http"
  9. "github.com/grafana/grafana/pkg/components/null"
  10. "github.com/grafana/grafana/pkg/log"
  11. "github.com/grafana/grafana/pkg/models"
  12. "github.com/grafana/grafana/pkg/tsdb"
  13. "github.com/prometheus/client_golang/api/prometheus"
  14. pmodel "github.com/prometheus/common/model"
  15. )
  16. type PrometheusExecutor struct {
  17. *models.DataSource
  18. Transport *http.Transport
  19. }
  20. type basicAuthTransport struct {
  21. *http.Transport
  22. username string
  23. password string
  24. }
  25. func (bat basicAuthTransport) RoundTrip(req *http.Request) (*http.Response, error) {
  26. req.SetBasicAuth(bat.username, bat.password)
  27. return bat.Transport.RoundTrip(req)
  28. }
  29. func NewPrometheusExecutor(dsInfo *models.DataSource) (tsdb.Executor, error) {
  30. transport, err := dsInfo.GetHttpTransport()
  31. if err != nil {
  32. return nil, err
  33. }
  34. return &PrometheusExecutor{
  35. DataSource: dsInfo,
  36. Transport: transport,
  37. }, nil
  38. }
  39. var (
  40. plog log.Logger
  41. legendFormat *regexp.Regexp
  42. )
  43. func init() {
  44. plog = log.New("tsdb.prometheus")
  45. tsdb.RegisterExecutor("prometheus", NewPrometheusExecutor)
  46. legendFormat = regexp.MustCompile(`\{\{\s*(.+?)\s*\}\}`)
  47. }
  48. func (e *PrometheusExecutor) getClient() (prometheus.QueryAPI, error) {
  49. cfg := prometheus.Config{
  50. Address: e.DataSource.Url,
  51. Transport: e.Transport,
  52. }
  53. if e.BasicAuth {
  54. cfg.Transport = basicAuthTransport{
  55. Transport: e.Transport,
  56. username: e.BasicAuthUser,
  57. password: e.BasicAuthPassword,
  58. }
  59. }
  60. client, err := prometheus.New(cfg)
  61. if err != nil {
  62. return nil, err
  63. }
  64. return prometheus.NewQueryAPI(client), nil
  65. }
  66. func (e *PrometheusExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, queryContext *tsdb.QueryContext) *tsdb.BatchResult {
  67. result := &tsdb.BatchResult{}
  68. client, err := e.getClient()
  69. if err != nil {
  70. return result.WithError(err)
  71. }
  72. query, err := parseQuery(queries, queryContext)
  73. if err != nil {
  74. return result.WithError(err)
  75. }
  76. timeRange := prometheus.Range{
  77. Start: query.Start,
  78. End: query.End,
  79. Step: query.Step,
  80. }
  81. value, err := client.QueryRange(ctx, query.Expr, timeRange)
  82. if err != nil {
  83. return result.WithError(err)
  84. }
  85. queryResult, err := parseResponse(value, query)
  86. if err != nil {
  87. return result.WithError(err)
  88. }
  89. result.QueryResults = queryResult
  90. return result
  91. }
  92. func formatLegend(metric pmodel.Metric, query *PrometheusQuery) string {
  93. if query.LegendFormat == "" {
  94. return metric.String()
  95. }
  96. result := legendFormat.ReplaceAllFunc([]byte(query.LegendFormat), func(in []byte) []byte {
  97. labelName := strings.Replace(string(in), "{{", "", 1)
  98. labelName = strings.Replace(labelName, "}}", "", 1)
  99. labelName = strings.TrimSpace(labelName)
  100. if val, exists := metric[pmodel.LabelName(labelName)]; exists {
  101. return []byte(val)
  102. }
  103. return in
  104. })
  105. return string(result)
  106. }
  107. func parseQuery(queries tsdb.QuerySlice, queryContext *tsdb.QueryContext) (*PrometheusQuery, error) {
  108. queryModel := queries[0]
  109. expr, err := queryModel.Model.Get("expr").String()
  110. if err != nil {
  111. return nil, err
  112. }
  113. step, err := queryModel.Model.Get("step").Int64()
  114. if err != nil {
  115. return nil, err
  116. }
  117. format := queryModel.Model.Get("legendFormat").MustString("")
  118. start, err := queryContext.TimeRange.ParseFrom()
  119. if err != nil {
  120. return nil, err
  121. }
  122. end, err := queryContext.TimeRange.ParseTo()
  123. if err != nil {
  124. return nil, err
  125. }
  126. return &PrometheusQuery{
  127. Expr: expr,
  128. Step: time.Second * time.Duration(step),
  129. LegendFormat: format,
  130. Start: start,
  131. End: end,
  132. }, nil
  133. }
  134. func parseResponse(value pmodel.Value, query *PrometheusQuery) (map[string]*tsdb.QueryResult, error) {
  135. queryResults := make(map[string]*tsdb.QueryResult)
  136. queryRes := tsdb.NewQueryResult()
  137. data, ok := value.(pmodel.Matrix)
  138. if !ok {
  139. return queryResults, fmt.Errorf("Unsupported result format: %s", value.Type().String())
  140. }
  141. for _, v := range data {
  142. series := tsdb.TimeSeries{
  143. Name: formatLegend(v.Metric, query),
  144. Tags: map[string]string{},
  145. }
  146. for k, v := range v.Metric {
  147. series.Tags[string(k)] = string(v)
  148. }
  149. for _, k := range v.Values {
  150. series.Points = append(series.Points, tsdb.NewTimePoint(null.FloatFrom(float64(k.Value)), float64(k.Timestamp.Unix()*1000)))
  151. }
  152. queryRes.Series = append(queryRes.Series, &series)
  153. }
  154. queryResults["A"] = queryRes
  155. return queryResults, nil
  156. }