
Merge pull request #11290 from Thib17/retry

Alerting: Add retry mechanism
Carl Bergquist 7 years ago
Parent
Commit
8db09d7556
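
This change makes the alerting engine retry failed evaluations instead of giving up after the first error: processJob is now driven by processJobWithRetry, which schedules up to alertMaxAttempts (3) attempts over attemptChan, and the result handler runs only once, after a successful attempt or after the last allowed one. The state resolution that used to live in DefaultEvalHandler.getNewState moves to EvalContext.GetNewState, so the rule state is computed after the final attempt rather than on every evaluation.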

+ 64 - 26
pkg/services/alerting/engine.go

@@ -86,17 +86,63 @@ func (e *Engine) runJobDispatcher(grafanaCtx context.Context) error {
 		case <-grafanaCtx.Done():
 			return dispatcherGroup.Wait()
 		case job := <-e.execQueue:
-			dispatcherGroup.Go(func() error { return e.processJob(alertCtx, job) })
+			dispatcherGroup.Go(func() error { return e.processJobWithRetry(alertCtx, job) })
 		}
 	}
 }
 
 var (
 	unfinishedWorkTimeout time.Duration = time.Second * 5
-	alertTimeout          time.Duration = time.Second * 30
+	// TODO: Make alertTimeout and alertMaxAttempts configurable in the config file.
+	alertTimeout     time.Duration = time.Second * 30
+	alertMaxAttempts int           = 3
 )
 
-func (e *Engine) processJob(grafanaCtx context.Context, job *Job) error {
+func (e *Engine) processJobWithRetry(grafanaCtx context.Context, job *Job) error {
+	defer func() {
+		if err := recover(); err != nil {
+			e.log.Error("Alert Panic", "error", err, "stack", log.Stack(1))
+		}
+	}()
+
+	cancelChan := make(chan context.CancelFunc, alertMaxAttempts)
+	attemptChan := make(chan int, 1)
+
+	// Initialize with first attemptID=1
+	attemptChan <- 1
+	job.Running = true
+
+	for {
+		select {
+		case <-grafanaCtx.Done():
+			// If the Grafana server context is cancelled, give the job processing a chance
+			// to finish gracefully - by waiting a timeout duration - before forcing its end.
+			unfinishedWorkTimer := time.NewTimer(unfinishedWorkTimeout)
+			select {
+			case <-unfinishedWorkTimer.C:
+				return e.endJob(grafanaCtx.Err(), cancelChan, job)
+			case <-attemptChan:
+				return e.endJob(nil, cancelChan, job)
+			}
+		case attemptID, more := <-attemptChan:
+			if !more {
+				return e.endJob(nil, cancelChan, job)
+			}
+			go e.processJob(attemptID, attemptChan, cancelChan, job)
+		}
+	}
+}
+
+func (e *Engine) endJob(err error, cancelChan chan context.CancelFunc, job *Job) error {
+	job.Running = false
+	close(cancelChan)
+	for cancelFn := range cancelChan {
+		cancelFn()
+	}
+	return err
+}
+
+func (e *Engine) processJob(attemptID int, attemptChan chan int, cancelChan chan context.CancelFunc, job *Job) {
 	defer func() {
 		if err := recover(); err != nil {
 			e.log.Error("Alert Panic", "error", err, "stack", log.Stack(1))
@@ -104,14 +150,13 @@ func (e *Engine) processJob(grafanaCtx context.Context, job *Job) error {
 	}()
 
 	alertCtx, cancelFn := context.WithTimeout(context.Background(), alertTimeout)
+	cancelChan <- cancelFn
 	span := opentracing.StartSpan("alert execution")
 	alertCtx = opentracing.ContextWithSpan(alertCtx, span)
 
-	job.Running = true
 	evalContext := NewEvalContext(alertCtx, job.Rule)
 	evalContext.Ctx = alertCtx
 
-	done := make(chan struct{})
 	go func() {
 		defer func() {
 			if err := recover(); err != nil {
@@ -122,43 +167,36 @@ func (e *Engine) processJob(grafanaCtx context.Context, job *Job) error {
 					tlog.String("message", "failed to execute alert rule. panic was recovered."),
 				)
 				span.Finish()
-				close(done)
+				close(attemptChan)
 			}
 		}()
 
 		e.evalHandler.Eval(evalContext)
-		e.resultHandler.Handle(evalContext)
 
 		span.SetTag("alertId", evalContext.Rule.Id)
 		span.SetTag("dashboardId", evalContext.Rule.DashboardId)
 		span.SetTag("firing", evalContext.Firing)
 		span.SetTag("nodatapoints", evalContext.NoDataFound)
+		span.SetTag("attemptID", attemptID)
+
 		if evalContext.Error != nil {
 			ext.Error.Set(span, true)
 			span.LogFields(
 				tlog.Error(evalContext.Error),
-				tlog.String("message", "alerting execution failed"),
+				tlog.String("message", "alerting execution attempt failed"),
 			)
+			if attemptID < alertMaxAttempts {
+				span.Finish()
+				e.log.Debug("Job Execution attempt triggered retry", "timeMs", evalContext.GetDurationMs(), "alertId", evalContext.Rule.Id, "name", evalContext.Rule.Name, "firing", evalContext.Firing, "attemptID", attemptID)
+				attemptChan <- (attemptID + 1)
+				return
+			}
 		}
 
+		evalContext.Rule.State = evalContext.GetNewState()
+		e.resultHandler.Handle(evalContext)
 		span.Finish()
-		close(done)
+		e.log.Debug("Job Execution completed", "timeMs", evalContext.GetDurationMs(), "alertId", evalContext.Rule.Id, "name", evalContext.Rule.Name, "firing", evalContext.Firing, "attemptID", attemptID)
+		close(attemptChan)
 	}()
-
-	var err error = nil
-	select {
-	case <-grafanaCtx.Done():
-		select {
-		case <-time.After(unfinishedWorkTimeout):
-			cancelFn()
-			err = grafanaCtx.Err()
-		case <-done:
-		}
-	case <-done:
-	}
-
-	e.log.Debug("Job Execution completed", "timeMs", evalContext.GetDurationMs(), "alertId", evalContext.Rule.Id, "name", evalContext.Rule.Name, "firing", evalContext.Firing)
-	job.Running = false
-	cancelFn()
-	return err
 }
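
To make the new control flow easier to follow, here is a minimal standalone sketch of the same channel handshake (illustrative names only, not Grafana code): each attempt either pushes the next attempt ID onto attemptChan for the dispatcher loop to pick up, or closes the channel when the job is finished.

    package main

    import (
        "errors"
        "fmt"
    )

    const maxAttempts = 3

    // runAttempt stands in for Engine.processJob: in this sketch it fails on
    // the first two attempts and succeeds on the third.
    func runAttempt(attemptID int, attemptChan chan int) {
        var err error
        if attemptID < 3 {
            err = errors.New("simulated evaluation failure")
        }
        if err != nil && attemptID < maxAttempts {
            fmt.Println("attempt", attemptID, "failed, scheduling retry")
            attemptChan <- attemptID + 1 // hand the next attempt back to the dispatcher
            return
        }
        fmt.Println("attempt", attemptID, "was the last one")
        close(attemptChan) // no further attempts: the dispatcher loop below stops
    }

    func main() {
        attemptChan := make(chan int, 1)
        attemptChan <- 1 // seed the first attempt, as processJobWithRetry does
        for attemptID := range attemptChan {
            runAttempt(attemptID, attemptChan)
        }
        fmt.Println("job done")
    }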

+ 118 - 0
pkg/services/alerting/engine_test.go

@@ -0,0 +1,118 @@
+package alerting
+
+import (
+	"context"
+	"errors"
+	"math"
+	"testing"
+
+	. "github.com/smartystreets/goconvey/convey"
+)
+
+type FakeEvalHandler struct {
+	SuccessCallID int // 0 means never succeed
+	CallNb        int
+}
+
+func NewFakeEvalHandler(successCallID int) *FakeEvalHandler {
+	return &FakeEvalHandler{
+		SuccessCallID: successCallID,
+		CallNb:        0,
+	}
+}
+
+func (handler *FakeEvalHandler) Eval(evalContext *EvalContext) {
+	handler.CallNb++
+	if handler.CallNb != handler.SuccessCallID {
+		evalContext.Error = errors.New("Fake evaluation failure")
+	}
+}
+
+type FakeResultHandler struct{}
+
+func (handler *FakeResultHandler) Handle(evalContext *EvalContext) error {
+	return nil
+}
+
+func TestEngineProcessJob(t *testing.T) {
+	Convey("Alerting engine job processing", t, func() {
+		engine := NewEngine()
+		engine.resultHandler = &FakeResultHandler{}
+		job := &Job{Running: true, Rule: &Rule{}}
+
+		Convey("Should trigger retry if needed", func() {
+
+			Convey("error + not last attempt -> retry", func() {
+				engine.evalHandler = NewFakeEvalHandler(0)
+
+				for i := 1; i < alertMaxAttempts; i++ {
+					attemptChan := make(chan int, 1)
+					cancelChan := make(chan context.CancelFunc, alertMaxAttempts)
+
+					engine.processJob(i, attemptChan, cancelChan, job)
+					nextAttemptID, more := <-attemptChan
+
+					So(nextAttemptID, ShouldEqual, i+1)
+					So(more, ShouldEqual, true)
+					So(<-cancelChan, ShouldNotBeNil)
+				}
+			})
+
+			Convey("error + last attempt -> no retry", func() {
+				engine.evalHandler = NewFakeEvalHandler(0)
+				attemptChan := make(chan int, 1)
+				cancelChan := make(chan context.CancelFunc, alertMaxAttempts)
+
+				engine.processJob(alertMaxAttempts, attemptChan, cancelChan, job)
+				nextAttemptID, more := <-attemptChan
+
+				So(nextAttemptID, ShouldEqual, 0)
+				So(more, ShouldEqual, false)
+				So(<-cancelChan, ShouldNotBeNil)
+			})
+
+			Convey("no error -> no retry", func() {
+				engine.evalHandler = NewFakeEvalHandler(1)
+				attemptChan := make(chan int, 1)
+				cancelChan := make(chan context.CancelFunc, alertMaxAttempts)
+
+				engine.processJob(1, attemptChan, cancelChan, job)
+				nextAttemptID, more := <-attemptChan
+
+				So(nextAttemptID, ShouldEqual, 0)
+				So(more, ShouldEqual, false)
+				So(<-cancelChan, ShouldNotBeNil)
+			})
+		})
+
+		Convey("Should trigger as many retries as needed", func() {
+
+			Convey("never sucess -> max retries number", func() {
+				expectedAttempts := alertMaxAttempts
+				evalHandler := NewFakeEvalHandler(0)
+				engine.evalHandler = evalHandler
+
+				engine.processJobWithRetry(context.TODO(), job)
+				So(evalHandler.CallNb, ShouldEqual, expectedAttempts)
+			})
+
+			Convey("always sucess -> never retry", func() {
+				expectedAttempts := 1
+				evalHandler := NewFakeEvalHandler(1)
+				engine.evalHandler = evalHandler
+
+				engine.processJobWithRetry(context.TODO(), job)
+				So(evalHandler.CallNb, ShouldEqual, expectedAttempts)
+			})
+
+			Convey("some errors before sucess -> some retries", func() {
+				expectedAttempts := int(math.Ceil(float64(alertMaxAttempts) / 2))
+				evalHandler := NewFakeEvalHandler(expectedAttempts)
+				engine.evalHandler = evalHandler
+
+				engine.processJobWithRetry(context.TODO(), job)
+				So(evalHandler.CallNb, ShouldEqual, expectedAttempts)
+			})
+		})
+	})
+}
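
These tests drive processJob and processJobWithRetry directly, so they can be run in isolation with the standard Go tooling, for example:

    go test ./pkg/services/alerting/ -run TestEngineProcessJob -v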

+ 31 - 0
pkg/services/alerting/eval_context.go

@@ -112,3 +112,34 @@ func (c *EvalContext) GetRuleUrl() (string, error) {
 		return fmt.Sprintf(urlFormat, m.GetFullDashboardUrl(ref.Uid, ref.Slug), c.Rule.PanelId, c.Rule.OrgId), nil
 	}
 }
+
+func (c *EvalContext) GetNewState() m.AlertStateType {
+	if c.Error != nil {
+		c.log.Error("Alert Rule Result Error",
+			"ruleId", c.Rule.Id,
+			"name", c.Rule.Name,
+			"error", c.Error,
+			"changing state to", c.Rule.ExecutionErrorState.ToAlertState())
+
+		if c.Rule.ExecutionErrorState == m.ExecutionErrorKeepState {
+			return c.PrevAlertState
+		}
+		return c.Rule.ExecutionErrorState.ToAlertState()
+
+	} else if c.Firing {
+		return m.AlertStateAlerting
+
+	} else if c.NoDataFound {
+		c.log.Info("Alert Rule returned no data",
+			"ruleId", c.Rule.Id,
+			"name", c.Rule.Name,
+			"changing state to", c.Rule.NoDataState.ToAlertState())
+
+		if c.Rule.NoDataState == m.NoDataKeepState {
+			return c.PrevAlertState
+		}
+		return c.Rule.NoDataState.ToAlertState()
+	}
+
+	return m.AlertStateOK
+}
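
With the state resolution now living on EvalContext, callers decide when to apply it. The engine does so only after the final attempt, immediately before handing the context to the result handler (see the engine.go hunk above):

    evalContext.Rule.State = evalContext.GetNewState()
    e.resultHandler.Handle(evalContext)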

+ 68 - 1
pkg/services/alerting/eval_context_test.go

@@ -2,6 +2,7 @@ package alerting
 
 import (
 	"context"
+	"fmt"
 	"testing"
 	"testing"
 
 
 	"github.com/grafana/grafana/pkg/models"
 	"github.com/grafana/grafana/pkg/models"
@@ -12,7 +13,7 @@ func TestAlertingEvalContext(t *testing.T) {
 	Convey("Eval context", t, func() {
 	Convey("Eval context", t, func() {
 		ctx := NewEvalContext(context.TODO(), &Rule{Conditions: []Condition{&conditionStub{firing: true}}})
 		ctx := NewEvalContext(context.TODO(), &Rule{Conditions: []Condition{&conditionStub{firing: true}}})
 
 
-		Convey("Should update alert state", func() {
+		Convey("Should update alert state when needed", func() {
 
 
 			Convey("ok -> alerting", func() {
 			Convey("ok -> alerting", func() {
 				ctx.PrevAlertState = models.AlertStateOK
 				ctx.PrevAlertState = models.AlertStateOK
@@ -28,5 +29,71 @@ func TestAlertingEvalContext(t *testing.T) {
 				So(ctx.ShouldUpdateAlertState(), ShouldBeFalse)
 			})
 		})
+
+		Convey("Should compute and replace properly new rule state", func() {
+			dummieError := fmt.Errorf("dummie error")
+
+			Convey("ok -> alerting", func() {
+				ctx.PrevAlertState = models.AlertStateOK
+				ctx.Firing = true
+
+				ctx.Rule.State = ctx.GetNewState()
+				So(ctx.Rule.State, ShouldEqual, models.AlertStateAlerting)
+			})
+
+			Convey("ok -> error(alerting)", func() {
+				ctx.PrevAlertState = models.AlertStateOK
+				ctx.Error = dummyError
+				ctx.Rule.ExecutionErrorState = models.ExecutionErrorSetAlerting
+
+				ctx.Rule.State = ctx.GetNewState()
+				So(ctx.Rule.State, ShouldEqual, models.AlertStateAlerting)
+			})
+
+			Convey("ok -> error(keep_last)", func() {
+				ctx.PrevAlertState = models.AlertStateOK
+				ctx.Error = dummyError
+				ctx.Rule.ExecutionErrorState = models.ExecutionErrorKeepState
+
+				ctx.Rule.State = ctx.GetNewState()
+				So(ctx.Rule.State, ShouldEqual, models.AlertStateOK)
+			})
+
+			Convey("pending -> error(keep_last)", func() {
+				ctx.PrevAlertState = models.AlertStatePending
+				ctx.Error = dummyError
+				ctx.Rule.ExecutionErrorState = models.ExecutionErrorKeepState
+
+				ctx.Rule.State = ctx.GetNewState()
+				So(ctx.Rule.State, ShouldEqual, models.AlertStatePending)
+			})
+
+			Convey("ok -> no_data(alerting)", func() {
+				ctx.PrevAlertState = models.AlertStateOK
+				ctx.Rule.NoDataState = models.NoDataSetAlerting
+				ctx.NoDataFound = true
+
+				ctx.Rule.State = ctx.GetNewState()
+				So(ctx.Rule.State, ShouldEqual, models.AlertStateAlerting)
+			})
+
+			Convey("ok -> no_data(keep_last)", func() {
+				ctx.PrevAlertState = models.AlertStateOK
+				ctx.Rule.NoDataState = models.NoDataKeepState
+				ctx.NoDataFound = true
+
+				ctx.Rule.State = ctx.GetNewState()
+				So(ctx.Rule.State, ShouldEqual, models.AlertStateOK)
+			})
+
+			Convey("pending -> no_data(keep_last)", func() {
+				ctx.PrevAlertState = models.AlertStatePending
+				ctx.Rule.NoDataState = models.NoDataKeepState
+				ctx.NoDataFound = true
+
+				ctx.Rule.State = ctx.GetNewState()
+				So(ctx.Rule.State, ShouldEqual, models.AlertStatePending)
+			})
+		})
 	})
 }

+ 0 - 34
pkg/services/alerting/eval_handler.go

@@ -7,7 +7,6 @@ import (
 
 	"github.com/grafana/grafana/pkg/log"
 	"github.com/grafana/grafana/pkg/metrics"
-	"github.com/grafana/grafana/pkg/models"
 )
 
 type DefaultEvalHandler struct {
@@ -66,40 +65,7 @@ func (e *DefaultEvalHandler) Eval(context *EvalContext) {
 	context.Firing = firing
 	context.NoDataFound = noDataFound
 	context.EndTime = time.Now()
-	context.Rule.State = e.getNewState(context)
 
 	elapsedTime := context.EndTime.Sub(context.StartTime).Nanoseconds() / int64(time.Millisecond)
 	metrics.M_Alerting_Execution_Time.Observe(float64(elapsedTime))
 }
-
-// This should be move into evalContext once its been refactored. (Carl Bergquist)
-func (handler *DefaultEvalHandler) getNewState(evalContext *EvalContext) models.AlertStateType {
-	if evalContext.Error != nil {
-		handler.log.Error("Alert Rule Result Error",
-			"ruleId", evalContext.Rule.Id,
-			"name", evalContext.Rule.Name,
-			"error", evalContext.Error,
-			"changing state to", evalContext.Rule.ExecutionErrorState.ToAlertState())
-
-		if evalContext.Rule.ExecutionErrorState == models.ExecutionErrorKeepState {
-			return evalContext.PrevAlertState
-		} else {
-			return evalContext.Rule.ExecutionErrorState.ToAlertState()
-		}
-	} else if evalContext.Firing {
-		return models.AlertStateAlerting
-	} else if evalContext.NoDataFound {
-		handler.log.Info("Alert Rule returned no data",
-			"ruleId", evalContext.Rule.Id,
-			"name", evalContext.Rule.Name,
-			"changing state to", evalContext.Rule.NoDataState.ToAlertState())
-
-		if evalContext.Rule.NoDataState == models.NoDataKeepState {
-			return evalContext.PrevAlertState
-		} else {
-			return evalContext.Rule.NoDataState.ToAlertState()
-		}
-	}
-
-	return models.AlertStateOK
-}

+ 0 - 70
pkg/services/alerting/eval_handler_test.go

@@ -2,10 +2,8 @@ package alerting
 
 import (
 	"context"
-	"fmt"
 	"testing"
 	"testing"
 
 
-	"github.com/grafana/grafana/pkg/models"
 	. "github.com/smartystreets/goconvey/convey"
 	. "github.com/smartystreets/goconvey/convey"
 )
 )
 
 
@@ -203,73 +201,5 @@ func TestAlertingEvaluationHandler(t *testing.T) {
 			handler.Eval(context)
 			So(context.NoDataFound, ShouldBeTrue)
 		})
-
-		Convey("EvalHandler can replace alert state based for errors and no_data", func() {
-			ctx := NewEvalContext(context.TODO(), &Rule{Conditions: []Condition{&conditionStub{firing: true}}})
-			dummieError := fmt.Errorf("dummie error")
-			Convey("Should update alert state", func() {
-
-				Convey("ok -> alerting", func() {
-					ctx.PrevAlertState = models.AlertStateOK
-					ctx.Firing = true
-
-					So(handler.getNewState(ctx), ShouldEqual, models.AlertStateAlerting)
-				})
-
-				Convey("ok -> error(alerting)", func() {
-					ctx.PrevAlertState = models.AlertStateOK
-					ctx.Error = dummieError
-					ctx.Rule.ExecutionErrorState = models.ExecutionErrorSetAlerting
-
-					ctx.Rule.State = handler.getNewState(ctx)
-					So(ctx.Rule.State, ShouldEqual, models.AlertStateAlerting)
-				})
-
-				Convey("ok -> error(keep_last)", func() {
-					ctx.PrevAlertState = models.AlertStateOK
-					ctx.Error = dummieError
-					ctx.Rule.ExecutionErrorState = models.ExecutionErrorKeepState
-
-					ctx.Rule.State = handler.getNewState(ctx)
-					So(ctx.Rule.State, ShouldEqual, models.AlertStateOK)
-				})
-
-				Convey("pending -> error(keep_last)", func() {
-					ctx.PrevAlertState = models.AlertStatePending
-					ctx.Error = dummieError
-					ctx.Rule.ExecutionErrorState = models.ExecutionErrorKeepState
-
-					ctx.Rule.State = handler.getNewState(ctx)
-					So(ctx.Rule.State, ShouldEqual, models.AlertStatePending)
-				})
-
-				Convey("ok -> no_data(alerting)", func() {
-					ctx.PrevAlertState = models.AlertStateOK
-					ctx.Rule.NoDataState = models.NoDataSetAlerting
-					ctx.NoDataFound = true
-
-					ctx.Rule.State = handler.getNewState(ctx)
-					So(ctx.Rule.State, ShouldEqual, models.AlertStateAlerting)
-				})
-
-				Convey("ok -> no_data(keep_last)", func() {
-					ctx.PrevAlertState = models.AlertStateOK
-					ctx.Rule.NoDataState = models.NoDataKeepState
-					ctx.NoDataFound = true
-
-					ctx.Rule.State = handler.getNewState(ctx)
-					So(ctx.Rule.State, ShouldEqual, models.AlertStateOK)
-				})
-
-				Convey("pending -> no_data(keep_last)", func() {
-					ctx.PrevAlertState = models.AlertStatePending
-					ctx.Rule.NoDataState = models.NoDataKeepState
-					ctx.NoDataFound = true
-
-					ctx.Rule.State = handler.getNewState(ctx)
-					So(ctx.Rule.State, ShouldEqual, models.AlertStatePending)
-				})
-			})
-		})
 	})
 }

+ 1 - 0
pkg/services/alerting/test_rule.go

@@ -53,6 +53,7 @@ func testAlertRule(rule *Rule) *EvalContext {
 	context.IsTestRun = true
 
 	handler.Eval(context)
+	context.Rule.State = context.GetNewState()
 
 	return context
 }