executor.go 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
  1. package alerting
  2. import (
  3. "fmt"
  4. m "github.com/grafana/grafana/pkg/models"
  5. "github.com/grafana/grafana/pkg/services/alerting/graphite"
  6. )
  7. type Executor interface {
  8. Execute(rule *m.AlertJob, responseQueue chan *m.AlertResult)
  9. }
  10. type ExecutorImpl struct{}
  11. type compareFn func(float64, float64) bool
  12. type aggregationFn func(*m.TimeSeries) float64
  13. var operators map[string]compareFn = map[string]compareFn{
  14. ">": func(num1, num2 float64) bool { return num1 > num2 },
  15. ">=": func(num1, num2 float64) bool { return num1 >= num2 },
  16. "<": func(num1, num2 float64) bool { return num1 < num2 },
  17. "<=": func(num1, num2 float64) bool { return num1 <= num2 },
  18. "": func(num1, num2 float64) bool { return false },
  19. }
  20. var aggregator map[string]aggregationFn = map[string]aggregationFn{
  21. "avg": func(series *m.TimeSeries) float64 { return series.Avg },
  22. "sum": func(series *m.TimeSeries) float64 { return series.Sum },
  23. "min": func(series *m.TimeSeries) float64 { return series.Min },
  24. "max": func(series *m.TimeSeries) float64 { return series.Max },
  25. "mean": func(series *m.TimeSeries) float64 { return series.Mean },
  26. }
  27. func (this *ExecutorImpl) GetSeries(job *m.AlertJob) (m.TimeSeriesSlice, error) {
  28. if job.Datasource.Type == m.DS_GRAPHITE {
  29. return graphite.GraphiteClient{}.GetSeries(job)
  30. }
  31. return nil, fmt.Errorf("Grafana does not support alerts for %s", job.Datasource.Type)
  32. }
  33. func (this *ExecutorImpl) Execute(job *m.AlertJob, responseQueue chan *m.AlertResult) {
  34. response, err := this.GetSeries(job)
  35. if err != nil {
  36. responseQueue <- &m.AlertResult{State: "PENDING", Id: job.Rule.Id, Rule: job.Rule}
  37. }
  38. responseQueue <- this.ValidateRule(job.Rule, response)
  39. }
  40. func (this *ExecutorImpl) ValidateRule(rule m.AlertRule, series m.TimeSeriesSlice) *m.AlertResult {
  41. for _, serie := range series {
  42. if aggregator[rule.Aggregator] == nil {
  43. continue
  44. }
  45. var aggValue = aggregator[rule.Aggregator](serie)
  46. if operators[rule.CritOperator](aggValue, rule.CritLevel) {
  47. return &m.AlertResult{State: m.AlertStateCritical, Id: rule.Id, ActualValue: aggValue, Rule: rule}
  48. }
  49. if operators[rule.WarnOperator](aggValue, rule.WarnLevel) {
  50. return &m.AlertResult{State: m.AlertStateWarn, Id: rule.Id, ActualValue: aggValue, Rule: rule}
  51. }
  52. }
  53. return &m.AlertResult{State: m.AlertStateOk, Id: rule.Id, Rule: rule}
  54. }