reader.go 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. package alerting
  2. import (
  3. "sync"
  4. "time"
  5. "github.com/grafana/grafana/pkg/bus"
  6. "github.com/grafana/grafana/pkg/log"
  7. "github.com/grafana/grafana/pkg/metrics"
  8. m "github.com/grafana/grafana/pkg/models"
  9. )
  10. type RuleReader interface {
  11. Fetch() []*Rule
  12. }
  13. type DefaultRuleReader struct {
  14. sync.RWMutex
  15. //serverID string
  16. serverPosition int
  17. clusterSize int
  18. log log.Logger
  19. }
  20. func NewRuleReader() *DefaultRuleReader {
  21. ruleReader := &DefaultRuleReader{
  22. log: log.New("alerting.ruleReader"),
  23. }
  24. go ruleReader.initReader()
  25. return ruleReader
  26. }
  27. func (arr *DefaultRuleReader) initReader() {
  28. heartbeat := time.NewTicker(time.Second * 10)
  29. for range heartbeat.C {
  30. arr.heartbeat()
  31. }
  32. }
  33. func (arr *DefaultRuleReader) Fetch() []*Rule {
  34. cmd := &m.GetAllAlertsQuery{}
  35. if err := bus.Dispatch(cmd); err != nil {
  36. arr.log.Error("Could not load alerts", "error", err)
  37. return []*Rule{}
  38. }
  39. res := make([]*Rule, 0)
  40. for _, ruleDef := range cmd.Result {
  41. if model, err := NewRuleFromDBAlert(ruleDef); err != nil {
  42. arr.log.Error("Could not build alert model for rule", "ruleId", ruleDef.Id, "error", err)
  43. } else {
  44. res = append(res, model)
  45. }
  46. }
  47. metrics.M_Alerting_Active_Alerts.Set(float64(len(res)))
  48. return res
  49. }
  50. func (arr *DefaultRuleReader) heartbeat() {
  51. arr.clusterSize = 1
  52. arr.serverPosition = 1
  53. }