reader.go 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. package alerting
  2. import (
  3. "sync"
  4. "time"
  5. "github.com/grafana/grafana/pkg/bus"
  6. "github.com/grafana/grafana/pkg/log"
  7. "github.com/grafana/grafana/pkg/metrics"
  8. m "github.com/grafana/grafana/pkg/models"
  9. )
  10. type RuleReader interface {
  11. Fetch() []*Rule
  12. }
  13. type DefaultRuleReader struct {
  14. sync.RWMutex
  15. serverID string
  16. serverPosition int
  17. clusterSize int
  18. log log.Logger
  19. }
  20. func NewRuleReader() *DefaultRuleReader {
  21. ruleReader := &DefaultRuleReader{
  22. log: log.New("alerting.ruleReader"),
  23. }
  24. go ruleReader.initReader()
  25. return ruleReader
  26. }
  27. func (arr *DefaultRuleReader) initReader() {
  28. heartbeat := time.NewTicker(time.Second * 10)
  29. for {
  30. select {
  31. case <-heartbeat.C:
  32. arr.heartbeat()
  33. }
  34. }
  35. }
  36. func (arr *DefaultRuleReader) Fetch() []*Rule {
  37. cmd := &m.GetAllAlertsQuery{}
  38. if err := bus.Dispatch(cmd); err != nil {
  39. arr.log.Error("Could not load alerts", "error", err)
  40. return []*Rule{}
  41. }
  42. res := make([]*Rule, 0)
  43. for _, ruleDef := range cmd.Result {
  44. if model, err := NewRuleFromDBAlert(ruleDef); err != nil {
  45. arr.log.Error("Could not build alert model for rule", "ruleId", ruleDef.Id, "error", err)
  46. } else {
  47. res = append(res, model)
  48. }
  49. }
  50. metrics.M_Alerting_Active_Alerts.Set(float64(len(res)))
  51. return res
  52. }
  53. func (arr *DefaultRuleReader) heartbeat() {
  54. arr.clusterSize = 1
  55. arr.serverPosition = 1
  56. }