浏览代码

Add inital implementation of Notification events.

If notifications are enabled in the config, Adds a eventHandler
accepting Notification{} payloads to the internal Bus.  The
eventHandler then marshals the payload into json and sends it
to a rabbitmq topic exchange using the
Notification.Priority+Noticiation.EventType as the routing key.
eg.  INFO.account.created

Currently, notifications are only being emitted for
INFO.account.created
INFO.account.updated
INFO.user.created
INFO.user.updated
woodsaj 11 年之前
父节点
当前提交
a712f1a231

+ 4 - 1
conf/grafana.ini

@@ -121,4 +121,7 @@ daily_rotate = true
 ; Expired days of log file(delete after max days), default is 7
 max_days = 7
 
-
+[notifications]
+enabled = false
+rabbitmq_url = amqp://localhost/
+notifications_exchange = notifications

+ 1 - 1
grafana

@@ -1 +1 @@
-Subproject commit 7fef460fa2b6034429b205dc63d9b9f298f43fb4
+Subproject commit 0fe83d51981333600f1e3801044fc1cfd5acf1ae

+ 8 - 6
pkg/bus/bus.go

@@ -52,7 +52,6 @@ func (b *InProcBus) Dispatch(msg Msg) error {
 
 func (b *InProcBus) Publish(msg Msg) error {
 	var msgName = reflect.TypeOf(msg).Elem().Name()
-
 	var listeners = b.listeners[msgName]
 	if len(listeners) == 0 {
 		return nil
@@ -61,7 +60,7 @@ func (b *InProcBus) Publish(msg Msg) error {
 	var params = make([]reflect.Value, 1)
 	params[0] = reflect.ValueOf(msg)
 
-	for listenerHandler := range listeners {
+	for _, listenerHandler := range listeners {
 		ret := reflect.ValueOf(listenerHandler).Call(params)
 		err := ret[0].Interface()
 		if err != nil {
@@ -81,12 +80,11 @@ func (b *InProcBus) AddHandler(handler HandlerFunc) {
 func (b *InProcBus) AddEventListener(handler HandlerFunc) {
 	handlerType := reflect.TypeOf(handler)
 	eventName := handlerType.In(0).Elem().Name()
-	list, exists := b.listeners[eventName]
+	_, exists := b.listeners[eventName]
 	if !exists {
-		list = make([]HandlerFunc, 0)
-		b.listeners[eventName] = list
+		b.listeners[eventName] = make([]HandlerFunc, 0)
 	}
-	list = append(list, handler)
+	b.listeners[eventName] = append(b.listeners[eventName], handler)
 }
 
 // Package level functions
@@ -102,3 +100,7 @@ func AddEventListener(handler HandlerFunc) {
 func Dispatch(msg Msg) error {
 	return globalBus.Dispatch(msg)
 }
+
+func Publish(msg Msg) error {
+	return globalBus.Publish(msg)
+}

+ 8 - 1
pkg/cmd/web.go

@@ -19,6 +19,7 @@ import (
 	"github.com/torkelo/grafana-pro/pkg/services/sqlstore"
 	"github.com/torkelo/grafana-pro/pkg/setting"
 	"github.com/torkelo/grafana-pro/pkg/social"
+	"github.com/torkelo/grafana-pro/pkg/services/notification"
 )
 
 var CmdWeb = cli.Command{
@@ -81,11 +82,17 @@ func runWeb(c *cli.Context) {
 	social.NewOAuthService()
 	sqlstore.NewEngine()
 	sqlstore.EnsureAdminUser()
+	var err error
+	if setting.NotificationsEnabled {
+		err = notification.Init(setting.RabbitmqUrl, setting.NotificationsExchange)
+		if err != nil {
+			log.Fatal(4, "Failed to connect to notification queue: %v", err)
+		}
+	}
 
 	m := newMacaron()
 	api.Register(m)
 
-	var err error
 	listenAddr := fmt.Sprintf("%s:%s", setting.HttpAddr, setting.HttpPort)
 	log.Info("Listen: %v://%s%s", setting.Protocol, listenAddr, setting.AppSubUrl)
 	switch setting.Protocol {

+ 21 - 0
pkg/models/notification.go

@@ -0,0 +1,21 @@
+package models
+
+import (
+	"time"
+)
+
+type EventPriority string
+
+const (
+	PRIO_DEBUG EventPriority = "DEBUG"
+	PRIO_INFO EventPriority = "INFO"
+	PRIO_ERROR EventPriority = "ERROR"
+)
+
+type Notification struct {
+	EventType string `json:"event_type"`
+	Timestamp time.Time `json:"timestamp"`
+	Priority  EventPriority `json:"priority"`
+	Payload interface{} `json:"payload"`
+}
+

+ 130 - 0
pkg/services/notification/notification.go

@@ -0,0 +1,130 @@
+package notification
+
+import (
+	"fmt"
+	"time"
+	"encoding/json"
+	"github.com/streadway/amqp"
+	"github.com/torkelo/grafana-pro/pkg/bus"
+	m "github.com/torkelo/grafana-pro/pkg/models"
+)
+
+var (
+	url string
+	exchange string
+	conn *amqp.Connection
+	channel *amqp.Channel
+)
+
+func getConnection() (*amqp.Connection, error) {
+	c, err := amqp.Dial(url)
+	if err != nil {
+		return nil,  err
+	}
+	return c, err
+}
+
+func getChannel() (*amqp.Channel, error) {
+	ch, err := conn.Channel()
+	if err != nil {
+		return nil, err
+	}
+
+	err = ch.ExchangeDeclare(
+		exchange,     // name
+		"topic",      // type
+		true,         // durable
+		false,        // auto-deleted
+		false,        // internal
+		false,        // no-wait
+		nil,          // arguments
+	)
+	if (err != nil) {
+		return nil, err
+	}
+	return ch, err
+}
+
+func Init(rabbitUrl string, exchangeName string) error {
+	url = rabbitUrl
+	exchange = exchangeName
+	bus.AddEventListener(NotificationHandler)
+	return Setup()
+}
+
+// Every connection should declare the topology they expect
+func Setup() error {
+	c, err := getConnection()
+	if err != nil {
+		return err
+	}
+	conn = c
+	ch, err := getChannel()
+	if err != nil {
+		return err
+	}
+
+	channel = ch
+
+	// listen for close events so we can reconnect.
+	errChan := channel.NotifyClose(make(chan *amqp.Error))
+	go func() {
+		for e := range errChan {
+			fmt.Println("connection to rabbitmq lost.")
+			fmt.Println(e)
+			fmt.Println("attempting to create new rabbitmq channel.")
+			ch, err := getChannel()
+			if err == nil {
+				channel = ch
+				break
+			}
+
+			//could not create channel, so lets close the connection
+			// and re-create.
+			_ = conn.Close()
+			
+			for err != nil {
+				time.Sleep(2 * time.Second)
+				fmt.Println("attempting to reconnect to rabbitmq.")
+				err = Setup()
+			}
+			fmt.Println("Connected to rabbitmq again.")
+		}
+	}()
+
+    return nil
+}
+
+func Publish(routingKey string, msgString []byte) {
+	err := channel.Publish(
+		exchange,      //exchange
+		routingKey,   // routing key
+		false,       // mandatory
+		false,      // immediate
+		amqp.Publishing{
+			ContentType: "application/json",
+			Body: msgString,
+		},
+	)
+	if err != nil {
+		// failures are most likely because the connection was lost.
+		// the connection will be re-established, so just keep 
+		// retrying every 2seconds until we successfully publish.
+		time.Sleep(2 * time.Second)
+		fmt.Println("publish failed, retrying.");
+		Publish(routingKey, msgString)
+	}
+	return
+}
+
+func NotificationHandler(event *m.Notification) error {
+	msgString, err := json.Marshal(event)
+	if err != nil {
+		return err
+	}
+	routingKey := fmt.Sprintf("%s.%s", event.Priority, event.EventType)
+	// this is run in a greenthread and we expect that publish will keep
+	// retrying until the message gets sent.
+	go Publish(routingKey, msgString)
+	return nil
+}

+ 18 - 0
pkg/services/sqlstore/account.go

@@ -72,6 +72,14 @@ func CreateAccount(cmd *m.CreateAccountCommand) error {
 		_, err := sess.Insert(&user)
 		cmd.Result = account
 
+		// silently ignore failures to publish events.
+		_ = bus.Publish(&m.Notification{
+			EventType: "account.create",
+			Timestamp: account.Created,
+			Priority:  m.PRIO_INFO,
+			Payload:   account,
+		})
+		 
 		return err
 	})
 }
@@ -85,6 +93,16 @@ func UpdateAccount(cmd *m.UpdateAccountCommand) error {
 		}
 
 		_, err := sess.Id(cmd.AccountId).Update(&account)
+		if err == nil {
+			// silently ignore failures to publish events.
+			account.Id = cmd.AccountId
+			_ = bus.Publish(&m.Notification{
+				EventType: "account.update",
+				Timestamp: account.Updated,
+				Priority:  m.PRIO_INFO,
+				Payload:   account,
+			})
+		}
 		return err
 	})
 }

+ 16 - 0
pkg/services/sqlstore/user.go

@@ -95,6 +95,12 @@ func CreateUser(cmd *m.CreateUserCommand) error {
 		_, err = sess.Insert(&accountUser)
 
 		cmd.Result = user
+		_ = bus.Publish(&m.Notification{
+			EventType: "user.create",
+			Timestamp: user.Created,
+			Priority:  m.PRIO_INFO,
+			Payload:   user,
+		})
 		return err
 	})
 }
@@ -135,6 +141,16 @@ func UpdateUser(cmd *m.UpdateUserCommand) error {
 		}
 
 		_, err := sess.Id(cmd.UserId).Update(&user)
+		if err == nil {
+			// silently ignore failures to publish events.
+			user.Id = cmd.UserId
+			_ = bus.Publish(&m.Notification{
+				EventType: "user.update",
+				Timestamp: user.Updated,
+				Priority:  m.PRIO_INFO,
+				Payload:   user,
+			})
+		}
 		return err
 	})
 }

+ 17 - 0
pkg/setting/setting.go

@@ -93,6 +93,12 @@ var (
 	// PhantomJs Rendering
 	ImagesDir  string
 	PhantomDir string
+
+	//Notifications
+	NotificationsEnabled bool
+	RabbitmqUrl string
+	NotificationsExchange string
+
 )
 
 func init() {
@@ -219,6 +225,17 @@ func NewConfigContext() {
 
 	LogRootPath = Cfg.Section("log").Key("root_path").MustString(path.Join(WorkDir, "/data/log"))
 
+	// Notifications
+	NotificationsEnabled = Cfg.Section("notifications").Key("enabled").MustBool(false)
+	RabbitmqUrl = Cfg.Section("notifications").Key("rabbitmq_url").MustString("amqp://localhost/")
+	
+	// validate rabbitmqUrl.
+	_, err = url.Parse(RabbitmqUrl)
+	if err != nil {
+		log.Fatal(4, "Invalid rabbitmq_url(%s): %s", RabbitmqUrl, err)
+	}
+	NotificationsExchange = Cfg.Section("notifications").Key("notifications_exchange").MustString("notifications")
+
 	readSessionConfig()
 }