executor.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. package alerting
  2. import (
  3. "fmt"
  4. "math"
  5. "github.com/grafana/grafana/pkg/bus"
  6. "github.com/grafana/grafana/pkg/log"
  7. m "github.com/grafana/grafana/pkg/models"
  8. "github.com/grafana/grafana/pkg/services/alerting/alertstates"
  9. "github.com/grafana/grafana/pkg/tsdb"
  10. )
  11. var (
  12. descriptionFmt = "Actual value: %1.2f for %s"
  13. )
  14. type ExecutorImpl struct {
  15. log log.Logger
  16. }
  17. func NewExecutor() *ExecutorImpl {
  18. return &ExecutorImpl{
  19. log: log.New("alerting.executor"),
  20. }
  21. }
  22. type compareFn func(float64, float64) bool
  23. type aggregationFn func(*tsdb.TimeSeries) float64
  24. var operators = map[string]compareFn{
  25. ">": func(num1, num2 float64) bool { return num1 > num2 },
  26. ">=": func(num1, num2 float64) bool { return num1 >= num2 },
  27. "<": func(num1, num2 float64) bool { return num1 < num2 },
  28. "<=": func(num1, num2 float64) bool { return num1 <= num2 },
  29. "": func(num1, num2 float64) bool { return false },
  30. }
  31. var aggregator = map[string]aggregationFn{
  32. "avg": func(series *tsdb.TimeSeries) float64 {
  33. sum := float64(0)
  34. for _, v := range series.Points {
  35. sum += v[0]
  36. }
  37. return sum / float64(len(series.Points))
  38. },
  39. "sum": func(series *tsdb.TimeSeries) float64 {
  40. sum := float64(0)
  41. for _, v := range series.Points {
  42. sum += v[0]
  43. }
  44. return sum
  45. },
  46. "min": func(series *tsdb.TimeSeries) float64 {
  47. min := series.Points[0][0]
  48. for _, v := range series.Points {
  49. if v[0] < min {
  50. min = v[0]
  51. }
  52. }
  53. return min
  54. },
  55. "max": func(series *tsdb.TimeSeries) float64 {
  56. max := series.Points[0][0]
  57. for _, v := range series.Points {
  58. if v[0] > max {
  59. max = v[0]
  60. }
  61. }
  62. return max
  63. },
  64. "mean": func(series *tsdb.TimeSeries) float64 {
  65. midPosition := int64(math.Floor(float64(len(series.Points)) / float64(2)))
  66. return series.Points[midPosition][0]
  67. },
  68. }
  69. func (e *ExecutorImpl) Execute(job *AlertJob, resultQueue chan *AlertResult) {
  70. timeSeries, err := e.executeQuery(job)
  71. if err != nil {
  72. resultQueue <- &AlertResult{
  73. Error: err,
  74. State: alertstates.Pending,
  75. AlertJob: job,
  76. }
  77. }
  78. result := e.evaluateRule(job.Rule, timeSeries)
  79. result.AlertJob = job
  80. resultQueue <- result
  81. }
  82. func (e *ExecutorImpl) executeQuery(job *AlertJob) (tsdb.TimeSeriesSlice, error) {
  83. getDsInfo := &m.GetDataSourceByIdQuery{
  84. Id: job.Rule.Query.DatasourceId,
  85. OrgId: job.Rule.OrgId,
  86. }
  87. if err := bus.Dispatch(getDsInfo); err != nil {
  88. return nil, fmt.Errorf("Could not find datasource")
  89. }
  90. req := e.GetRequestForAlertRule(job.Rule, getDsInfo.Result)
  91. result := make(tsdb.TimeSeriesSlice, 0)
  92. resp, err := tsdb.HandleRequest(req)
  93. if err != nil {
  94. return nil, fmt.Errorf("Alerting: GetSeries() tsdb.HandleRequest() error %v", err)
  95. }
  96. for _, v := range resp.Results {
  97. if v.Error != nil {
  98. return nil, fmt.Errorf("Alerting: GetSeries() tsdb.HandleRequest() response error %v", v)
  99. }
  100. result = append(result, v.Series...)
  101. }
  102. return result, nil
  103. }
  104. func (e *ExecutorImpl) GetRequestForAlertRule(rule *AlertRule, datasource *m.DataSource) *tsdb.Request {
  105. log.Debug2("GetRequest", "query", rule.Query.Query, "from", rule.Query.From, "datasourceId", datasource.Id)
  106. req := &tsdb.Request{
  107. TimeRange: tsdb.TimeRange{
  108. From: "-" + rule.Query.From,
  109. To: rule.Query.To,
  110. },
  111. Queries: []*tsdb.Query{
  112. {
  113. RefId: "A",
  114. Query: rule.Query.Query,
  115. DataSource: &tsdb.DataSourceInfo{
  116. Id: datasource.Id,
  117. Name: datasource.Name,
  118. PluginId: datasource.Type,
  119. Url: datasource.Url,
  120. },
  121. },
  122. },
  123. }
  124. return req
  125. }
  126. func (e *ExecutorImpl) evaluateRule(rule *AlertRule, series tsdb.TimeSeriesSlice) *AlertResult {
  127. e.log.Debug("Evaluating Alerting Rule", "seriesCount", len(series), "ruleName", rule.Name)
  128. for _, serie := range series {
  129. e.log.Debug("Evaluating series", "series", serie.Name)
  130. if aggregator["avg"] == nil {
  131. continue
  132. }
  133. var aggValue = aggregator["avg"](serie)
  134. var critOperartor = operators[rule.Critical.Operator]
  135. var critResult = critOperartor(aggValue, rule.Critical.Level)
  136. e.log.Debug("Alert execution Crit", "name", serie.Name, "aggValue", aggValue, "operator", rule.Critical.Operator, "level", rule.Critical.Level, "result", critResult)
  137. if critResult {
  138. return &AlertResult{
  139. State: alertstates.Critical,
  140. ActualValue: aggValue,
  141. Description: fmt.Sprintf(descriptionFmt, aggValue, serie.Name),
  142. }
  143. }
  144. var warnOperartor = operators[rule.Warning.Operator]
  145. var warnResult = warnOperartor(aggValue, rule.Warning.Level)
  146. e.log.Debug("Alert execution Warn", "name", serie.Name, "aggValue", aggValue, "operator", rule.Warning.Operator, "level", rule.Warning.Level, "result", warnResult)
  147. if warnResult {
  148. return &AlertResult{
  149. State: alertstates.Warn,
  150. Description: fmt.Sprintf(descriptionFmt, aggValue, serie.Name),
  151. ActualValue: aggValue,
  152. }
  153. }
  154. }
  155. return &AlertResult{State: alertstates.Ok, Description: "Alert is OK!"}
  156. }