|
|
@@ -1,6 +1,7 @@
|
|
|
package alerting
|
|
|
|
|
|
import (
|
|
|
+ "context"
|
|
|
"errors"
|
|
|
"fmt"
|
|
|
"time"
|
|
|
@@ -59,23 +60,39 @@ func (n *notificationService) SendIfNeeded(context *EvalContext) error {
|
|
|
return n.sendNotifications(context, notifiers)
|
|
|
}
|
|
|
|
|
|
-func (n *notificationService) sendNotifications(context *EvalContext, notifiers []Notifier) error {
|
|
|
- g, _ := errgroup.WithContext(context.Ctx)
|
|
|
+func (n *notificationService) sendNotifications(evalContext *EvalContext, notifiers []Notifier) error {
|
|
|
+ g, _ := errgroup.WithContext(evalContext.Ctx)
|
|
|
|
|
|
for _, notifier := range notifiers {
|
|
|
not := notifier //avoid updating scope variable in go routine
|
|
|
- n.log.Debug("Sending notification", "type", not.GetType(), "id", not.GetNotifierId(), "isDefault", not.GetIsDefault())
|
|
|
- metrics.M_Alerting_Notification_Sent.WithLabelValues(not.GetType()).Inc()
|
|
|
+
|
|
|
g.Go(func() error {
|
|
|
- success := not.Notify(context) == nil
|
|
|
- cmd := &m.RecordNotificationJournalCommand{
|
|
|
- OrgId: context.Rule.OrgId,
|
|
|
- AlertId: context.Rule.Id,
|
|
|
- NotifierId: not.GetNotifierId(),
|
|
|
- SentAt: time.Now(),
|
|
|
- Success: success,
|
|
|
- }
|
|
|
- return bus.Dispatch(cmd)
|
|
|
+ return bus.InTransaction(evalContext.Ctx, func(ctx context.Context) error {
|
|
|
+ n.log.Debug("trying to send notification", "id", not.GetNotifierId())
|
|
|
+
|
|
|
+ // Verify that we can send the notification again
|
|
|
+ // but this time within the same transaction.
|
|
|
+ if !evalContext.IsTestRun && !not.ShouldNotify(evalContext) {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
+ n.log.Debug("Sending notification", "type", not.GetType(), "id", not.GetNotifierId(), "isDefault", not.GetIsDefault())
|
|
|
+ metrics.M_Alerting_Notification_Sent.WithLabelValues(not.GetType()).Inc()
|
|
|
+
|
|
|
+ //send notification
|
|
|
+ success := not.Notify(evalContext) == nil
|
|
|
+
|
|
|
+ //write result to db.
|
|
|
+ cmd := &m.RecordNotificationJournalCommand{
|
|
|
+ OrgId: evalContext.Rule.OrgId,
|
|
|
+ AlertId: evalContext.Rule.Id,
|
|
|
+ NotifierId: not.GetNotifierId(),
|
|
|
+ SentAt: time.Now(),
|
|
|
+ Success: success,
|
|
|
+ }
|
|
|
+
|
|
|
+ return bus.DispatchCtx(evalContext.Ctx, cmd)
|
|
|
+ })
|
|
|
})
|
|
|
}
|
|
|
|
|
|
@@ -118,7 +135,7 @@ func (n *notificationService) uploadImage(context *EvalContext) (err error) {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func (n *notificationService) getNeededNotifiers(orgId int64, notificationIds []int64, context *EvalContext) (NotifierSlice, error) {
|
|
|
+func (n *notificationService) getNeededNotifiers(orgId int64, notificationIds []int64, evalContext *EvalContext) (NotifierSlice, error) {
|
|
|
query := &m.GetAlertNotificationsToSendQuery{OrgId: orgId, Ids: notificationIds}
|
|
|
|
|
|
if err := bus.Dispatch(query); err != nil {
|
|
|
@@ -132,7 +149,7 @@ func (n *notificationService) getNeededNotifiers(orgId int64, notificationIds []
|
|
|
return nil, err
|
|
|
}
|
|
|
|
|
|
- if not.ShouldNotify(context) {
|
|
|
+ if not.ShouldNotify(evalContext) {
|
|
|
result = append(result, not)
|
|
|
}
|
|
|
}
|