alert_rule_reader.go 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. package alerting
  2. import (
  3. "sync"
  4. "time"
  5. "github.com/grafana/grafana/pkg/bus"
  6. "github.com/grafana/grafana/pkg/log"
  7. m "github.com/grafana/grafana/pkg/models"
  8. )
  9. type RuleReader interface {
  10. Fetch() []m.AlertRule
  11. }
  12. type AlertRuleReader struct {
  13. sync.RWMutex
  14. serverID string
  15. serverPosition int
  16. clusterSize int
  17. }
  18. func NewRuleReader() *AlertRuleReader {
  19. rrr := &AlertRuleReader{}
  20. go rrr.initReader()
  21. return rrr
  22. }
  23. var (
  24. alertJobs []m.AlertRule
  25. )
  26. func (arr *AlertRuleReader) Fetch() []m.AlertRule {
  27. return alertJobs
  28. }
  29. func (arr *AlertRuleReader) initReader() {
  30. alertJobs = make([]m.AlertRule, 0)
  31. heartbeat := time.NewTicker(time.Second * 10)
  32. arr.updateRules()
  33. for {
  34. select {
  35. case <-heartbeat.C:
  36. arr.updateRules()
  37. }
  38. }
  39. }
  40. func (arr *AlertRuleReader) updateRules() {
  41. arr.Lock()
  42. defer arr.Unlock()
  43. cmd := &m.GetAlertsQuery{
  44. OrgId: 1,
  45. }
  46. err := bus.Dispatch(cmd)
  47. if err == nil {
  48. alertJobs = cmd.Result
  49. } else {
  50. log.Error(1, "AlertRuleReader: Could not load alerts")
  51. }
  52. }
  53. func (arr *AlertRuleReader) heartBeat() {
  54. //Lets cheat on this until we focus on clustering
  55. //log.Info("Heartbeat: Sending heartbeat from " + this.serverId)
  56. arr.clusterSize = 1
  57. arr.serverPosition = 1
  58. /*
  59. cmd := &m.HeartBeatCommand{ServerId: this.serverId}
  60. err := bus.Dispatch(cmd)
  61. if err != nil {
  62. log.Error(1, "Failed to send heartbeat.")
  63. } else {
  64. this.clusterSize = cmd.Result.ClusterSize
  65. this.serverPosition = cmd.Result.UptimePosition
  66. }
  67. */
  68. }