Kaynağa Gözat

Alerting: Add retry mechanism and its unitests

Signed-off-by: Thibault Chataigner <t.chataigner@criteo.com>
Thibault Chataigner 7 yıl önce
ebeveyn
işleme
5d23e7710b

+ 63 - 26
pkg/services/alerting/engine.go

@@ -86,17 +86,63 @@ func (e *Engine) runJobDispatcher(grafanaCtx context.Context) error {
 		case <-grafanaCtx.Done():
 		case <-grafanaCtx.Done():
 			return dispatcherGroup.Wait()
 			return dispatcherGroup.Wait()
 		case job := <-e.execQueue:
 		case job := <-e.execQueue:
-			dispatcherGroup.Go(func() error { return e.processJob(alertCtx, job) })
+			dispatcherGroup.Go(func() error { return e.processJobWithRetry(alertCtx, job) })
 		}
 		}
 	}
 	}
 }
 }
 
 
 var (
 var (
 	unfinishedWorkTimeout time.Duration = time.Second * 5
 	unfinishedWorkTimeout time.Duration = time.Second * 5
-	alertTimeout          time.Duration = time.Second * 30
+	// TODO: Make alertTimeout and alertMaxAttempts configurable in the config file.
+	alertTimeout     time.Duration = time.Second * 30
+	alertMaxAttempts int           = 3
 )
 )
 
 
-func (e *Engine) processJob(grafanaCtx context.Context, job *Job) error {
+func (e *Engine) processJobWithRetry(grafanaCtx context.Context, job *Job) error {
+	defer func() {
+		if err := recover(); err != nil {
+			e.log.Error("Alert Panic", "error", err, "stack", log.Stack(1))
+		}
+	}()
+
+	cancelChan := make(chan context.CancelFunc, alertMaxAttempts)
+	attemptChan := make(chan int, 1)
+
+	// Initialize with first attemptID=1
+	attemptChan <- 1
+	job.Running = true
+
+	for {
+		select {
+		case <-grafanaCtx.Done():
+			// In case grafana server context is cancel, let a chance to job processing
+			// to finish gracefully - by waiting a timeout duration - before forcing its end.
+			unfinishedWorkTimer := time.NewTimer(unfinishedWorkTimeout)
+			select {
+			case <-unfinishedWorkTimer.C:
+				return e.endJob(grafanaCtx.Err(), cancelChan, job)
+			case <-attemptChan:
+				return e.endJob(nil, cancelChan, job)
+			}
+		case attemptID, more := <-attemptChan:
+			if !more {
+				return e.endJob(nil, cancelChan, job)
+			}
+			go e.processJob(attemptID, attemptChan, cancelChan, job)
+		}
+	}
+}
+
+func (e *Engine) endJob(err error, cancelChan chan context.CancelFunc, job *Job) error {
+	job.Running = false
+	close(cancelChan)
+	for cancelFn := range cancelChan {
+		cancelFn()
+	}
+	return err
+}
+
+func (e *Engine) processJob(attemptID int, attemptChan chan int, cancelChan chan context.CancelFunc, job *Job) {
 	defer func() {
 	defer func() {
 		if err := recover(); err != nil {
 		if err := recover(); err != nil {
 			e.log.Error("Alert Panic", "error", err, "stack", log.Stack(1))
 			e.log.Error("Alert Panic", "error", err, "stack", log.Stack(1))
@@ -104,14 +150,13 @@ func (e *Engine) processJob(grafanaCtx context.Context, job *Job) error {
 	}()
 	}()
 
 
 	alertCtx, cancelFn := context.WithTimeout(context.Background(), alertTimeout)
 	alertCtx, cancelFn := context.WithTimeout(context.Background(), alertTimeout)
+	cancelChan <- cancelFn
 	span := opentracing.StartSpan("alert execution")
 	span := opentracing.StartSpan("alert execution")
 	alertCtx = opentracing.ContextWithSpan(alertCtx, span)
 	alertCtx = opentracing.ContextWithSpan(alertCtx, span)
 
 
-	job.Running = true
 	evalContext := NewEvalContext(alertCtx, job.Rule)
 	evalContext := NewEvalContext(alertCtx, job.Rule)
 	evalContext.Ctx = alertCtx
 	evalContext.Ctx = alertCtx
 
 
-	done := make(chan struct{})
 	go func() {
 	go func() {
 		defer func() {
 		defer func() {
 			if err := recover(); err != nil {
 			if err := recover(); err != nil {
@@ -122,43 +167,35 @@ func (e *Engine) processJob(grafanaCtx context.Context, job *Job) error {
 					tlog.String("message", "failed to execute alert rule. panic was recovered."),
 					tlog.String("message", "failed to execute alert rule. panic was recovered."),
 				)
 				)
 				span.Finish()
 				span.Finish()
-				close(done)
+				close(attemptChan)
 			}
 			}
 		}()
 		}()
 
 
 		e.evalHandler.Eval(evalContext)
 		e.evalHandler.Eval(evalContext)
-		e.resultHandler.Handle(evalContext)
 
 
 		span.SetTag("alertId", evalContext.Rule.Id)
 		span.SetTag("alertId", evalContext.Rule.Id)
 		span.SetTag("dashboardId", evalContext.Rule.DashboardId)
 		span.SetTag("dashboardId", evalContext.Rule.DashboardId)
 		span.SetTag("firing", evalContext.Firing)
 		span.SetTag("firing", evalContext.Firing)
 		span.SetTag("nodatapoints", evalContext.NoDataFound)
 		span.SetTag("nodatapoints", evalContext.NoDataFound)
+		span.SetTag("attemptID", attemptID)
+
 		if evalContext.Error != nil {
 		if evalContext.Error != nil {
 			ext.Error.Set(span, true)
 			ext.Error.Set(span, true)
 			span.LogFields(
 			span.LogFields(
 				tlog.Error(evalContext.Error),
 				tlog.Error(evalContext.Error),
-				tlog.String("message", "alerting execution failed"),
+				tlog.String("message", "alerting execution attempt failed"),
 			)
 			)
+			if attemptID < alertMaxAttempts {
+				span.Finish()
+				e.log.Debug("Job Execution attempt triggered retry", "timeMs", evalContext.GetDurationMs(), "alertId", evalContext.Rule.Id, "name", evalContext.Rule.Name, "firing", evalContext.Firing, "attemptID", attemptID)
+				attemptChan <- (attemptID + 1)
+				return
+			}
 		}
 		}
 
 
+		e.resultHandler.Handle(evalContext)
 		span.Finish()
 		span.Finish()
-		close(done)
+		e.log.Debug("Job Execution completed", "timeMs", evalContext.GetDurationMs(), "alertId", evalContext.Rule.Id, "name", evalContext.Rule.Name, "firing", evalContext.Firing, "attemptID", attemptID)
+		close(attemptChan)
 	}()
 	}()
-
-	var err error = nil
-	select {
-	case <-grafanaCtx.Done():
-		select {
-		case <-time.After(unfinishedWorkTimeout):
-			cancelFn()
-			err = grafanaCtx.Err()
-		case <-done:
-		}
-	case <-done:
-	}
-
-	e.log.Debug("Job Execution completed", "timeMs", evalContext.GetDurationMs(), "alertId", evalContext.Rule.Id, "name", evalContext.Rule.Name, "firing", evalContext.Firing)
-	job.Running = false
-	cancelFn()
-	return err
 }
 }

+ 118 - 0
pkg/services/alerting/engine_test.go

@@ -0,0 +1,118 @@
+package alerting
+
+import (
+	"context"
+	"errors"
+	"math"
+	"testing"
+
+	. "github.com/smartystreets/goconvey/convey"
+)
+
+type FakeEvalHandler struct {
+	SuccessCallID int // 0 means never sucess
+	CallNb        int
+}
+
+func NewFakeEvalHandler(successCallID int) *FakeEvalHandler {
+	return &FakeEvalHandler{
+		SuccessCallID: successCallID,
+		CallNb:        0,
+	}
+}
+
+func (handler *FakeEvalHandler) Eval(evalContext *EvalContext) {
+	handler.CallNb++
+	if handler.CallNb != handler.SuccessCallID {
+		evalContext.Error = errors.New("Fake evaluation failure")
+	}
+}
+
+type FakeResultHandler struct{}
+
+func (handler *FakeResultHandler) Handle(evalContext *EvalContext) error {
+	return nil
+}
+
+func TestEngineProcessJob(t *testing.T) {
+	Convey("Alerting engine job processing", t, func() {
+		engine := NewEngine()
+		engine.resultHandler = &FakeResultHandler{}
+		job := &Job{Running: true, Rule: &Rule{}}
+
+		Convey("Should trigger retry if needed", func() {
+
+			Convey("error + not last attempt -> retry", func() {
+				engine.evalHandler = NewFakeEvalHandler(0)
+
+				for i := 1; i < alertMaxAttempts; i++ {
+					attemptChan := make(chan int, 1)
+					cancelChan := make(chan context.CancelFunc, alertMaxAttempts)
+
+					engine.processJob(i, attemptChan, cancelChan, job)
+					nextAttemptID, more := <-attemptChan
+
+					So(nextAttemptID, ShouldEqual, i+1)
+					So(more, ShouldEqual, true)
+					So(<-cancelChan, ShouldNotBeNil)
+				}
+			})
+
+			Convey("error + last attempt -> no retry", func() {
+				engine.evalHandler = NewFakeEvalHandler(0)
+				attemptChan := make(chan int, 1)
+				cancelChan := make(chan context.CancelFunc, alertMaxAttempts)
+
+				engine.processJob(alertMaxAttempts, attemptChan, cancelChan, job)
+				nextAttemptID, more := <-attemptChan
+
+				So(nextAttemptID, ShouldEqual, 0)
+				So(more, ShouldEqual, false)
+				So(<-cancelChan, ShouldNotBeNil)
+			})
+
+			Convey("no error -> no retry", func() {
+				engine.evalHandler = NewFakeEvalHandler(1)
+				attemptChan := make(chan int, 1)
+				cancelChan := make(chan context.CancelFunc, alertMaxAttempts)
+
+				engine.processJob(1, attemptChan, cancelChan, job)
+				nextAttemptID, more := <-attemptChan
+
+				So(nextAttemptID, ShouldEqual, 0)
+				So(more, ShouldEqual, false)
+				So(<-cancelChan, ShouldNotBeNil)
+			})
+		})
+
+		Convey("Should trigger as many retries as needed", func() {
+
+			Convey("never sucess -> max retries number", func() {
+				expectedAttempts := alertMaxAttempts
+				evalHandler := NewFakeEvalHandler(0)
+				engine.evalHandler = evalHandler
+
+				engine.processJobWithRetry(context.TODO(), job)
+				So(evalHandler.CallNb, ShouldEqual, expectedAttempts)
+			})
+
+			Convey("always sucess -> never retry", func() {
+				expectedAttempts := 1
+				evalHandler := NewFakeEvalHandler(1)
+				engine.evalHandler = evalHandler
+
+				engine.processJobWithRetry(context.TODO(), job)
+				So(evalHandler.CallNb, ShouldEqual, expectedAttempts)
+			})
+
+			Convey("some errors before sucess -> some retries", func() {
+				expectedAttempts := int(math.Ceil(float64(alertMaxAttempts) / 2))
+				evalHandler := NewFakeEvalHandler(expectedAttempts)
+				engine.evalHandler = evalHandler
+
+				engine.processJobWithRetry(context.TODO(), job)
+				So(evalHandler.CallNb, ShouldEqual, expectedAttempts)
+			})
+		})
+	})
+}