graphite_executor.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. package alerting
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/franela/goreq"
  6. "github.com/grafana/grafana/pkg/bus"
  7. "github.com/grafana/grafana/pkg/components/simplejson"
  8. m "github.com/grafana/grafana/pkg/models"
  9. "net/http"
  10. "net/url"
  11. "time"
  12. )
  13. type GraphiteExecutor struct{}
  14. type Series struct {
  15. Datapoints []DataPoint
  16. Target string
  17. }
  18. type Response []Series
  19. type DataPoint []json.Number
  20. func (this *GraphiteExecutor) Execute(rule m.AlertRule, responseQueue chan *AlertResult) {
  21. response, err := this.getSeries(rule)
  22. if err != nil {
  23. responseQueue <- &AlertResult{State: "CRITICAL", Id: rule.Id}
  24. }
  25. responseQueue <- this.executeRules(response, rule)
  26. }
  27. func (this *GraphiteExecutor) executeRules(series []Series, rule m.AlertRule) *AlertResult {
  28. for _, v := range series {
  29. var avg float64
  30. var sum float64
  31. for _, dp := range v.Datapoints {
  32. i, _ := dp[0].Float64()
  33. sum += i
  34. }
  35. avg = sum / float64(len(v.Datapoints))
  36. if float64(rule.CritLevel) < avg {
  37. return &AlertResult{State: m.AlertStateCritical, Id: rule.Id, ActualValue: avg}
  38. }
  39. if float64(rule.WarnLevel) < avg {
  40. return &AlertResult{State: m.AlertStateWarn, Id: rule.Id, ActualValue: avg}
  41. }
  42. if float64(rule.CritLevel) < sum {
  43. return &AlertResult{State: m.AlertStateCritical, Id: rule.Id, ActualValue: sum}
  44. }
  45. if float64(rule.WarnLevel) < sum {
  46. return &AlertResult{State: m.AlertStateWarn, Id: rule.Id, ActualValue: sum}
  47. }
  48. }
  49. return &AlertResult{State: m.AlertStateOk, Id: rule.Id}
  50. }
  51. func (this *GraphiteExecutor) getSeries(rule m.AlertRule) (Response, error) {
  52. query := &m.GetDataSourceByIdQuery{Id: rule.DatasourceId, OrgId: rule.OrgId}
  53. if err := bus.Dispatch(query); err != nil {
  54. return nil, err
  55. }
  56. v := url.Values{
  57. "format": []string{"json"},
  58. "target": []string{getTargetFromQuery(rule)},
  59. }
  60. v.Add("from", "-"+rule.QueryRange)
  61. v.Add("until", "now")
  62. req := goreq.Request{
  63. Method: "POST",
  64. Uri: query.Result.Url + "/render",
  65. Body: v.Encode(),
  66. Timeout: 500 * time.Millisecond,
  67. }
  68. res, err := req.Do()
  69. response := Response{}
  70. res.Body.FromJsonTo(&response)
  71. if err != nil {
  72. return nil, err
  73. }
  74. if res.StatusCode != http.StatusOK {
  75. return nil, fmt.Errorf("error!")
  76. }
  77. return response, nil
  78. }
  79. func getTargetFromQuery(rule m.AlertRule) string {
  80. json, _ := simplejson.NewJson([]byte(rule.Query))
  81. return json.Get("target").MustString()
  82. }