Преглед на файлове

refactor: refactoring notification service to use new service registry hooks

Torkel Ödegaard преди 7 години
родител
ревизия
df71fe33fd

+ 2 - 0
pkg/api/org_invite.go

@@ -60,7 +60,9 @@ func AddOrgInvite(c *m.ReqContext, inviteDto dtos.AddInviteForm) Response {
 	}
 
 	// send invite email
+	c.Logger.Error("sending?")
 	if inviteDto.SendEmail && util.IsEmail(inviteDto.LoginOrEmail) {
+		c.Logger.Error("yes sending?")
 		emailCmd := m.SendEmailCommand{
 			To:       []string{inviteDto.LoginOrEmail},
 			Template: "new_user_invite.html",

+ 4 - 7
pkg/cmd/grafana-server/server.go

@@ -26,16 +26,17 @@ import (
 	"github.com/grafana/grafana/pkg/login"
 	"github.com/grafana/grafana/pkg/metrics"
 	"github.com/grafana/grafana/pkg/plugins"
-	"github.com/grafana/grafana/pkg/services/notifications"
 	"github.com/grafana/grafana/pkg/services/sqlstore"
 	"github.com/grafana/grafana/pkg/setting"
 
 	"github.com/grafana/grafana/pkg/social"
 	"github.com/grafana/grafana/pkg/tracing"
 
+	// self registering services
 	_ "github.com/grafana/grafana/pkg/extensions"
 	_ "github.com/grafana/grafana/pkg/services/alerting"
 	_ "github.com/grafana/grafana/pkg/services/cleanup"
+	_ "github.com/grafana/grafana/pkg/services/notifications"
 	_ "github.com/grafana/grafana/pkg/services/search"
 )
 
@@ -56,9 +57,9 @@ type GrafanaServerImpl struct {
 	shutdownFn    context.CancelFunc
 	childRoutines *errgroup.Group
 	log           log.Logger
-	RouteRegister api.RouteRegister `inject:""`
 
-	HttpServer *api.HTTPServer `inject:""`
+	RouteRegister api.RouteRegister `inject:""`
+	HttpServer    *api.HTTPServer   `inject:""`
 }
 
 func (g *GrafanaServerImpl) Start() error {
@@ -89,10 +90,6 @@ func (g *GrafanaServerImpl) Start() error {
 	}
 	defer tracingCloser.Close()
 
-	if err = notifications.Init(); err != nil {
-		return fmt.Errorf("Notification service failed to initialize. error: %v", err)
-	}
-
 	serviceGraph := inject.Graph{}
 	serviceGraph.Provide(&inject.Object{Value: bus.GetBus()})
 	serviceGraph.Provide(&inject.Object{Value: dashboards.NewProvisioningService()})

+ 0 - 32
pkg/services/notifications/mailer.go

@@ -11,44 +11,12 @@ import (
 	"html/template"
 	"net"
 	"strconv"
-	"strings"
 
-	"github.com/grafana/grafana/pkg/log"
 	m "github.com/grafana/grafana/pkg/models"
 	"github.com/grafana/grafana/pkg/setting"
 	gomail "gopkg.in/mail.v2"
 )
 
-var mailQueue chan *Message
-
-func initMailQueue() {
-	mailQueue = make(chan *Message, 10)
-	go processMailQueue()
-}
-
-func processMailQueue() {
-	for {
-		select {
-		case msg := <-mailQueue:
-			num, err := send(msg)
-			tos := strings.Join(msg.To, "; ")
-			info := ""
-			if err != nil {
-				if len(msg.Info) > 0 {
-					info = ", info: " + msg.Info
-				}
-				log.Error(4, fmt.Sprintf("Async sent email %d succeed, not send emails: %s%s err: %s", num, tos, info, err))
-			} else {
-				log.Trace(fmt.Sprintf("Async sent email %d succeed, sent emails: %s%s", num, tos, info))
-			}
-		}
-	}
-}
-
-var addToMailQueue = func(msg *Message) {
-	mailQueue <- msg
-}
-
 func send(msg *Message) (int, error) {
 	dialer, err := createDialer()
 	if err != nil {

+ 66 - 25
pkg/services/notifications/notifications.go

@@ -7,11 +7,13 @@ import (
 	"html/template"
 	"net/url"
 	"path/filepath"
+	"strings"
 
 	"github.com/grafana/grafana/pkg/bus"
 	"github.com/grafana/grafana/pkg/events"
 	"github.com/grafana/grafana/pkg/log"
 	m "github.com/grafana/grafana/pkg/models"
+	"github.com/grafana/grafana/pkg/registry"
 	"github.com/grafana/grafana/pkg/setting"
 	"github.com/grafana/grafana/pkg/util"
 )
@@ -21,20 +23,31 @@ var tmplResetPassword = "reset_password.html"
 var tmplSignUpStarted = "signup_started.html"
 var tmplWelcomeOnSignUp = "welcome_on_signup.html"
 
-func Init() error {
-	initMailQueue()
-	initWebhookQueue()
+func init() {
+	registry.RegisterService(&NotificationService{})
+}
+
+type NotificationService struct {
+	Bus          bus.Bus `inject:""`
+	mailQueue    chan *Message
+	webhookQueue chan *Webhook
+	log          log.Logger
+}
 
-	bus.AddHandler("email", sendResetPasswordEmail)
-	bus.AddHandler("email", validateResetPasswordCode)
-	bus.AddHandler("email", sendEmailCommandHandler)
+func (ns *NotificationService) Init() error {
+	ns.log = log.New("notifications")
+	ns.mailQueue = make(chan *Message, 10)
+	ns.webhookQueue = make(chan *Webhook, 10)
 
-	bus.AddCtxHandler("email", sendEmailCommandHandlerSync)
+	ns.Bus.AddHandler(ns.sendResetPasswordEmail)
+	ns.Bus.AddHandler(ns.validateResetPasswordCode)
+	ns.Bus.AddHandler(ns.sendEmailCommandHandler)
 
-	bus.AddCtxHandler("webhook", SendWebhookSync)
+	ns.Bus.AddCtxHandler(ns.sendEmailCommandHandlerSync)
+	ns.Bus.AddCtxHandler(ns.SendWebhookSync)
 
-	bus.AddEventListener(signUpStartedHandler)
-	bus.AddEventListener(signUpCompletedHandler)
+	ns.Bus.AddEventListener(ns.signUpStartedHandler)
+	ns.Bus.AddEventListener(ns.signUpCompletedHandler)
 
 	mailTemplates = template.New("name")
 	mailTemplates.Funcs(template.FuncMap{
@@ -58,8 +71,37 @@ func Init() error {
 	return nil
 }
 
-func SendWebhookSync(ctx context.Context, cmd *m.SendWebhookSync) error {
-	return sendWebRequestSync(ctx, &Webhook{
+func (ns *NotificationService) Run(ctx context.Context) error {
+	for {
+		select {
+		case webhook := <-ns.webhookQueue:
+			err := ns.sendWebRequestSync(context.Background(), webhook)
+
+			if err != nil {
+				ns.log.Error("Failed to send webrequest ", "error", err)
+			}
+		case msg := <-ns.mailQueue:
+			num, err := send(msg)
+			tos := strings.Join(msg.To, "; ")
+			info := ""
+			if err != nil {
+				if len(msg.Info) > 0 {
+					info = ", info: " + msg.Info
+				}
+				ns.log.Error(fmt.Sprintf("Async sent email %d succeed, not send emails: %s%s err: %s", num, tos, info, err))
+			} else {
+				ns.log.Debug(fmt.Sprintf("Async sent email %d succeed, sent emails: %s%s", num, tos, info))
+			}
+		case <-ctx.Done():
+			return ctx.Err()
+		}
+	}
+
+	return nil
+}
+
+func (ns *NotificationService) SendWebhookSync(ctx context.Context, cmd *m.SendWebhookSync) error {
+	return ns.sendWebRequestSync(ctx, &Webhook{
 		Url:        cmd.Url,
 		User:       cmd.User,
 		Password:   cmd.Password,
@@ -74,7 +116,7 @@ func subjectTemplateFunc(obj map[string]interface{}, value string) string {
 	return ""
 }
 
-func sendEmailCommandHandlerSync(ctx context.Context, cmd *m.SendEmailCommandSync) error {
+func (ns *NotificationService) sendEmailCommandHandlerSync(ctx context.Context, cmd *m.SendEmailCommandSync) error {
 	message, err := buildEmailMessage(&m.SendEmailCommand{
 		Data:         cmd.Data,
 		Info:         cmd.Info,
@@ -89,24 +131,22 @@ func sendEmailCommandHandlerSync(ctx context.Context, cmd *m.SendEmailCommandSyn
 	}
 
 	_, err = send(message)
-
 	return err
 }
 
-func sendEmailCommandHandler(cmd *m.SendEmailCommand) error {
+func (ns *NotificationService) sendEmailCommandHandler(cmd *m.SendEmailCommand) error {
 	message, err := buildEmailMessage(cmd)
 
 	if err != nil {
 		return err
 	}
 
-	addToMailQueue(message)
-
+	ns.mailQueue <- message
 	return nil
 }
 
-func sendResetPasswordEmail(cmd *m.SendResetPasswordEmailCommand) error {
-	return sendEmailCommandHandler(&m.SendEmailCommand{
+func (ns *NotificationService) sendResetPasswordEmail(cmd *m.SendResetPasswordEmailCommand) error {
+	return ns.sendEmailCommandHandler(&m.SendEmailCommand{
 		To:       []string{cmd.User.Email},
 		Template: tmplResetPassword,
 		Data: map[string]interface{}{
@@ -116,7 +156,7 @@ func sendResetPasswordEmail(cmd *m.SendResetPasswordEmailCommand) error {
 	})
 }
 
-func validateResetPasswordCode(query *m.ValidateResetPasswordCodeQuery) error {
+func (ns *NotificationService) validateResetPasswordCode(query *m.ValidateResetPasswordCodeQuery) error {
 	login := getLoginForEmailCode(query.Code)
 	if login == "" {
 		return m.ErrInvalidEmailCode
@@ -135,18 +175,18 @@ func validateResetPasswordCode(query *m.ValidateResetPasswordCodeQuery) error {
 	return nil
 }
 
-func signUpStartedHandler(evt *events.SignUpStarted) error {
+func (ns *NotificationService) signUpStartedHandler(evt *events.SignUpStarted) error {
 	if !setting.VerifyEmailEnabled {
 		return nil
 	}
 
-	log.Info("User signup started: %s", evt.Email)
+	ns.log.Info("User signup started", "email", evt.Email)
 
 	if evt.Email == "" {
 		return nil
 	}
 
-	err := sendEmailCommandHandler(&m.SendEmailCommand{
+	err := ns.sendEmailCommandHandler(&m.SendEmailCommand{
 		To:       []string{evt.Email},
 		Template: tmplSignUpStarted,
 		Data: map[string]interface{}{
@@ -155,6 +195,7 @@ func signUpStartedHandler(evt *events.SignUpStarted) error {
 			"SignUpUrl": setting.ToAbsUrl(fmt.Sprintf("signup/?email=%s&code=%s", url.QueryEscape(evt.Email), url.QueryEscape(evt.Code))),
 		},
 	})
+
 	if err != nil {
 		return err
 	}
@@ -163,12 +204,12 @@ func signUpStartedHandler(evt *events.SignUpStarted) error {
 	return bus.Dispatch(&emailSentCmd)
 }
 
-func signUpCompletedHandler(evt *events.SignUpCompleted) error {
+func (ns *NotificationService) signUpCompletedHandler(evt *events.SignUpCompleted) error {
 	if evt.Email == "" || !setting.Smtp.SendWelcomeEmailOnSignUp {
 		return nil
 	}
 
-	return sendEmailCommandHandler(&m.SendEmailCommand{
+	return ns.sendEmailCommandHandler(&m.SendEmailCommand{
 		To:       []string{evt.Email},
 		Template: tmplWelcomeOnSignUp,
 		Data: map[string]interface{}{

+ 8 - 9
pkg/services/notifications/notifications_test.go

@@ -3,6 +3,7 @@ package notifications
 import (
 	"testing"
 
+	"github.com/grafana/grafana/pkg/bus"
 	m "github.com/grafana/grafana/pkg/models"
 	"github.com/grafana/grafana/pkg/setting"
 	. "github.com/smartystreets/goconvey/convey"
@@ -17,25 +18,23 @@ type testTriggeredAlert struct {
 func TestNotifications(t *testing.T) {
 
 	Convey("Given the notifications service", t, func() {
-		//bus.ClearBusHandlers()
-
 		setting.StaticRootPath = "../../../public/"
 		setting.Smtp.Enabled = true
 		setting.Smtp.TemplatesPattern = "emails/*.html"
 		setting.Smtp.FromAddress = "from@address.com"
 		setting.Smtp.FromName = "Grafana Admin"
 
-		err := Init()
-		So(err, ShouldBeNil)
+		ns := &NotificationService{}
+		ns.Bus = bus.New()
 
-		var sentMsg *Message
-		addToMailQueue = func(msg *Message) {
-			sentMsg = msg
-		}
+		err := ns.Init()
+		So(err, ShouldBeNil)
 
 		Convey("When sending reset email password", func() {
-			err := sendResetPasswordEmail(&m.SendResetPasswordEmailCommand{User: &m.User{Email: "asd@asd.com"}})
+			err := ns.sendResetPasswordEmail(&m.SendResetPasswordEmailCommand{User: &m.User{Email: "asd@asd.com"}})
 			So(err, ShouldBeNil)
+
+			sentMsg := <-ns.mailQueue
 			So(sentMsg.Body, ShouldContainSubstring, "body")
 			So(sentMsg.Subject, ShouldEqual, "Reset your Grafana password - asd@asd.com")
 			So(sentMsg.Body, ShouldNotContainSubstring, "Subject")

+ 10 - 10
pkg/services/notifications/send_email_integration_test.go

@@ -12,8 +12,6 @@ import (
 
 func TestEmailIntegrationTest(t *testing.T) {
 	SkipConvey("Given the notifications service", t, func() {
-		bus.ClearBusHandlers()
-
 		setting.StaticRootPath = "../../../public/"
 		setting.Smtp.Enabled = true
 		setting.Smtp.TemplatesPattern = "emails/*.html"
@@ -21,14 +19,11 @@ func TestEmailIntegrationTest(t *testing.T) {
 		setting.Smtp.FromName = "Grafana Admin"
 		setting.BuildVersion = "4.0.0"
 
-		err := Init()
-		So(err, ShouldBeNil)
+		ns := &NotificationService{}
+		ns.Bus = bus.New()
 
-		addToMailQueue = func(msg *Message) {
-			So(msg.From, ShouldEqual, "Grafana Admin <from@address.com>")
-			So(msg.To[0], ShouldEqual, "asdf@asdf.com")
-			ioutil.WriteFile("../../../tmp/test_email.html", []byte(msg.Body), 0777)
-		}
+		err := ns.Init()
+		So(err, ShouldBeNil)
 
 		Convey("When sending reset email password", func() {
 			cmd := &m.SendEmailCommand{
@@ -59,8 +54,13 @@ func TestEmailIntegrationTest(t *testing.T) {
 				Template: "alert_notification.html",
 			}
 
-			err := sendEmailCommandHandler(cmd)
+			err := ns.sendEmailCommandHandler(cmd)
 			So(err, ShouldBeNil)
+
+			sentMsg := <-ns.mailQueue
+			So(sentMsg.From, ShouldEqual, "Grafana Admin <from@address.com>")
+			So(sentMsg.To[0], ShouldEqual, "asdf@asdf.com")
+			ioutil.WriteFile("../../../tmp/test_email.html", []byte(sentMsg.Body), 0777)
 		})
 	})
 }

+ 3 - 28
pkg/services/notifications/webhook.go

@@ -11,7 +11,6 @@ import (
 
 	"golang.org/x/net/context/ctxhttp"
 
-	"github.com/grafana/grafana/pkg/log"
 	"github.com/grafana/grafana/pkg/util"
 )
 
@@ -37,32 +36,8 @@ var netClient = &http.Client{
 	Transport: netTransport,
 }
 
-var (
-	webhookQueue chan *Webhook
-	webhookLog   log.Logger
-)
-
-func initWebhookQueue() {
-	webhookLog = log.New("notifications.webhook")
-	webhookQueue = make(chan *Webhook, 10)
-	go processWebhookQueue()
-}
-
-func processWebhookQueue() {
-	for {
-		select {
-		case webhook := <-webhookQueue:
-			err := sendWebRequestSync(context.Background(), webhook)
-
-			if err != nil {
-				webhookLog.Error("Failed to send webrequest ", "error", err)
-			}
-		}
-	}
-}
-
-func sendWebRequestSync(ctx context.Context, webhook *Webhook) error {
-	webhookLog.Debug("Sending webhook", "url", webhook.Url, "http method", webhook.HttpMethod)
+func (ns *NotificationService) sendWebRequestSync(ctx context.Context, webhook *Webhook) error {
+	ns.log.Debug("Sending webhook", "url", webhook.Url, "http method", webhook.HttpMethod)
 
 	if webhook.HttpMethod == "" {
 		webhook.HttpMethod = http.MethodPost
@@ -98,6 +73,6 @@ func sendWebRequestSync(ctx context.Context, webhook *Webhook) error {
 		return err
 	}
 
-	webhookLog.Debug("Webhook failed", "statuscode", resp.Status, "body", string(body))
+	ns.log.Debug("Webhook failed", "statuscode", resp.Status, "body", string(body))
 	return fmt.Errorf("Webhook response status %v", resp.Status)
 }