Sfoglia il codice sorgente

feat(alerting): extracts alert rule reading

bergquist 9 anni fa
parent
commit
1498db11a9

+ 8 - 0
pkg/models/alerts.go

@@ -108,3 +108,11 @@ type GetAlertChangesQuery struct {
 
 	Result []AlertRuleChange
 }
+
+type AlertJob struct {
+	Offset     int64
+	Delay      bool
+	Running    bool
+	Rule       AlertRule
+	Datasource DataSource
+}

+ 86 - 9
pkg/services/alerting/alert_rule_reader.go

@@ -3,17 +3,53 @@ package alerting
 import (
 	"github.com/grafana/grafana/pkg/bus"
 	m "github.com/grafana/grafana/pkg/models"
+	"sync"
+	"time"
 )
 
 type RuleReader interface {
-	Fetch() []m.AlertRule
+	Fetch() []m.AlertJob
 }
 
-type AlertRuleReader struct{}
+type AlertRuleReader struct {
+	serverId       string
+	serverPosition int
+	clusterSize    int
+	mtx            sync.RWMutex
+}
+
+func NewRuleReader() *AlertRuleReader {
+	rrr := &AlertRuleReader{}
+
+	go rrr.initReader()
+	return rrr
+}
+
+var (
+	alertJobs []m.AlertJob
+)
+
+func (this *AlertRuleReader) initReader() {
+	alertJobs = make([]m.AlertJob, 0)
+	heartbeat := time.NewTicker(time.Second * 5)
+	this.rr()
+
+	for {
+		select {
+		case <-heartbeat.C:
+			this.rr()
+		}
+	}
+}
+
+func (this *AlertRuleReader) rr() {
+	this.mtx.Lock()
+	defer this.mtx.Unlock()
+
+	rules := make([]m.AlertRule, 0)
 
-func (this AlertRuleReader) Fetch() []m.AlertRule {
 	/*
-		return []m.AlertRule{
+		rules = []m.AlertRule{
 			//{Id: 1, Title: "alert rule 1", Interval: "10s", Frequency: 10},
 			//{Id: 2, Title: "alert rule 2", Interval: "10s", Frequency: 10},
 			//{Id: 3, Title: "alert rule 3", Interval: "10s", Frequency: 10},
@@ -25,12 +61,13 @@ func (this AlertRuleReader) Fetch() []m.AlertRule {
 				Title:        "alert rule 1",
 				Frequency:    3,
 				DatasourceId: 1,
-				WarnOperator: "<",
+				WarnOperator: ">",
 				WarnLevel:    3,
-				CritOperator: "<",
+				CritOperator: ">",
 				CritLevel:    4,
 				Aggregator:   "avg",
-				Query:        `{"refId":"A","target":"statsd.fakesite.counters.session_start.*.count","textEditor":true}"`,
+				//Query:        `{"refId":"A","target":"statsd.fakesite.counters.session_start.*.count","textEditor":true}"`,
+				Query:        `{"hide":false,"refId":"A","target":"aliasByNode(statsd.fakesite.counters.session_start.*.count, 4)","textEditor":false}`,
 				QueryRange:   3600,
 			},
 		}
@@ -39,8 +76,48 @@ func (this AlertRuleReader) Fetch() []m.AlertRule {
 	cmd := &m.GetAlertsQuery{
 		OrgId: 1,
 	}
-
 	bus.Dispatch(cmd)
+	rules = cmd.Result
+	//for i := this.serverPosition - 1; i < len(rules); i += this.clusterSize {
+
+	jobs := make([]m.AlertJob, 0)
+	for _, rule := range rules {
+		query := &m.GetDataSourceByIdQuery{Id: rule.DatasourceId, OrgId: rule.OrgId}
+		err := bus.Dispatch(query)
 
-	return cmd.Result
+		if err != nil {
+			continue
+		}
+
+		jobs = append(jobs, m.AlertJob{
+			Rule:       rule,
+			Datasource: query.Result,
+		})
+	}
+
+	alertJobs = jobs
+}
+
+func (this *AlertRuleReader) Fetch() []m.AlertJob {
+	return alertJobs
+}
+
+func (this *AlertRuleReader) heartBeat() {
+
+	//Lets cheat on this until we focus on clustering
+	//log.Info("Heartbeat: Sending heartbeat from " + this.serverId)
+	this.clusterSize = 1
+	this.serverPosition = 1
+
+	/*
+		cmd := &m.HeartBeatCommand{ServerId: this.serverId}
+		err := bus.Dispatch(cmd)
+
+		if err != nil {
+			log.Error(1, "Failed to send heartbeat.")
+		} else {
+			this.clusterSize = cmd.Result.ClusterSize
+			this.serverPosition = cmd.Result.UptimePosition
+		}
+	*/
 }

+ 23 - 62
pkg/services/alerting/alerting.go

@@ -1,16 +1,12 @@
 package alerting
 
 import (
-	"math/rand"
-	"strconv"
 	"time"
 
-	//"github.com/grafana/grafana/pkg/bus"
 	"github.com/grafana/grafana/pkg/bus"
 	"github.com/grafana/grafana/pkg/log"
 	m "github.com/grafana/grafana/pkg/models"
 	"github.com/grafana/grafana/pkg/setting"
-	"sync"
 )
 
 func Init() {
@@ -21,59 +17,34 @@ func Init() {
 	log.Info("Alerting: Initializing scheduler...")
 
 	scheduler := NewScheduler()
-	go scheduler.Dispatch(&AlertRuleReader{})
+	reader := NewRuleReader()
+
+	go scheduler.Dispatch(reader)
 	go scheduler.Executor(&ExecutorImpl{})
 	go scheduler.HandleResponses()
+
 }
 
 type Scheduler struct {
-	jobs          map[int64]*AlertJob
-	runQueue      chan *AlertJob
+	jobs          map[int64]*m.AlertJob
+	runQueue      chan *m.AlertJob
 	responseQueue chan *AlertResult
-	mtx           sync.RWMutex
 
 	alertRuleFetcher RuleReader
-
-	serverId       string
-	serverPosition int
-	clusterSize    int
 }
 
 func NewScheduler() *Scheduler {
 	return &Scheduler{
-		jobs:          make(map[int64]*AlertJob, 0),
-		runQueue:      make(chan *AlertJob, 1000),
+		jobs:          make(map[int64]*m.AlertJob, 0),
+		runQueue:      make(chan *m.AlertJob, 1000),
 		responseQueue: make(chan *AlertResult, 1000),
-		serverId:      strconv.Itoa(rand.Intn(1000)),
 	}
 }
 
-func (this *Scheduler) heartBeat() {
-
-	//Lets cheat on this until we focus on clustering
-	//log.Info("Heartbeat: Sending heartbeat from " + this.serverId)
-	this.clusterSize = 1
-	this.serverPosition = 1
-
-	/*
-		cmd := &m.HeartBeatCommand{ServerId: this.serverId}
-		err := bus.Dispatch(cmd)
-
-		if err != nil {
-			log.Error(1, "Failed to send heartbeat.")
-		} else {
-			this.clusterSize = cmd.Result.ClusterSize
-			this.serverPosition = cmd.Result.UptimePosition
-		}
-	*/
-}
-
 func (this *Scheduler) Dispatch(reader RuleReader) {
-	reschedule := time.NewTicker(time.Second * 100)
+	reschedule := time.NewTicker(time.Second * 5)
 	secondTicker := time.NewTicker(time.Second)
-	heartbeat := time.NewTicker(time.Second * 5)
 
-	this.heartBeat()
 	this.updateJobs(reader.Fetch)
 
 	for {
@@ -82,24 +53,20 @@ func (this *Scheduler) Dispatch(reader RuleReader) {
 			this.queueJobs()
 		case <-reschedule.C:
 			this.updateJobs(reader.Fetch)
-		case <-heartbeat.C:
-			this.heartBeat()
 		}
 	}
 }
 
-func (this *Scheduler) updateJobs(f func() []m.AlertRule) {
+func (this *Scheduler) updateJobs(f func() []m.AlertJob) {
 	log.Debug("Scheduler: UpdateJobs()")
 
-	jobs := make(map[int64]*AlertJob, 0)
+	jobs := make(map[int64]*m.AlertJob, 0)
 	rules := f()
 
-	this.mtx.Lock()
-	defer this.mtx.Unlock()
-
-	for i := this.serverPosition - 1; i < len(rules); i += this.clusterSize {
+	for i := 0; i < len(rules); i++ {
 		rule := rules[i]
-		jobs[rule.Id] = &AlertJob{rule: rule, offset: int64(len(jobs))}
+		//jobs[rule.Rule.Id] = &m.AlertJob{Rule: rule, Offset: int64(len(jobs))}
+		jobs[rule.Rule.Id] = &rule
 	}
 
 	log.Debug("Scheduler: Selected %d jobs", len(jobs))
@@ -111,8 +78,8 @@ func (this *Scheduler) queueJobs() {
 	now := time.Now().Unix()
 
 	for _, job := range this.jobs {
-		if now%job.rule.Frequency == 0 && job.running == false {
-			log.Info("Scheduler: Putting job on to run queue: %s", job.rule.Title)
+		if now%job.Rule.Frequency == 0 && job.Running == false {
+			log.Info("Scheduler: Putting job on to run queue: %s", job.Rule.Title)
 			this.runQueue <- job
 		}
 	}
@@ -121,8 +88,8 @@ func (this *Scheduler) queueJobs() {
 func (this *Scheduler) Executor(executor Executor) {
 	for job := range this.runQueue {
 		//log.Info("Executor: queue length %d", len(this.runQueue))
-		log.Info("Executor: executing %s", job.rule.Title)
-		this.jobs[job.rule.Id].running = true
+		log.Info("Executor: executing %s", job.Rule.Title)
+		this.jobs[job.Rule.Id].Running = true
 		this.MeasureAndExecute(executor, job)
 	}
 }
@@ -131,8 +98,9 @@ func (this *Scheduler) HandleResponses() {
 	for response := range this.responseQueue {
 		log.Info("Response: alert(%d) status(%s) actual(%v)", response.Id, response.State, response.ActualValue)
 		if this.jobs[response.Id] != nil {
-			this.jobs[response.Id].running = false
+			this.jobs[response.Id].Running = false
 		}
+
 		cmd := m.UpdateAlertStateCommand{
 			AlertId:  response.Id,
 			NewState: response.State,
@@ -144,15 +112,15 @@ func (this *Scheduler) HandleResponses() {
 	}
 }
 
-func (this *Scheduler) MeasureAndExecute(exec Executor, rule *AlertJob) {
+func (this *Scheduler) MeasureAndExecute(exec Executor, rule *m.AlertJob) {
 	now := time.Now()
 
 	response := make(chan *AlertResult, 1)
-	go exec.Execute(rule.rule, response)
+	go exec.Execute(rule, response)
 
 	select {
 	case <-time.After(time.Second * 5):
-		this.responseQueue <- &AlertResult{Id: rule.rule.Id, State: "timed out", Duration: float64(time.Since(now).Nanoseconds()) / float64(1000000)}
+		this.responseQueue <- &AlertResult{Id: rule.Rule.Id, State: "timed out", Duration: float64(time.Since(now).Nanoseconds()) / float64(1000000)}
 	case r := <-response:
 		r.Duration = float64(time.Since(now).Nanoseconds()) / float64(1000000)
 		log.Info("Schedular: exeuction took %vms", r.Duration)
@@ -160,13 +128,6 @@ func (this *Scheduler) MeasureAndExecute(exec Executor, rule *AlertJob) {
 	}
 }
 
-type AlertJob struct {
-	offset  int64
-	delay   bool
-	running bool
-	rule    m.AlertRule
-}
-
 type AlertResult struct {
 	Id          int64
 	State       string

+ 62 - 60
pkg/services/alerting/alerting_test.go

@@ -1,82 +1,84 @@
 package alerting
 
 import (
-	m "github.com/grafana/grafana/pkg/models"
+	//m "github.com/grafana/grafana/pkg/models"
 	. "github.com/smartystreets/goconvey/convey"
 	"testing"
 )
 
 func TestAlertingScheduler(t *testing.T) {
 	Convey("Testing alert job selection", t, func() {
-		mockFn := func() []m.AlertRule {
-			return []m.AlertRule{
-				{Id: 1, Title: "test 1"},
-				{Id: 2, Title: "test 2"},
-				{Id: 3, Title: "test 3"},
-				{Id: 4, Title: "test 4"},
-				{Id: 5, Title: "test 5"},
-				{Id: 6, Title: "test 6"},
+		/*
+			mockFn := func() []m.AlertRule {
+				return []m.AlertRule{
+					{Id: 1, Title: "test 1"},
+					{Id: 2, Title: "test 2"},
+					{Id: 3, Title: "test 3"},
+					{Id: 4, Title: "test 4"},
+					{Id: 5, Title: "test 5"},
+					{Id: 6, Title: "test 6"},
+				}
 			}
-		}
 
-		Convey("single server", func() {
-			scheduler := &Scheduler{
-				jobs:           make(map[int64]*AlertJob, 0),
-				runQueue:       make(chan *AlertJob, 1000),
-				serverId:       "",
-				serverPosition: 1,
-				clusterSize:    1,
-			}
+			Convey("single server", func() {
+				scheduler := &Scheduler{
+					jobs:           make(map[int64]*AlertJob, 0),
+					runQueue:       make(chan *AlertJob, 1000),
+					serverId:       "",
+					serverPosition: 1,
+					clusterSize:    1,
+				}
 
-			scheduler.updateJobs(mockFn)
-			So(len(scheduler.jobs), ShouldEqual, 6)
-		})
+				scheduler.updateJobs(mockFn)
+				So(len(scheduler.jobs), ShouldEqual, 6)
+			})
 
-		Convey("two servers", func() {
-			scheduler := &Scheduler{
-				jobs:           make(map[int64]*AlertJob, 0),
-				runQueue:       make(chan *AlertJob, 1000),
-				serverId:       "",
-				serverPosition: 1,
-				clusterSize:    2,
-			}
+			Convey("two servers", func() {
+				scheduler := &Scheduler{
+					jobs:           make(map[int64]*AlertJob, 0),
+					runQueue:       make(chan *AlertJob, 1000),
+					serverId:       "",
+					serverPosition: 1,
+					clusterSize:    2,
+				}
 
-			scheduler.updateJobs(mockFn)
-			So(len(scheduler.jobs), ShouldEqual, 3)
-			So(scheduler.jobs[1].rule.Id, ShouldEqual, 1)
-		})
+				scheduler.updateJobs(mockFn)
+				So(len(scheduler.jobs), ShouldEqual, 3)
+				So(scheduler.jobs[1].rule.Id, ShouldEqual, 1)
+			})
 
-		Convey("six servers", func() {
-			scheduler := &Scheduler{
-				jobs:           make(map[int64]*AlertJob, 0),
-				runQueue:       make(chan *AlertJob, 1000),
-				serverId:       "",
-				serverPosition: 6,
-				clusterSize:    6,
-			}
+			Convey("six servers", func() {
+				scheduler := &Scheduler{
+					jobs:           make(map[int64]*AlertJob, 0),
+					runQueue:       make(chan *AlertJob, 1000),
+					serverId:       "",
+					serverPosition: 6,
+					clusterSize:    6,
+				}
 
-			scheduler.updateJobs(mockFn)
-			So(len(scheduler.jobs), ShouldEqual, 1)
-			So(scheduler.jobs[6].rule.Id, ShouldEqual, 6)
-		})
+				scheduler.updateJobs(mockFn)
+				So(len(scheduler.jobs), ShouldEqual, 1)
+				So(scheduler.jobs[6].rule.Id, ShouldEqual, 6)
+			})
 
-		Convey("more servers then alerts", func() {
-			mockFn := func() []m.AlertRule {
-				return []m.AlertRule{
-					{Id: 1, Title: "test 1"},
+			Convey("more servers then alerts", func() {
+				mockFn := func() []m.AlertRule {
+					return []m.AlertRule{
+						{Id: 1, Title: "test 1"},
+					}
 				}
-			}
 
-			scheduler := &Scheduler{
-				jobs:           make(map[int64]*AlertJob, 0),
-				runQueue:       make(chan *AlertJob, 1000),
-				serverId:       "",
-				serverPosition: 3,
-				clusterSize:    3,
-			}
+				scheduler := &Scheduler{
+					jobs:           make(map[int64]*AlertJob, 0),
+					runQueue:       make(chan *AlertJob, 1000),
+					serverId:       "",
+					serverPosition: 3,
+					clusterSize:    3,
+				}
 
-			scheduler.updateJobs(mockFn)
-			So(len(scheduler.jobs), ShouldEqual, 0)
-		})
+				scheduler.updateJobs(mockFn)
+				So(len(scheduler.jobs), ShouldEqual, 0)
+			})
+		*/
 	})
 }

+ 1 - 1
pkg/services/alerting/dashboard_parser.go

@@ -26,7 +26,7 @@ func ParseAlertsFromDashboard(cmd *m.SaveDashboardCommand) []m.AlertRule {
 				CritLevel:    alerting.Get("critLevel").MustInt64(),
 				WarnOperator: alerting.Get("warnOperator").MustString(),
 				CritOperator: alerting.Get("critOperator").MustString(),
-				Frequency:    alerting.Get("interval").MustInt64(),
+				Frequency:    alerting.Get("frequency").MustInt64(),
 				Title:        alerting.Get("title").MustString(),
 				Description:  alerting.Get("description").MustString(),
 				QueryRange:   alerting.Get("queryRange").MustInt(),

+ 4 - 4
pkg/services/alerting/executor.go

@@ -6,7 +6,7 @@ import (
 )
 
 type Executor interface {
-	Execute(rule m.AlertRule, responseQueue chan *AlertResult)
+	Execute(rule *m.AlertJob, responseQueue chan *AlertResult)
 }
 
 type ExecutorImpl struct{}
@@ -30,14 +30,14 @@ var aggregator map[string]aggregationFn = map[string]aggregationFn{
 	"mean": func(series *m.TimeSeries) float64 { return series.Mean },
 }
 
-func (this *ExecutorImpl) Execute(rule m.AlertRule, responseQueue chan *AlertResult) {
+func (this *ExecutorImpl) Execute(rule *m.AlertJob, responseQueue chan *AlertResult) {
 	response, err := graphite.GraphiteClient{}.GetSeries(rule)
 
 	if err != nil {
-		responseQueue <- &AlertResult{State: "PENDING", Id: rule.Id}
+		responseQueue <- &AlertResult{State: "PENDING", Id: rule.Rule.Id}
 	}
 
-	responseQueue <- this.ValidateRule(rule, response)
+	responseQueue <- this.ValidateRule(rule.Rule, response)
 }
 
 func (this *ExecutorImpl) ValidateRule(rule m.AlertRule, series m.TimeSeriesSlice) *AlertResult {

+ 8 - 11
pkg/services/alerting/graphite/graphite.go

@@ -3,7 +3,7 @@ package graphite
 import (
 	"fmt"
 	"github.com/franela/goreq"
-	"github.com/grafana/grafana/pkg/bus"
+	"github.com/grafana/grafana/pkg/cmd/grafana-cli/log"
 	"github.com/grafana/grafana/pkg/components/simplejson"
 	m "github.com/grafana/grafana/pkg/models"
 	"net/http"
@@ -21,24 +21,21 @@ type GraphiteSerie struct {
 
 type GraphiteResponse []GraphiteSerie
 
-func (this GraphiteClient) GetSeries(rule m.AlertRule) (m.TimeSeriesSlice, error) {
-	query := &m.GetDataSourceByIdQuery{Id: rule.DatasourceId, OrgId: rule.OrgId}
-	if err := bus.Dispatch(query); err != nil {
-		return nil, err
-	}
-
+func (this GraphiteClient) GetSeries(rule *m.AlertJob) (m.TimeSeriesSlice, error) {
 	v := url.Values{
 		"format": []string{"json"},
-		"target": []string{getTargetFromRule(rule)},
+		"target": []string{getTargetFromRule(rule.Rule)},
 		"until":  []string{"now"},
-		"from":   []string{"-" + strconv.Itoa(rule.QueryRange) + "s"},
+		"from":   []string{"-" + strconv.Itoa(rule.Rule.QueryRange) + "s"},
 	}
 
+	log.Debug("Graphite: sending request with querystring: ", v.Encode())
+
 	res, err := goreq.Request{
 		Method:  "POST",
-		Uri:     query.Result.Url + "/render",
+		Uri:     rule.Datasource.Url + "/render",
 		Body:    v.Encode(),
-		Timeout: 500 * time.Millisecond,
+		Timeout: 5 * time.Second,
 	}.Do()
 
 	response := GraphiteResponse{}

+ 4 - 0
pkg/services/sqlstore/alert_state.go

@@ -29,6 +29,10 @@ func SetNewAlertState(cmd *m.UpdateAlertStateCommand) error {
 			return err
 		}
 
+		//if alert.State == cmd.NewState {
+		//	return nil
+		//}
+
 		alert.State = cmd.NewState
 		sess.Id(alert.Id).Update(&alert)
 

+ 5 - 2
pkg/services/sqlstore/dashboard_parser_test.go

@@ -109,7 +109,7 @@ func TestAlertModel(t *testing.T) {
             "critOperator": ">",
             "aggregator": "sum",
             "queryRange": "10m",
-            "interval": "10s",
+            "frequency": 10,
             "title": "active desktop users",
             "description": "restart webservers"
           },
@@ -196,7 +196,7 @@ func TestAlertModel(t *testing.T) {
             "critLevel": 500,
             "aggregator": "avg",
             "queryRange": "10m",
-            "interval": "10s",
+            "frequency": 10,
             "title": "active mobile users",
             "description": "restart itunes"
           },
@@ -393,6 +393,9 @@ func TestAlertModel(t *testing.T) {
 			So(alerts[0].WarnLevel, ShouldEqual, 30)
 			So(alerts[1].WarnLevel, ShouldEqual, 300)
 
+			So(alerts[0].Frequency, ShouldEqual, 10)
+			So(alerts[1].Frequency, ShouldEqual, 10)
+
 			So(alerts[0].CritLevel, ShouldEqual, 50)
 			So(alerts[1].CritLevel, ShouldEqual, 500)