Просмотр исходного кода

feat(alerting): alert rule selector

bergquist 9 лет назад
Родитель
Сommit
f05cae23d2

+ 2 - 2
pkg/cmd/grafana-server/main.go

@@ -16,7 +16,7 @@ import (
 	"github.com/grafana/grafana/pkg/login"
 	"github.com/grafana/grafana/pkg/metrics"
 	"github.com/grafana/grafana/pkg/plugins"
-	//"github.com/grafana/grafana/pkg/services/alerting"
+	"github.com/grafana/grafana/pkg/services/alerting"
 	"github.com/grafana/grafana/pkg/services/eventpublisher"
 	"github.com/grafana/grafana/pkg/services/notifications"
 	"github.com/grafana/grafana/pkg/services/search"
@@ -65,7 +65,7 @@ func main() {
 	social.NewOAuthService()
 	eventpublisher.Init()
 	plugins.Init()
-	//alerting.Init()
+	alerting.Init()
 
 	if err := notifications.Init(); err != nil {
 		log.Fatal(3, "Notification service failed to initialize", err)

+ 12 - 6
pkg/models/alerts.go

@@ -19,17 +19,23 @@ type AlertRule struct {
 	WarnOperator string `json:"warnOperator"`
 	CritOperator string `json:"critOperator"`
 	Interval     string `json:"interval"`
-	//Frequency    int64  `json:"frequency"`
-	Title       string `json:"title"`
-	Description string `json:"description"`
-	QueryRange  string `json:"queryRange"`
-	Aggregator  string `json:"aggregator"`
-	State       string `json:"state"`
+	Frequency    int64  `json:"frequency"`
+	Title        string `json:"title"`
+	Description  string `json:"description"`
+	QueryRange   string `json:"queryRange"`
+	Aggregator   string `json:"aggregator"`
+	State        string `json:"state"`
 
 	Created time.Time `json:"created"`
 	Updated time.Time `json:"updated"`
 }
 
+type HeartBeat struct {
+	ServerId string
+	Updated  time.Time
+	Created  time.Time
+}
+
 type AlertRuleChange struct {
 	Id      int64     `json:"id"`
 	OrgId   int64     `json:"-"`

+ 47 - 23
pkg/services/alerting/alerting.go

@@ -1,8 +1,11 @@
 package alerting
 
 import (
+	"math/rand"
+	"strconv"
 	"time"
 
+	//"github.com/go-xorm/xorm"
 	"github.com/grafana/grafana/pkg/log"
 	m "github.com/grafana/grafana/pkg/models"
 	"github.com/grafana/grafana/pkg/setting"
@@ -23,19 +26,35 @@ func Init() {
 type Scheduler struct {
 	jobs     []*AlertJob
 	runQueue chan *AlertJob
+
+	serverId       string
+	serverPosition int
+	clusterSize    int
 }
 
 func NewScheduler() *Scheduler {
 	return &Scheduler{
 		jobs:     make([]*AlertJob, 0),
 		runQueue: make(chan *AlertJob, 1000),
+		serverId: strconv.Itoa(rand.Intn(1000)),
 	}
 }
 
+func (s *Scheduler) heartBeat() {
+	//write heartBeat to db.
+	//get the modulus position of active servers
+
+	log.Info("Heartbeat: Sending heartbeat from " + s.serverId)
+	s.clusterSize = 1
+	s.serverPosition = 1
+}
+
 func (s *Scheduler) Dispatch() {
 	reschedule := time.NewTicker(time.Second * 10)
 	secondTicker := time.NewTicker(time.Second)
+	ticker := time.NewTicker(time.Second * 5)
 
+	s.heartBeat()
 	s.updateJobs()
 
 	for {
@@ -44,41 +63,45 @@ func (s *Scheduler) Dispatch() {
 			s.queueJobs()
 		case <-reschedule.C:
 			s.updateJobs()
+		case <-ticker.C:
+			s.heartBeat()
 		}
 	}
 }
 
+func (s *Scheduler) getAlertRules() []m.AlertRule {
+	return []m.AlertRule{
+		{Id: 1, Title: "alert rule 1", Interval: "10s", Frequency: 10},
+		{Id: 2, Title: "alert rule 2", Interval: "10s", Frequency: 10},
+		{Id: 3, Title: "alert rule 3", Interval: "10s", Frequency: 10},
+		{Id: 4, Title: "alert rule 4", Interval: "10s", Frequency: 5},
+		{Id: 5, Title: "alert rule 5", Interval: "10s", Frequency: 5},
+		{Id: 6, Title: "alert rule 6", Interval: "10s", Frequency: 1},
+	}
+}
+
 func (s *Scheduler) updateJobs() {
-	log.Info("Scheduler:updateJobs()")
+	log.Info("Scheduler: UpdateJobs()")
 
 	jobs := make([]*AlertJob, 0)
-	jobs = append(jobs, &AlertJob{
-		name:      "ID_1_Each 10s",
-		frequency: 10,
-		offset:    1,
-	})
-	jobs = append(jobs, &AlertJob{
-		name:      "ID_2_Each 10s",
-		frequency: 10,
-		offset:    2,
-	})
-	jobs = append(jobs, &AlertJob{
-		name:      "ID_3_Each 10s",
-		frequency: 10,
-		offset:    3,
-	})
-
-	jobs = append(jobs, &AlertJob{
-		name:      "ID_4_Each 5s",
-		frequency: 5,
-	})
+	rules := s.getAlertRules()
+
+	for i := s.serverPosition - 1; i < len(rules); i = i + s.clusterSize {
+		rule := rules[i]
+		jobs = append(jobs, &AlertJob{
+			name:      rule.Title,
+			frequency: rule.Frequency,
+			rule:      rule,
+			offset:    int64(len(jobs)),
+		})
+	}
+
+	log.Debug("Scheduler: Selected %d jobs", len(jobs))
 
 	s.jobs = jobs
 }
 
 func (s *Scheduler) queueJobs() {
-	log.Info("Scheduler:queueJobs()")
-
 	now := time.Now().Unix()
 
 	for _, job := range s.jobs {
@@ -104,6 +127,7 @@ type AlertJob struct {
 	frequency int64
 	offset    int64
 	delay     bool
+	rule      m.AlertRule
 }
 
 type RuleReader interface {

+ 16 - 0
pkg/services/sqlstore/migrations/alert_mig.go

@@ -5,11 +5,13 @@ import (
 )
 
 func addAlertMigrations(mg *Migrator) {
+
 	alertV1 := Table{
 		Name: "alert_rule",
 		Columns: []*Column{
 			{Name: "id", Type: DB_BigInt, IsPrimaryKey: true, IsAutoIncrement: true},
 			{Name: "dashboard_id", Type: DB_BigInt, Nullable: false},
+			//{Name: "datasource_id", Type: DB_BigInt, Nullable: false},
 			{Name: "panel_id", Type: DB_BigInt, Nullable: false},
 			{Name: "org_id", Type: DB_BigInt, Nullable: false},
 			{Name: "query", Type: DB_Text, Nullable: false},
@@ -19,6 +21,7 @@ func addAlertMigrations(mg *Migrator) {
 			{Name: "crit_level", Type: DB_BigInt, Nullable: false},
 			{Name: "crit_operator", Type: DB_NVarchar, Length: 10, Nullable: false},
 			{Name: "interval", Type: DB_NVarchar, Length: 255, Nullable: false},
+			{Name: "frequency", Type: DB_BigInt, Nullable: false},
 			{Name: "title", Type: DB_NVarchar, Length: 255, Nullable: false},
 			{Name: "description", Type: DB_NVarchar, Length: 255, Nullable: false},
 			{Name: "query_range", Type: DB_NVarchar, Length: 255, Nullable: false},
@@ -58,4 +61,17 @@ func addAlertMigrations(mg *Migrator) {
 	}
 
 	mg.AddMigration("create alert_state_log table v1", NewAddTableMigration(alert_state_log))
+
+	alert_heartbeat := Table{
+		Name: "alert_heartbeat",
+		Columns: []*Column{
+			{Name: "id", Type: DB_BigInt, IsPrimaryKey: true, IsAutoIncrement: true},
+			{Name: "server_id", Type: DB_NVarchar, Length: 50, Nullable: false},
+			{Name: "created", Type: DB_DateTime, Nullable: false},
+			{Name: "updated", Type: DB_DateTime, Nullable: false},
+		},
+	}
+
+	mg.AddMigration("create alert_heartbeat table v1", NewAddTableMigration(alert_heartbeat))
+
 }