query.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. package conditions
  2. import (
  3. "fmt"
  4. "strings"
  5. "time"
  6. "github.com/grafana/grafana/pkg/bus"
  7. "github.com/grafana/grafana/pkg/components/simplejson"
  8. m "github.com/grafana/grafana/pkg/models"
  9. "github.com/grafana/grafana/pkg/services/alerting"
  10. "github.com/grafana/grafana/pkg/tsdb"
  11. )
  12. func init() {
  13. alerting.RegisterCondition("query", func(model *simplejson.Json, index int) (alerting.Condition, error) {
  14. return NewQueryCondition(model, index)
  15. })
  16. }
  17. type QueryCondition struct {
  18. Index int
  19. Query AlertQuery
  20. Reducer QueryReducer
  21. Evaluator AlertEvaluator
  22. Operator string
  23. HandleRequest tsdb.HandleRequestFunc
  24. }
  25. type AlertQuery struct {
  26. Model *simplejson.Json
  27. DatasourceId int64
  28. From string
  29. To string
  30. }
  31. func (c *QueryCondition) GetDatsourceId() (datasourceId *int64, exist bool) {
  32. return &c.Query.DatasourceId, true
  33. }
  34. func (c *QueryCondition) Eval(context *alerting.EvalContext) (*alerting.ConditionResult, error) {
  35. timeRange := tsdb.NewTimeRange(c.Query.From, c.Query.To)
  36. seriesList, err := c.executeQuery(context, timeRange)
  37. if err != nil {
  38. return nil, err
  39. }
  40. emptySerieCount := 0
  41. evalMatchCount := 0
  42. var matches []*alerting.EvalMatch
  43. for _, series := range seriesList {
  44. reducedValue := c.Reducer.Reduce(series)
  45. evalMatch := c.Evaluator.Eval(reducedValue)
  46. if reducedValue.Valid == false {
  47. emptySerieCount++
  48. continue
  49. }
  50. if context.IsTestRun {
  51. context.Logs = append(context.Logs, &alerting.ResultLogEntry{
  52. Message: fmt.Sprintf("Condition[%d]: Eval: %v, Metric: %s, Value: %1.3f", c.Index, evalMatch, series.Name, reducedValue.Float64),
  53. })
  54. }
  55. if evalMatch {
  56. evalMatchCount++
  57. matches = append(matches, &alerting.EvalMatch{
  58. Metric: series.Name,
  59. Value: reducedValue.Float64,
  60. })
  61. }
  62. }
  63. return &alerting.ConditionResult{
  64. Firing: evalMatchCount > 0,
  65. NoDataFound: emptySerieCount == len(seriesList),
  66. Operator: c.Operator,
  67. EvalMatches: matches,
  68. }, nil
  69. }
  70. func (c *QueryCondition) executeQuery(context *alerting.EvalContext, timeRange *tsdb.TimeRange) (tsdb.TimeSeriesSlice, error) {
  71. getDsInfo := &m.GetDataSourceByIdQuery{
  72. Id: c.Query.DatasourceId,
  73. OrgId: context.Rule.OrgId,
  74. }
  75. if err := bus.Dispatch(getDsInfo); err != nil {
  76. return nil, fmt.Errorf("Could not find datasource")
  77. }
  78. req := c.getRequestForAlertRule(getDsInfo.Result, timeRange)
  79. result := make(tsdb.TimeSeriesSlice, 0)
  80. resp, err := c.HandleRequest(context.Ctx, req)
  81. if err != nil {
  82. return nil, fmt.Errorf("tsdb.HandleRequest() error %v", err)
  83. }
  84. for _, v := range resp.Results {
  85. if v.Error != nil {
  86. return nil, fmt.Errorf("tsdb.HandleRequest() response error %v", v)
  87. }
  88. result = append(result, v.Series...)
  89. if context.IsTestRun {
  90. context.Logs = append(context.Logs, &alerting.ResultLogEntry{
  91. Message: fmt.Sprintf("Condition[%d]: Query Result", c.Index),
  92. Data: v.Series,
  93. })
  94. }
  95. }
  96. return result, nil
  97. }
  98. func (c *QueryCondition) getRequestForAlertRule(datasource *m.DataSource, timeRange *tsdb.TimeRange) *tsdb.Request {
  99. req := &tsdb.Request{
  100. TimeRange: timeRange,
  101. Queries: []*tsdb.Query{
  102. {
  103. RefId: "A",
  104. Model: c.Query.Model,
  105. DataSource: datasource,
  106. },
  107. },
  108. }
  109. return req
  110. }
  111. func NewQueryCondition(model *simplejson.Json, index int) (*QueryCondition, error) {
  112. condition := QueryCondition{}
  113. condition.Index = index
  114. condition.HandleRequest = tsdb.HandleRequest
  115. queryJson := model.Get("query")
  116. condition.Query.Model = queryJson.Get("model")
  117. condition.Query.From = queryJson.Get("params").MustArray()[1].(string)
  118. condition.Query.To = queryJson.Get("params").MustArray()[2].(string)
  119. if err := validateFromValue(condition.Query.From); err != nil {
  120. return nil, err
  121. }
  122. if err := validateToValue(condition.Query.To); err != nil {
  123. return nil, err
  124. }
  125. condition.Query.DatasourceId = queryJson.Get("datasourceId").MustInt64()
  126. reducerJson := model.Get("reducer")
  127. condition.Reducer = NewSimpleReducer(reducerJson.Get("type").MustString())
  128. evaluatorJson := model.Get("evaluator")
  129. evaluator, err := NewAlertEvaluator(evaluatorJson)
  130. if err != nil {
  131. return nil, err
  132. }
  133. condition.Evaluator = evaluator
  134. operatorJson := model.Get("operator")
  135. operator := operatorJson.Get("type").MustString("and")
  136. condition.Operator = operator
  137. return &condition, nil
  138. }
  139. func validateFromValue(from string) error {
  140. fromRaw := strings.Replace(from, "now-", "", 1)
  141. _, err := time.ParseDuration("-" + fromRaw)
  142. return err
  143. }
  144. func validateToValue(to string) error {
  145. if to == "now" {
  146. return nil
  147. } else if strings.HasPrefix(to, "now-") {
  148. withoutNow := strings.Replace(to, "now-", "", 1)
  149. _, err := time.ParseDuration("-" + withoutNow)
  150. if err == nil {
  151. return nil
  152. }
  153. }
  154. _, err := time.ParseDuration(to)
  155. return err
  156. }