prometheus.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. package prometheus
  2. import (
  3. "context"
  4. "fmt"
  5. "regexp"
  6. "strings"
  7. "time"
  8. "net/http"
  9. "github.com/grafana/grafana/pkg/components/null"
  10. "github.com/grafana/grafana/pkg/log"
  11. "github.com/grafana/grafana/pkg/models"
  12. "github.com/grafana/grafana/pkg/tsdb"
  13. api "github.com/prometheus/client_golang/api"
  14. apiv1 "github.com/prometheus/client_golang/api/prometheus/v1"
  15. "github.com/prometheus/common/model"
  16. //api "github.com/prometheus/client_golang/api"
  17. //apiv1 "github.com/prometheus/client_golang/api/prometheus/v1"
  18. )
  19. type PrometheusExecutor struct {
  20. *models.DataSource
  21. Transport *http.Transport
  22. }
  23. type basicAuthTransport struct {
  24. *http.Transport
  25. username string
  26. password string
  27. }
  28. func (bat basicAuthTransport) RoundTrip(req *http.Request) (*http.Response, error) {
  29. req.SetBasicAuth(bat.username, bat.password)
  30. return bat.Transport.RoundTrip(req)
  31. }
  32. func NewPrometheusExecutor(dsInfo *models.DataSource) (tsdb.Executor, error) {
  33. transport, err := dsInfo.GetHttpTransport()
  34. if err != nil {
  35. return nil, err
  36. }
  37. return &PrometheusExecutor{
  38. DataSource: dsInfo,
  39. Transport: transport,
  40. }, nil
  41. }
  42. var (
  43. plog log.Logger
  44. legendFormat *regexp.Regexp
  45. )
  46. func init() {
  47. plog = log.New("tsdb.prometheus")
  48. tsdb.RegisterExecutor("prometheus", NewPrometheusExecutor)
  49. legendFormat = regexp.MustCompile(`\{\{\s*(.+?)\s*\}\}`)
  50. }
  51. func (e *PrometheusExecutor) getClient() (apiv1.API, error) {
  52. cfg := api.Config{
  53. Address: e.DataSource.Url,
  54. RoundTripper: e.Transport,
  55. }
  56. if e.BasicAuth {
  57. cfg.RoundTripper = basicAuthTransport{
  58. Transport: e.Transport,
  59. username: e.BasicAuthUser,
  60. password: e.BasicAuthPassword,
  61. }
  62. }
  63. client, err := api.NewClient(cfg)
  64. if err != nil {
  65. return nil, err
  66. }
  67. return apiv1.NewAPI(client), nil
  68. }
  69. func (e *PrometheusExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, queryContext *tsdb.QueryContext) *tsdb.BatchResult {
  70. result := &tsdb.BatchResult{}
  71. client, err := e.getClient()
  72. if err != nil {
  73. return result.WithError(err)
  74. }
  75. query, err := parseQuery(queries, queryContext)
  76. if err != nil {
  77. return result.WithError(err)
  78. }
  79. timeRange := apiv1.Range{
  80. Start: query.Start,
  81. End: query.End,
  82. Step: query.Step,
  83. }
  84. value, err := client.QueryRange(ctx, query.Expr, timeRange)
  85. if err != nil {
  86. return result.WithError(err)
  87. }
  88. queryResult, err := parseResponse(value, query)
  89. if err != nil {
  90. return result.WithError(err)
  91. }
  92. result.QueryResults = queryResult
  93. return result
  94. }
  95. func formatLegend(metric model.Metric, query *PrometheusQuery) string {
  96. if query.LegendFormat == "" {
  97. return metric.String()
  98. }
  99. result := legendFormat.ReplaceAllFunc([]byte(query.LegendFormat), func(in []byte) []byte {
  100. labelName := strings.Replace(string(in), "{{", "", 1)
  101. labelName = strings.Replace(labelName, "}}", "", 1)
  102. labelName = strings.TrimSpace(labelName)
  103. if val, exists := metric[model.LabelName(labelName)]; exists {
  104. return []byte(val)
  105. }
  106. return in
  107. })
  108. return string(result)
  109. }
  110. func parseQuery(queries tsdb.QuerySlice, queryContext *tsdb.QueryContext) (*PrometheusQuery, error) {
  111. queryModel := queries[0]
  112. expr, err := queryModel.Model.Get("expr").String()
  113. if err != nil {
  114. return nil, err
  115. }
  116. step, err := queryModel.Model.Get("step").Int64()
  117. if err != nil {
  118. return nil, err
  119. }
  120. format := queryModel.Model.Get("legendFormat").MustString("")
  121. start, err := queryContext.TimeRange.ParseFrom()
  122. if err != nil {
  123. return nil, err
  124. }
  125. end, err := queryContext.TimeRange.ParseTo()
  126. if err != nil {
  127. return nil, err
  128. }
  129. return &PrometheusQuery{
  130. Expr: expr,
  131. Step: time.Second * time.Duration(step),
  132. LegendFormat: format,
  133. Start: start,
  134. End: end,
  135. }, nil
  136. }
  137. func parseResponse(value model.Value, query *PrometheusQuery) (map[string]*tsdb.QueryResult, error) {
  138. queryResults := make(map[string]*tsdb.QueryResult)
  139. queryRes := tsdb.NewQueryResult()
  140. data, ok := value.(model.Matrix)
  141. if !ok {
  142. return queryResults, fmt.Errorf("Unsupported result format: %s", value.Type().String())
  143. }
  144. for _, v := range data {
  145. series := tsdb.TimeSeries{
  146. Name: formatLegend(v.Metric, query),
  147. Tags: map[string]string{},
  148. }
  149. for k, v := range v.Metric {
  150. series.Tags[string(k)] = string(v)
  151. }
  152. for _, k := range v.Values {
  153. series.Points = append(series.Points, tsdb.NewTimePoint(null.FloatFrom(float64(k.Value)), float64(k.Timestamp.Unix()*1000)))
  154. }
  155. queryRes.Series = append(queryRes.Series, &series)
  156. }
  157. queryResults["A"] = queryRes
  158. return queryResults, nil
  159. }