reader.go 1.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. package alerting
  2. import (
  3. "sync"
  4. "time"
  5. "github.com/grafana/grafana/pkg/bus"
  6. "github.com/grafana/grafana/pkg/infra/log"
  7. "github.com/grafana/grafana/pkg/infra/metrics"
  8. "github.com/grafana/grafana/pkg/models"
  9. )
  10. type RuleReader interface {
  11. Fetch() []*Rule
  12. }
  13. type DefaultRuleReader struct {
  14. sync.RWMutex
  15. serverPosition int
  16. clusterSize int
  17. log log.Logger
  18. }
  19. func NewRuleReader() *DefaultRuleReader {
  20. ruleReader := &DefaultRuleReader{
  21. log: log.New("alerting.ruleReader"),
  22. }
  23. go ruleReader.initReader()
  24. return ruleReader
  25. }
  26. func (arr *DefaultRuleReader) initReader() {
  27. heartbeat := time.NewTicker(time.Second * 10)
  28. for range heartbeat.C {
  29. arr.heartbeat()
  30. }
  31. }
  32. func (arr *DefaultRuleReader) Fetch() []*Rule {
  33. cmd := &models.GetAllAlertsQuery{}
  34. if err := bus.Dispatch(cmd); err != nil {
  35. arr.log.Error("Could not load alerts", "error", err)
  36. return []*Rule{}
  37. }
  38. res := make([]*Rule, 0)
  39. for _, ruleDef := range cmd.Result {
  40. if model, err := NewRuleFromDBAlert(ruleDef); err != nil {
  41. arr.log.Error("Could not build alert model for rule", "ruleId", ruleDef.Id, "error", err)
  42. } else {
  43. res = append(res, model)
  44. }
  45. }
  46. metrics.M_Alerting_Active_Alerts.Set(float64(len(res)))
  47. return res
  48. }
  49. func (arr *DefaultRuleReader) heartbeat() {
  50. arr.clusterSize = 1
  51. arr.serverPosition = 1
  52. }