
tech(alerting): change from array to map

bergquist 9 years ago
Parent
Commit
7229fb7a76

+ 5 - 5
pkg/services/alerting/alert_rule_reader.go

@@ -12,11 +12,11 @@ type AlertRuleReader struct{}
 
 func (this AlertRuleReader) Fetch() []m.AlertRule {
 	return []m.AlertRule{
-		{Id: 1, Title: "alert rule 1", Interval: "10s", Frequency: 10},
-		{Id: 2, Title: "alert rule 2", Interval: "10s", Frequency: 10},
-		{Id: 3, Title: "alert rule 3", Interval: "10s", Frequency: 10},
-		{Id: 4, Title: "alert rule 4", Interval: "10s", Frequency: 5},
-		{Id: 5, Title: "alert rule 5", Interval: "10s", Frequency: 5},
+		//{Id: 1, Title: "alert rule 1", Interval: "10s", Frequency: 10},
+		//{Id: 2, Title: "alert rule 2", Interval: "10s", Frequency: 10},
+		//{Id: 3, Title: "alert rule 3", Interval: "10s", Frequency: 10},
+		//{Id: 4, Title: "alert rule 4", Interval: "10s", Frequency: 5},
+		//{Id: 5, Title: "alert rule 5", Interval: "10s", Frequency: 5},
 		{Id: 6, Title: "alert rule 6", Interval: "10s", Frequency: 1},
 	}
 }

+ 39 - 29
pkg/services/alerting/alerting.go

@@ -5,7 +5,7 @@ import (
 	"strconv"
 	"time"
 
-	"github.com/grafana/grafana/pkg/bus"
+	//"github.com/grafana/grafana/pkg/bus"
 	"github.com/grafana/grafana/pkg/log"
 	m "github.com/grafana/grafana/pkg/models"
 	"github.com/grafana/grafana/pkg/setting"
@@ -22,12 +22,14 @@ func Init() {
 	scheduler := NewScheduler()
 	go scheduler.Dispatch(&AlertRuleReader{})
 	go scheduler.Executor(&DummieExecutor{})
+	go scheduler.HandleResponses()
 }
 
 type Scheduler struct {
-	jobs     []*AlertJob
-	runQueue chan *AlertJob
-	mtx      sync.RWMutex
+	jobs          map[int64]*AlertJob
+	runQueue      chan *AlertJob
+	responseQueue chan *AlertResult
+	mtx           sync.RWMutex
 
 	alertRuleFetcher RuleReader
 
@@ -38,30 +40,35 @@ type Scheduler struct {
 
 func NewScheduler() *Scheduler {
 	return &Scheduler{
-		jobs:     make([]*AlertJob, 0),
-		runQueue: make(chan *AlertJob, 1000),
-		serverId: strconv.Itoa(rand.Intn(1000)),
+		jobs:          make(map[int64]*AlertJob, 0),
+		runQueue:      make(chan *AlertJob, 1000),
+		responseQueue: make(chan *AlertResult, 1000),
+		serverId:      strconv.Itoa(rand.Intn(1000)),
 	}
 }
 
 func (this *Scheduler) heartBeat() {
-	//write heartBeat to db.
-	//get the modulus position of active servers
 
-	cmd := &m.HeartBeatCommand{ServerId: this.serverId}
+	//Lets cheat on this until we focus on clustering
 	log.Info("Heartbeat: Sending heartbeat from " + this.serverId)
-	err := bus.Dispatch(cmd)
-
-	if err != nil {
-		log.Error(1, "Failed to send heartbeat.")
-	} else {
-		this.clusterSize = cmd.Result.ClusterSize
-		this.serverPosition = cmd.Result.UptimePosition
-	}
+	this.clusterSize = 1
+	this.serverPosition = 1
+
+	/*
+		cmd := &m.HeartBeatCommand{ServerId: this.serverId}
+		err := bus.Dispatch(cmd)
+
+		if err != nil {
+			log.Error(1, "Failed to send heartbeat.")
+		} else {
+			this.clusterSize = cmd.Result.ClusterSize
+			this.serverPosition = cmd.Result.UptimePosition
+		}
+	*/
 }
 
 func (this *Scheduler) Dispatch(reader RuleReader) {
-	reschedule := time.NewTicker(time.Second * 10)
+	reschedule := time.NewTicker(time.Second * 100)
 	secondTicker := time.NewTicker(time.Second)
 	heartbeat := time.NewTicker(time.Second * 5)
 
@@ -83,7 +90,7 @@ func (this *Scheduler) Dispatch(reader RuleReader) {
 func (this *Scheduler) updateJobs(f func() []m.AlertRule) {
 	log.Debug("Scheduler: UpdateJobs()")
 
-	jobs := make([]*AlertJob, 0)
+	jobs := make(map[int64]*AlertJob, 0)
 	rules := f()
 
 	this.mtx.Lock()
@@ -91,10 +98,7 @@ func (this *Scheduler) updateJobs(f func() []m.AlertRule) {
 
 	for i := this.serverPosition - 1; i < len(rules); i += this.clusterSize {
 		rule := rules[i]
-		jobs = append(jobs, &AlertJob{
-			rule:   rule,
-			offset: int64(len(jobs)),
-		})
+		jobs[rule.Id] = &AlertJob{rule: rule, offset: int64(len(jobs))}
 	}
 
 	log.Debug("Scheduler: Selected %d jobs", len(jobs))
@@ -117,15 +121,21 @@ func (this *Scheduler) Executor(executor Executor) {
 	for job := range this.runQueue {
 		log.Info("Executor: queue length %d", len(this.runQueue))
 		log.Info("Executor: executing %s", job.rule.Title)
-		go Measure(executor, job)
+		this.jobs[job.rule.Id].running = true
+		go this.Measure(executor, job)
+	}
+}
+
+func (this *Scheduler) HandleResponses() {
+	for response := range this.responseQueue {
+		log.Info("Response: alert %d returned %s", response.id, response.state)
+		this.jobs[response.id].running = false
 	}
 }
 
-func Measure(exec Executor, rule *AlertJob) {
+func (this *Scheduler) Measure(exec Executor, rule *AlertJob) {
 	now := time.Now()
-	rule.running = true
-	exec.Execute(rule.rule)
-	rule.running = true
+	exec.Execute(rule.rule, this.responseQueue)
 	elapsed := time.Since(now)
 	log.Info("Schedular: exeuction took %v milli seconds", elapsed.Nanoseconds()/1000000)
 }
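
The alerting.go changes swap the job slice for a map keyed by rule id and add a response channel, so the scheduler, not the job itself, clears the running flag when a result comes back. Below is a minimal standalone sketch of that pattern; the type and field names are trimmed down from the diff, and everything else is illustrative rather than Grafana's actual code.

package main

import "fmt"

// Trimmed-down stand-ins for the types in the diff (m.AlertRule, AlertJob,
// AlertResult); fields not shown in the commit are left out.
type AlertRule struct {
	Id    int64
	Title string
}

type AlertJob struct {
	rule    AlertRule
	offset  int64
	running bool
}

type AlertResult struct {
	id    int64
	state string
}

type Scheduler struct {
	jobs          map[int64]*AlertJob // keyed by rule id instead of slice position
	runQueue      chan *AlertJob
	responseQueue chan *AlertResult
}

func NewScheduler() *Scheduler {
	return &Scheduler{
		jobs:          make(map[int64]*AlertJob),
		runQueue:      make(chan *AlertJob, 1000),
		responseQueue: make(chan *AlertResult, 1000),
	}
}

// HandleResponses looks the finished job up by rule id and clears its
// running flag, which is exactly the lookup the map change enables.
func (s *Scheduler) HandleResponses() {
	for response := range s.responseQueue {
		fmt.Printf("alert %d returned %s\n", response.id, response.state)
		if job, ok := s.jobs[response.id]; ok {
			job.running = false
		}
	}
}

func main() {
	s := NewScheduler()
	rule := AlertRule{Id: 6, Title: "alert rule 6"}
	s.jobs[rule.Id] = &AlertJob{rule: rule, running: true}

	// The real scheduler runs HandleResponses as a goroutine; here one result
	// is queued and the channel closed so the loop terminates synchronously.
	s.responseQueue <- &AlertResult{id: rule.Id, state: "OK"}
	close(s.responseQueue)
	s.HandleResponses()

	fmt.Println("still running:", s.jobs[rule.Id].running) // false
}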

+ 6 - 6
pkg/services/alerting/alerting_test.go

@@ -21,7 +21,7 @@ func TestAlertingScheduler(t *testing.T) {
 
 		Convey("single server", func() {
 			scheduler := &Scheduler{
-				jobs:           make([]*AlertJob, 0),
+				jobs:           make(map[int64]*AlertJob, 0),
 				runQueue:       make(chan *AlertJob, 1000),
 				serverId:       "",
 				serverPosition: 1,
@@ -34,7 +34,7 @@ func TestAlertingScheduler(t *testing.T) {
 
 		Convey("two servers", func() {
 			scheduler := &Scheduler{
-				jobs:           make([]*AlertJob, 0),
+				jobs:           make(map[int64]*AlertJob, 0),
 				runQueue:       make(chan *AlertJob, 1000),
 				serverId:       "",
 				serverPosition: 1,
@@ -43,12 +43,12 @@ func TestAlertingScheduler(t *testing.T) {
 
 			scheduler.updateJobs(mockFn)
 			So(len(scheduler.jobs), ShouldEqual, 3)
-			So(scheduler.jobs[0].rule.Id, ShouldEqual, 1)
+			So(scheduler.jobs[1].rule.Id, ShouldEqual, 1)
 		})
 
 		Convey("six servers", func() {
 			scheduler := &Scheduler{
-				jobs:           make([]*AlertJob, 0),
+				jobs:           make(map[int64]*AlertJob, 0),
 				runQueue:       make(chan *AlertJob, 1000),
 				serverId:       "",
 				serverPosition: 6,
@@ -57,7 +57,7 @@ func TestAlertingScheduler(t *testing.T) {
 
 			scheduler.updateJobs(mockFn)
 			So(len(scheduler.jobs), ShouldEqual, 1)
-			So(scheduler.jobs[0].rule.Id, ShouldEqual, 6)
+			So(scheduler.jobs[6].rule.Id, ShouldEqual, 6)
 		})
 
 		Convey("more servers then alerts", func() {
@@ -68,7 +68,7 @@ func TestAlertingScheduler(t *testing.T) {
 			}
 
 			scheduler := &Scheduler{
-				jobs:           make([]*AlertJob, 0),
+				jobs:           make(map[int64]*AlertJob, 0),
 				runQueue:       make(chan *AlertJob, 1000),
 				serverId:       "",
 				serverPosition: 3,

+ 7 - 6
pkg/services/alerting/executor.go

@@ -7,16 +7,17 @@ import (
 )
 
 type Executor interface {
-	Execute(rule m.AlertRule) (err error, result AlertResult)
+	Execute(rule m.AlertRule, responseQueue chan *AlertResult)
 }
 
 type DummieExecutor struct{}
 
-func (this DummieExecutor) Execute(rule m.AlertRule) (err error, result AlertResult) {
-	if rule.Id == 6 {
-		time.Sleep(time.Second * 60)
-	}
+func (this DummieExecutor) Execute(rule m.AlertRule, responseQueue chan *AlertResult) {
+	//if rule.Id == 6 {
+	//	time.Sleep(time.Second * 60)
+	//}
 	time.Sleep(time.Second)
 	log.Info("Finnished executing: %d", rule.Id)
-	return nil, AlertResult{state: "OK", id: rule.Id}
+	responseQueue <- &AlertResult{state: "OK", id: rule.Id}
+	//return nil,
 }
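
The Executor interface now pushes its outcome onto the response queue instead of returning (err, result). Continuing the trimmed-down types from the sketch above, a hedged illustration of what an implementation of the new signature looks like:

// Executor mirrors the new channel-based signature from the diff.
type Executor interface {
	Execute(rule AlertRule, responseQueue chan *AlertResult)
}

// FakeExecutor stands in for DummieExecutor: it "evaluates" the rule and
// reports the outcome by sending on the queue rather than returning it.
type FakeExecutor struct{}

func (FakeExecutor) Execute(rule AlertRule, responseQueue chan *AlertResult) {
	responseQueue <- &AlertResult{id: rule.Id, state: "OK"}
}

// Compile-time check that FakeExecutor satisfies the interface.
var _ Executor = FakeExecutor{}

One consequence, visible in Measure above, is that the scheduler can time Execute without inspecting a return value; completion is observed by HandleResponses when the AlertResult arrives on the queue.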

+ 18 - 0
pkg/services/sqlstore/alert_heartbeat_test.go

@@ -0,0 +1,18 @@
+package sqlstore
+
+import (
+	"testing"
+
+	//	m "github.com/grafana/grafana/pkg/models"
+	. "github.com/smartystreets/goconvey/convey"
+)
+
+func TestAlertingHeartbeatDataAccess(t *testing.T) {
+
+	Convey("Testing Alerting data access", t, func() {
+		InitTestDB(t)
+		//send heartbeat from server 1
+		//send heartbeat from server 2
+
+	})
+}
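
The new sqlstore test file is only a skeleton. The command it will eventually exercise, m.HeartBeatCommand with a ServerId and a Result carrying ClusterSize and UptimePosition, appears only in the commented-out scheduler code above, and no handler ships in this commit. A hedged sketch of what the test body might grow into, assuming a bus handler for the command gets registered like the other sqlstore handlers:

package sqlstore

import (
	"testing"

	"github.com/grafana/grafana/pkg/bus"
	m "github.com/grafana/grafana/pkg/models"
	. "github.com/smartystreets/goconvey/convey"
)

func TestAlertingHeartbeatDataAccess(t *testing.T) {
	Convey("Testing Alerting data access", t, func() {
		InitTestDB(t)

		Convey("Heartbeats from two servers", func() {
			// Hypothetical: relies on a HeartBeatCommand handler that this
			// commit does not yet include.
			cmd1 := &m.HeartBeatCommand{ServerId: "server-1"}
			cmd2 := &m.HeartBeatCommand{ServerId: "server-2"}

			So(bus.Dispatch(cmd1), ShouldBeNil)
			So(bus.Dispatch(cmd2), ShouldBeNil)

			// With both servers reporting in, each should see a cluster of two.
			So(cmd2.Result.ClusterSize, ShouldEqual, 2)
		})
	})
}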