query.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
  1. package conditions
  2. import (
  3. "fmt"
  4. "strings"
  5. "time"
  6. gocontext "context"
  7. "github.com/grafana/grafana/pkg/bus"
  8. "github.com/grafana/grafana/pkg/components/null"
  9. "github.com/grafana/grafana/pkg/components/simplejson"
  10. "github.com/grafana/grafana/pkg/models"
  11. "github.com/grafana/grafana/pkg/services/alerting"
  12. "github.com/grafana/grafana/pkg/tsdb"
  13. )
  14. func init() {
  15. alerting.RegisterCondition("query", func(model *simplejson.Json, index int) (alerting.Condition, error) {
  16. return newQueryCondition(model, index)
  17. })
  18. }
  19. // QueryCondition is responsible for issue and query, reduce the
  20. // timeseries into single values and evaluate if they are firing or not.
  21. type QueryCondition struct {
  22. Index int
  23. Query AlertQuery
  24. Reducer *queryReducer
  25. Evaluator AlertEvaluator
  26. Operator string
  27. HandleRequest tsdb.HandleRequestFunc
  28. }
  29. // AlertQuery contains information about what datasource a query
  30. // should be sent to and the query object.
  31. type AlertQuery struct {
  32. Model *simplejson.Json
  33. DatasourceID int64
  34. From string
  35. To string
  36. }
  37. // Eval evaluates the `QueryCondition`.
  38. func (c *QueryCondition) Eval(context *alerting.EvalContext) (*alerting.ConditionResult, error) {
  39. timeRange := tsdb.NewTimeRange(c.Query.From, c.Query.To)
  40. seriesList, err := c.executeQuery(context, timeRange)
  41. if err != nil {
  42. return nil, err
  43. }
  44. emptySerieCount := 0
  45. evalMatchCount := 0
  46. var matches []*alerting.EvalMatch
  47. for _, series := range seriesList {
  48. reducedValue := c.Reducer.Reduce(series)
  49. evalMatch := c.Evaluator.Eval(reducedValue)
  50. if !reducedValue.Valid {
  51. emptySerieCount++
  52. }
  53. if context.IsTestRun {
  54. context.Logs = append(context.Logs, &alerting.ResultLogEntry{
  55. Message: fmt.Sprintf("Condition[%d]: Eval: %v, Metric: %s, Value: %s", c.Index, evalMatch, series.Name, reducedValue),
  56. })
  57. }
  58. if evalMatch {
  59. evalMatchCount++
  60. matches = append(matches, &alerting.EvalMatch{
  61. Metric: series.Name,
  62. Value: reducedValue,
  63. Tags: series.Tags,
  64. })
  65. }
  66. }
  67. // handle no series special case
  68. if len(seriesList) == 0 {
  69. // eval condition for null value
  70. evalMatch := c.Evaluator.Eval(null.FloatFromPtr(nil))
  71. if context.IsTestRun {
  72. context.Logs = append(context.Logs, &alerting.ResultLogEntry{
  73. Message: fmt.Sprintf("Condition: Eval: %v, Query Returned No Series (reduced to null/no value)", evalMatch),
  74. })
  75. }
  76. if evalMatch {
  77. evalMatchCount++
  78. matches = append(matches, &alerting.EvalMatch{Metric: "NoData", Value: null.FloatFromPtr(nil)})
  79. }
  80. }
  81. return &alerting.ConditionResult{
  82. Firing: evalMatchCount > 0,
  83. NoDataFound: emptySerieCount == len(seriesList),
  84. Operator: c.Operator,
  85. EvalMatches: matches,
  86. }, nil
  87. }
  88. func (c *QueryCondition) executeQuery(context *alerting.EvalContext, timeRange *tsdb.TimeRange) (tsdb.TimeSeriesSlice, error) {
  89. getDsInfo := &models.GetDataSourceByIdQuery{
  90. Id: c.Query.DatasourceID,
  91. OrgId: context.Rule.OrgID,
  92. }
  93. if err := bus.Dispatch(getDsInfo); err != nil {
  94. return nil, fmt.Errorf("Could not find datasource %v", err)
  95. }
  96. req := c.getRequestForAlertRule(getDsInfo.Result, timeRange, context.IsDebug)
  97. result := make(tsdb.TimeSeriesSlice, 0)
  98. if context.IsDebug {
  99. data := simplejson.New()
  100. if req.TimeRange != nil {
  101. data.Set("from", req.TimeRange.GetFromAsMsEpoch())
  102. data.Set("to", req.TimeRange.GetToAsMsEpoch())
  103. }
  104. type queryDto struct {
  105. RefId string `json:"refId"`
  106. Model *simplejson.Json `json:"model"`
  107. Datasource *simplejson.Json `json:"datasource"`
  108. MaxDataPoints int64 `json:"maxDataPoints"`
  109. IntervalMs int64 `json:"intervalMs"`
  110. }
  111. queries := []*queryDto{}
  112. for _, q := range req.Queries {
  113. queries = append(queries, &queryDto{
  114. RefId: q.RefId,
  115. Model: q.Model,
  116. Datasource: simplejson.NewFromAny(map[string]interface{}{
  117. "id": q.DataSource.Id,
  118. "name": q.DataSource.Name,
  119. }),
  120. MaxDataPoints: q.MaxDataPoints,
  121. IntervalMs: q.IntervalMs,
  122. })
  123. }
  124. data.Set("queries", queries)
  125. context.Logs = append(context.Logs, &alerting.ResultLogEntry{
  126. Message: fmt.Sprintf("Condition[%d]: Query", c.Index),
  127. Data: data,
  128. })
  129. }
  130. resp, err := c.HandleRequest(context.Ctx, getDsInfo.Result, req)
  131. if err != nil {
  132. if err == gocontext.DeadlineExceeded {
  133. return nil, fmt.Errorf("Alert execution exceeded the timeout")
  134. }
  135. return nil, fmt.Errorf("tsdb.HandleRequest() error %v", err)
  136. }
  137. for _, v := range resp.Results {
  138. if v.Error != nil {
  139. return nil, fmt.Errorf("tsdb.HandleRequest() response error %v", v)
  140. }
  141. result = append(result, v.Series...)
  142. queryResultData := map[string]interface{}{}
  143. if context.IsTestRun {
  144. queryResultData["series"] = v.Series
  145. }
  146. if context.IsDebug && v.Meta != nil {
  147. queryResultData["meta"] = v.Meta
  148. }
  149. if context.IsTestRun || context.IsDebug {
  150. context.Logs = append(context.Logs, &alerting.ResultLogEntry{
  151. Message: fmt.Sprintf("Condition[%d]: Query Result", c.Index),
  152. Data: simplejson.NewFromAny(queryResultData),
  153. })
  154. }
  155. }
  156. return result, nil
  157. }
  158. func (c *QueryCondition) getRequestForAlertRule(datasource *models.DataSource, timeRange *tsdb.TimeRange, debug bool) *tsdb.TsdbQuery {
  159. req := &tsdb.TsdbQuery{
  160. TimeRange: timeRange,
  161. Queries: []*tsdb.Query{
  162. {
  163. RefId: "A",
  164. Model: c.Query.Model,
  165. DataSource: datasource,
  166. },
  167. },
  168. Debug: debug,
  169. }
  170. return req
  171. }
  172. func newQueryCondition(model *simplejson.Json, index int) (*QueryCondition, error) {
  173. condition := QueryCondition{}
  174. condition.Index = index
  175. condition.HandleRequest = tsdb.HandleRequest
  176. queryJSON := model.Get("query")
  177. condition.Query.Model = queryJSON.Get("model")
  178. condition.Query.From = queryJSON.Get("params").MustArray()[1].(string)
  179. condition.Query.To = queryJSON.Get("params").MustArray()[2].(string)
  180. if err := validateFromValue(condition.Query.From); err != nil {
  181. return nil, err
  182. }
  183. if err := validateToValue(condition.Query.To); err != nil {
  184. return nil, err
  185. }
  186. condition.Query.DatasourceID = queryJSON.Get("datasourceId").MustInt64()
  187. reducerJSON := model.Get("reducer")
  188. condition.Reducer = newSimpleReducer(reducerJSON.Get("type").MustString())
  189. evaluatorJSON := model.Get("evaluator")
  190. evaluator, err := NewAlertEvaluator(evaluatorJSON)
  191. if err != nil {
  192. return nil, fmt.Errorf("error in condition %v: %v", index, err)
  193. }
  194. condition.Evaluator = evaluator
  195. operatorJSON := model.Get("operator")
  196. operator := operatorJSON.Get("type").MustString("and")
  197. condition.Operator = operator
  198. return &condition, nil
  199. }
  200. func validateFromValue(from string) error {
  201. fromRaw := strings.Replace(from, "now-", "", 1)
  202. _, err := time.ParseDuration("-" + fromRaw)
  203. return err
  204. }
  205. func validateToValue(to string) error {
  206. if to == "now" {
  207. return nil
  208. } else if strings.HasPrefix(to, "now-") {
  209. withoutNow := strings.Replace(to, "now-", "", 1)
  210. _, err := time.ParseDuration("-" + withoutNow)
  211. if err == nil {
  212. return nil
  213. }
  214. }
  215. _, err := time.ParseDuration(to)
  216. return err
  217. }