executor.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. package alerting
  2. import (
  3. "fmt"
  4. m "github.com/grafana/grafana/pkg/models"
  5. "github.com/grafana/grafana/pkg/services/alerting/graphite"
  6. "math"
  7. )
  8. type Executor interface {
  9. Execute(rule *m.AlertJob, responseQueue chan *m.AlertResult)
  10. }
  11. type ExecutorImpl struct{}
  12. type compareFn func(float64, float64) bool
  13. type aggregationFn func(*m.TimeSeries) float64
  14. var operators map[string]compareFn = map[string]compareFn{
  15. ">": func(num1, num2 float64) bool { return num1 > num2 },
  16. ">=": func(num1, num2 float64) bool { return num1 >= num2 },
  17. "<": func(num1, num2 float64) bool { return num1 < num2 },
  18. "<=": func(num1, num2 float64) bool { return num1 <= num2 },
  19. "": func(num1, num2 float64) bool { return false },
  20. }
  21. var aggregator map[string]aggregationFn = map[string]aggregationFn{
  22. "avg": func(series *m.TimeSeries) float64 {
  23. sum := float64(0)
  24. for _, v := range series.Points {
  25. sum += v[0]
  26. }
  27. return sum / float64(len(series.Points))
  28. },
  29. "sum": func(series *m.TimeSeries) float64 {
  30. sum := float64(0)
  31. for _, v := range series.Points {
  32. sum += v[0]
  33. }
  34. return sum
  35. },
  36. "min": func(series *m.TimeSeries) float64 {
  37. min := series.Points[0][0]
  38. for _, v := range series.Points {
  39. if v[0] < min {
  40. min = v[0]
  41. }
  42. }
  43. return min
  44. },
  45. "max": func(series *m.TimeSeries) float64 {
  46. max := series.Points[0][0]
  47. for _, v := range series.Points {
  48. if v[0] > max {
  49. max = v[0]
  50. }
  51. }
  52. return max
  53. },
  54. "mean": func(series *m.TimeSeries) float64 {
  55. midPosition := int64(math.Floor(float64(len(series.Points)) / float64(2)))
  56. return series.Points[midPosition][0]
  57. },
  58. }
  59. func (this *ExecutorImpl) GetSeries(job *m.AlertJob) (m.TimeSeriesSlice, error) {
  60. if job.Datasource.Type == m.DS_GRAPHITE {
  61. return graphite.GraphiteClient{}.GetSeries(job)
  62. }
  63. return nil, fmt.Errorf("Grafana does not support alerts for %s", job.Datasource.Type)
  64. }
  65. func (this *ExecutorImpl) Execute(job *m.AlertJob, responseQueue chan *m.AlertResult) {
  66. response, err := this.GetSeries(job)
  67. if err != nil {
  68. responseQueue <- &m.AlertResult{State: "PENDING", Id: job.Rule.Id, Rule: job.Rule}
  69. }
  70. responseQueue <- this.ValidateRule(job.Rule, response)
  71. }
  72. func (this *ExecutorImpl) ValidateRule(rule m.AlertRule, series m.TimeSeriesSlice) *m.AlertResult {
  73. for _, serie := range series {
  74. if aggregator[rule.Aggregator] == nil {
  75. continue
  76. }
  77. var aggValue = aggregator[rule.Aggregator](serie)
  78. if operators[rule.CritOperator](aggValue, rule.CritLevel) {
  79. return &m.AlertResult{
  80. State: m.AlertStateCritical,
  81. Id: rule.Id,
  82. ActualValue: aggValue,
  83. Description: fmt.Sprintf("Actual value: %1.2f for %s", aggValue, serie.Name),
  84. Rule: rule,
  85. }
  86. }
  87. if operators[rule.WarnOperator](aggValue, rule.WarnLevel) {
  88. return &m.AlertResult{
  89. State: m.AlertStateWarn,
  90. Id: rule.Id,
  91. Description: fmt.Sprintf("Actual value: %1.2f for %s", aggValue, serie.Name),
  92. ActualValue: aggValue,
  93. Rule: rule,
  94. }
  95. }
  96. }
  97. return &m.AlertResult{State: m.AlertStateOk, Id: rule.Id, Rule: rule, Description: "Alert is OK!"}
  98. }