executor.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. package alerting
  2. import (
  3. "fmt"
  4. "strconv"
  5. "math"
  6. "github.com/grafana/grafana/pkg/bus"
  7. "github.com/grafana/grafana/pkg/log"
  8. m "github.com/grafana/grafana/pkg/models"
  9. "github.com/grafana/grafana/pkg/services/alerting/alertstates"
  10. "github.com/grafana/grafana/pkg/tsdb"
  11. )
  12. var (
  13. resultLogFmt = "Alerting: executor %s %1.2f %s %1.2f : %v"
  14. descriptionFmt = "Actual value: %1.2f for %s"
  15. )
  16. type ExecutorImpl struct {
  17. }
  18. type compareFn func(float64, float64) bool
  19. type aggregationFn func(*tsdb.TimeSeries) float64
  20. var operators = map[string]compareFn{
  21. ">": func(num1, num2 float64) bool { return num1 > num2 },
  22. ">=": func(num1, num2 float64) bool { return num1 >= num2 },
  23. "<": func(num1, num2 float64) bool { return num1 < num2 },
  24. "<=": func(num1, num2 float64) bool { return num1 <= num2 },
  25. "": func(num1, num2 float64) bool { return false },
  26. }
  27. var aggregator = map[string]aggregationFn{
  28. "avg": func(series *tsdb.TimeSeries) float64 {
  29. sum := float64(0)
  30. for _, v := range series.Points {
  31. sum += v[0]
  32. }
  33. return sum / float64(len(series.Points))
  34. },
  35. "sum": func(series *tsdb.TimeSeries) float64 {
  36. sum := float64(0)
  37. for _, v := range series.Points {
  38. sum += v[0]
  39. }
  40. return sum
  41. },
  42. "min": func(series *tsdb.TimeSeries) float64 {
  43. min := series.Points[0][0]
  44. for _, v := range series.Points {
  45. if v[0] < min {
  46. min = v[0]
  47. }
  48. }
  49. return min
  50. },
  51. "max": func(series *tsdb.TimeSeries) float64 {
  52. max := series.Points[0][0]
  53. for _, v := range series.Points {
  54. if v[0] > max {
  55. max = v[0]
  56. }
  57. }
  58. return max
  59. },
  60. "mean": func(series *tsdb.TimeSeries) float64 {
  61. midPosition := int64(math.Floor(float64(len(series.Points)) / float64(2)))
  62. return series.Points[midPosition][0]
  63. },
  64. }
  65. func (e *ExecutorImpl) Execute(job *AlertJob, resultQueue chan *AlertResult) {
  66. timeSeries, err := e.executeQuery(job)
  67. if err != nil {
  68. resultQueue <- &AlertResult{
  69. Error: err,
  70. State: alertstates.Pending,
  71. AlertJob: job,
  72. }
  73. }
  74. result := e.evaluateRule(job.Rule, timeSeries)
  75. result.AlertJob = job
  76. resultQueue <- result
  77. }
  78. func (e *ExecutorImpl) executeQuery(job *AlertJob) (tsdb.TimeSeriesSlice, error) {
  79. getDsInfo := &m.GetDataSourceByIdQuery{
  80. Id: job.Rule.DatasourceId,
  81. OrgId: job.Rule.OrgId,
  82. }
  83. if err := bus.Dispatch(getDsInfo); err != nil {
  84. return nil, fmt.Errorf("Could not find datasource for %d", job.Rule.DatasourceId)
  85. }
  86. req := e.GetRequestForAlertRule(job.Rule, getDsInfo.Result)
  87. result := make(tsdb.TimeSeriesSlice, 0)
  88. resp, err := tsdb.HandleRequest(req)
  89. if err != nil {
  90. return nil, fmt.Errorf("Alerting: GetSeries() tsdb.HandleRequest() error %v", err)
  91. }
  92. for _, v := range resp.Results {
  93. if v.Error != nil {
  94. return nil, fmt.Errorf("Alerting: GetSeries() tsdb.HandleRequest() response error %v", v)
  95. }
  96. result = append(result, v.Series...)
  97. }
  98. return result, nil
  99. }
  100. func (e *ExecutorImpl) GetRequestForAlertRule(rule *AlertRule, datasource *m.DataSource) *tsdb.Request {
  101. req := &tsdb.Request{
  102. TimeRange: tsdb.TimeRange{
  103. From: "-" + strconv.Itoa(rule.QueryRange) + "s",
  104. To: "now",
  105. },
  106. Queries: tsdb.QuerySlice{
  107. &tsdb.Query{
  108. RefId: rule.QueryRefId,
  109. Query: rule.Query,
  110. DataSource: &tsdb.DataSourceInfo{
  111. Id: datasource.Id,
  112. Name: datasource.Name,
  113. PluginId: datasource.Type,
  114. Url: datasource.Url,
  115. },
  116. },
  117. },
  118. }
  119. return req
  120. }
  121. func (e *ExecutorImpl) evaluateRule(rule *AlertRule, series tsdb.TimeSeriesSlice) *AlertResult {
  122. log.Trace("Alerting: executor.evaluateRule: %v, query result: series: %v", rule.Name, len(series))
  123. for _, serie := range series {
  124. log.Info("Alerting: executor.validate: %v", serie.Name)
  125. if aggregator[rule.Aggregator] == nil {
  126. continue
  127. }
  128. var aggValue = aggregator[rule.Aggregator](serie)
  129. var critOperartor = operators[rule.CritOperator]
  130. var critResult = critOperartor(aggValue, rule.CritLevel)
  131. log.Trace(resultLogFmt, "Crit", serie.Name, aggValue, rule.CritOperator, rule.CritLevel, critResult)
  132. if critResult {
  133. return &AlertResult{
  134. State: alertstates.Critical,
  135. ActualValue: aggValue,
  136. Description: fmt.Sprintf(descriptionFmt, aggValue, serie.Name),
  137. }
  138. }
  139. var warnOperartor = operators[rule.CritOperator]
  140. var warnResult = warnOperartor(aggValue, rule.CritLevel)
  141. log.Trace(resultLogFmt, "Warn", serie.Name, aggValue, rule.WarnOperator, rule.WarnLevel, warnResult)
  142. if warnResult {
  143. return &AlertResult{
  144. State: alertstates.Warn,
  145. Description: fmt.Sprintf(descriptionFmt, aggValue, serie.Name),
  146. ActualValue: aggValue,
  147. }
  148. }
  149. }
  150. return &AlertResult{State: alertstates.Ok, Description: "Alert is OK!"}
  151. }