query.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. package conditions
  2. import (
  3. "fmt"
  4. "strings"
  5. "time"
  6. "github.com/grafana/grafana/pkg/bus"
  7. "github.com/grafana/grafana/pkg/components/simplejson"
  8. m "github.com/grafana/grafana/pkg/models"
  9. "github.com/grafana/grafana/pkg/services/alerting"
  10. "github.com/grafana/grafana/pkg/tsdb"
  11. )
  12. func init() {
  13. alerting.RegisterCondition("query", func(model *simplejson.Json, index int) (alerting.Condition, error) {
  14. return NewQueryCondition(model, index)
  15. })
  16. }
  17. type QueryCondition struct {
  18. Index int
  19. Query AlertQuery
  20. Reducer QueryReducer
  21. Evaluator AlertEvaluator
  22. HandleRequest tsdb.HandleRequestFunc
  23. }
  24. type AlertQuery struct {
  25. Model *simplejson.Json
  26. DatasourceId int64
  27. From string
  28. To string
  29. }
  30. func (c *QueryCondition) Eval(context *alerting.EvalContext) {
  31. timeRange := tsdb.NewTimeRange(c.Query.From, c.Query.To)
  32. seriesList, err := c.executeQuery(context, timeRange)
  33. if err != nil {
  34. context.Error = err
  35. return
  36. }
  37. emptySerieCount := 0
  38. evalMatchCount := 0
  39. for _, series := range seriesList {
  40. reducedValue := c.Reducer.Reduce(series)
  41. evalMatch := c.Evaluator.Eval(reducedValue)
  42. if reducedValue.Valid == false {
  43. emptySerieCount++
  44. continue
  45. }
  46. if context.IsTestRun {
  47. context.Logs = append(context.Logs, &alerting.ResultLogEntry{
  48. Message: fmt.Sprintf("Condition[%d]: Eval: %v, Metric: %s, Value: %1.3f", c.Index, evalMatch, series.Name, reducedValue.Float64),
  49. })
  50. }
  51. if evalMatch {
  52. evalMatchCount++
  53. context.EvalMatches = append(context.EvalMatches, &alerting.EvalMatch{
  54. Metric: series.Name,
  55. Value: reducedValue.Float64,
  56. })
  57. }
  58. }
  59. context.NoDataFound = emptySerieCount == len(seriesList)
  60. context.Firing = evalMatchCount > 0
  61. }
  62. func (c *QueryCondition) executeQuery(context *alerting.EvalContext, timeRange *tsdb.TimeRange) (tsdb.TimeSeriesSlice, error) {
  63. getDsInfo := &m.GetDataSourceByIdQuery{
  64. Id: c.Query.DatasourceId,
  65. OrgId: context.Rule.OrgId,
  66. }
  67. if err := bus.Dispatch(getDsInfo); err != nil {
  68. return nil, fmt.Errorf("Could not find datasource")
  69. }
  70. req := c.getRequestForAlertRule(getDsInfo.Result, timeRange)
  71. result := make(tsdb.TimeSeriesSlice, 0)
  72. resp, err := c.HandleRequest(context.Ctx, req)
  73. if err != nil {
  74. return nil, fmt.Errorf("tsdb.HandleRequest() error %v", err)
  75. }
  76. for _, v := range resp.Results {
  77. if v.Error != nil {
  78. return nil, fmt.Errorf("tsdb.HandleRequest() response error %v", v)
  79. }
  80. result = append(result, v.Series...)
  81. if context.IsTestRun {
  82. context.Logs = append(context.Logs, &alerting.ResultLogEntry{
  83. Message: fmt.Sprintf("Condition[%d]: Query Result", c.Index),
  84. Data: v.Series,
  85. })
  86. }
  87. }
  88. return result, nil
  89. }
  90. func (c *QueryCondition) getRequestForAlertRule(datasource *m.DataSource, timeRange *tsdb.TimeRange) *tsdb.Request {
  91. req := &tsdb.Request{
  92. TimeRange: timeRange,
  93. Queries: []*tsdb.Query{
  94. {
  95. RefId: "A",
  96. Model: c.Query.Model,
  97. DataSource: &tsdb.DataSourceInfo{
  98. Id: datasource.Id,
  99. Name: datasource.Name,
  100. PluginId: datasource.Type,
  101. Url: datasource.Url,
  102. User: datasource.User,
  103. Password: datasource.Password,
  104. Database: datasource.Database,
  105. BasicAuth: datasource.BasicAuth,
  106. BasicAuthUser: datasource.BasicAuthUser,
  107. BasicAuthPassword: datasource.BasicAuthPassword,
  108. JsonData: datasource.JsonData,
  109. },
  110. },
  111. },
  112. }
  113. return req
  114. }
  115. func NewQueryCondition(model *simplejson.Json, index int) (*QueryCondition, error) {
  116. condition := QueryCondition{}
  117. condition.Index = index
  118. condition.HandleRequest = tsdb.HandleRequest
  119. queryJson := model.Get("query")
  120. condition.Query.Model = queryJson.Get("model")
  121. condition.Query.From = queryJson.Get("params").MustArray()[1].(string)
  122. condition.Query.To = queryJson.Get("params").MustArray()[2].(string)
  123. if err := validateFromValue(condition.Query.From); err != nil {
  124. return nil, err
  125. }
  126. if err := validateToValue(condition.Query.To); err != nil {
  127. return nil, err
  128. }
  129. condition.Query.DatasourceId = queryJson.Get("datasourceId").MustInt64()
  130. reducerJson := model.Get("reducer")
  131. condition.Reducer = NewSimpleReducer(reducerJson.Get("type").MustString())
  132. evaluatorJson := model.Get("evaluator")
  133. evaluator, err := NewAlertEvaluator(evaluatorJson)
  134. if err != nil {
  135. return nil, err
  136. }
  137. condition.Evaluator = evaluator
  138. return &condition, nil
  139. }
  140. func validateFromValue(from string) error {
  141. fromRaw := strings.Replace(from, "now-", "", 1)
  142. _, err := time.ParseDuration("-" + fromRaw)
  143. return err
  144. }
  145. func validateToValue(to string) error {
  146. if to == "now" {
  147. return nil
  148. } else if strings.HasPrefix(to, "now-") {
  149. withoutNow := strings.Replace(to, "now-", "", 1)
  150. _, err := time.ParseDuration("-" + withoutNow)
  151. if err == nil {
  152. return nil
  153. }
  154. }
  155. _, err := time.ParseDuration(to)
  156. return err
  157. }