Torkel Ödegaard 11 лет назад
Родитель
Сommit
896e6d4662

+ 4 - 1
conf/grafana.ini

@@ -121,4 +121,7 @@ daily_rotate = true
 ; Expired days of log file(delete after max days), default is 7
 max_days = 7
 
-
+[event_publisher]
+enabled = false
+rabbitmq_url = amqp://localhost/
+exchange = grafana_events

+ 30 - 11
pkg/bus/bus.go

@@ -11,13 +11,16 @@ type Msg interface{}
 type Bus interface {
 	Dispatch(msg Msg) error
 	Publish(msg Msg) error
+
 	AddHandler(handler HandlerFunc)
 	AddEventListener(handler HandlerFunc)
+	AddWildcardListener(handler HandlerFunc)
 }
 
 type InProcBus struct {
-	handlers  map[string]HandlerFunc
-	listeners map[string][]HandlerFunc
+	handlers          map[string]HandlerFunc
+	listeners         map[string][]HandlerFunc
+	wildcardListeners []HandlerFunc
 }
 
 // temp stuff, not sure how to handle bus instance, and init yet
@@ -27,6 +30,7 @@ func New() Bus {
 	bus := &InProcBus{}
 	bus.handlers = make(map[string]HandlerFunc)
 	bus.listeners = make(map[string][]HandlerFunc)
+	bus.wildcardListeners = make([]HandlerFunc, 0)
 	return bus
 }
 
@@ -52,16 +56,20 @@ func (b *InProcBus) Dispatch(msg Msg) error {
 
 func (b *InProcBus) Publish(msg Msg) error {
 	var msgName = reflect.TypeOf(msg).Elem().Name()
-
 	var listeners = b.listeners[msgName]
-	if len(listeners) == 0 {
-		return nil
-	}
 
 	var params = make([]reflect.Value, 1)
 	params[0] = reflect.ValueOf(msg)
 
-	for listenerHandler := range listeners {
+	for _, listenerHandler := range listeners {
+		ret := reflect.ValueOf(listenerHandler).Call(params)
+		err := ret[0].Interface()
+		if err != nil {
+			return err.(error)
+		}
+	}
+
+	for _, listenerHandler := range b.wildcardListeners {
 		ret := reflect.ValueOf(listenerHandler).Call(params)
 		err := ret[0].Interface()
 		if err != nil {
@@ -72,6 +80,10 @@ func (b *InProcBus) Publish(msg Msg) error {
 	return nil
 }
 
+func (b *InProcBus) AddWildcardListener(handler HandlerFunc) {
+	b.wildcardListeners = append(b.wildcardListeners, handler)
+}
+
 func (b *InProcBus) AddHandler(handler HandlerFunc) {
 	handlerType := reflect.TypeOf(handler)
 	queryTypeName := handlerType.In(0).Elem().Name()
@@ -81,12 +93,11 @@ func (b *InProcBus) AddHandler(handler HandlerFunc) {
 func (b *InProcBus) AddEventListener(handler HandlerFunc) {
 	handlerType := reflect.TypeOf(handler)
 	eventName := handlerType.In(0).Elem().Name()
-	list, exists := b.listeners[eventName]
+	_, exists := b.listeners[eventName]
 	if !exists {
-		list = make([]HandlerFunc, 0)
-		b.listeners[eventName] = list
+		b.listeners[eventName] = make([]HandlerFunc, 0)
 	}
-	list = append(list, handler)
+	b.listeners[eventName] = append(b.listeners[eventName], handler)
 }
 
 // Package level functions
@@ -99,6 +110,14 @@ func AddEventListener(handler HandlerFunc) {
 	globalBus.AddEventListener(handler)
 }
 
+func AddWildcardListener(handler HandlerFunc) {
+	globalBus.AddWildcardListener(handler)
+}
+
 func Dispatch(msg Msg) error {
 	return globalBus.Dispatch(msg)
 }
+
+func Publish(msg Msg) error {
+	return globalBus.Publish(msg)
+}

+ 3 - 2
pkg/bus/bus_test.go

@@ -2,6 +2,7 @@ package bus
 
 import (
 	"errors"
+	"fmt"
 	"testing"
 )
 
@@ -62,7 +63,7 @@ func TestEventListeners(t *testing.T) {
 
 	if err != nil {
 		t.Fatal("Publish event failed " + err.Error())
-	} else if count != 0 {
-		t.Fatal("Publish event failed, listeners called: %v, expected: %v", count, 11)
+	} else if count != 11 {
+		t.Fatal(fmt.Sprintf("Publish event failed, listeners called: %v, expected: %v", count, 11))
 	}
 }

+ 3 - 1
pkg/cmd/web.go

@@ -16,6 +16,7 @@ import (
 	"github.com/torkelo/grafana-pro/pkg/api"
 	"github.com/torkelo/grafana-pro/pkg/log"
 	"github.com/torkelo/grafana-pro/pkg/middleware"
+	"github.com/torkelo/grafana-pro/pkg/services/eventpublisher"
 	"github.com/torkelo/grafana-pro/pkg/services/sqlstore"
 	"github.com/torkelo/grafana-pro/pkg/setting"
 	"github.com/torkelo/grafana-pro/pkg/social"
@@ -81,11 +82,12 @@ func runWeb(c *cli.Context) {
 	social.NewOAuthService()
 	sqlstore.NewEngine()
 	sqlstore.EnsureAdminUser()
+	eventpublisher.Init()
 
+	var err error
 	m := newMacaron()
 	api.Register(m)
 
-	var err error
 	listenAddr := fmt.Sprintf("%s:%s", setting.HttpAddr, setting.HttpPort)
 	log.Info("Listen: %v://%s%s", setting.Protocol, listenAddr, setting.AppSubUrl)
 	switch setting.Protocol {

+ 79 - 0
pkg/events/events.go

@@ -0,0 +1,79 @@
+package events
+
+import (
+	"reflect"
+	"time"
+)
+
+// Events can be passed to external systems via for example AMPQ
+// Treat these events as basically DTOs so changes has to be backward compatible
+
+type Priority string
+
+const (
+	PRIO_DEBUG Priority = "DEBUG"
+	PRIO_INFO  Priority = "INFO"
+	PRIO_ERROR Priority = "ERROR"
+)
+
+type Event struct {
+	Timestamp time.Time `json:"timestamp"`
+}
+
+type OnTheWireEvent struct {
+	EventType string      `json:"event_type"`
+	Priority  Priority    `json:"priority"`
+	Timestamp time.Time   `json:"timestamp"`
+	Payload   interface{} `json:"payload"`
+}
+
+type EventBase interface {
+	ToOnWriteEvent() *OnTheWireEvent
+}
+
+func ToOnWriteEvent(event interface{}) (*OnTheWireEvent, error) {
+	eventType := reflect.TypeOf(event)
+
+	wireEvent := OnTheWireEvent{
+		Priority:  PRIO_INFO,
+		EventType: eventType.Name(),
+		Payload:   event,
+	}
+
+	baseField := reflect.ValueOf(event).FieldByName("Timestamp")
+	if baseField.IsValid() {
+		wireEvent.Timestamp = baseField.Interface().(time.Time)
+	} else {
+		wireEvent.Timestamp = time.Now()
+	}
+
+	return &wireEvent, nil
+}
+
+type AccountCreated struct {
+	Timestamp time.Time `json:"timestamp"`
+	Id        int64     `json:"id"`
+	Name      string    `json:"name"`
+}
+
+type AccountUpdated struct {
+	Timestamp time.Time `json:"timestamp"`
+	Id        int64     `json:"id"`
+	Name      string    `json:"name"`
+}
+
+type UserCreated struct {
+	Timestamp time.Time `json:"timestamp"`
+	Id        int64     `json:"id"`
+	Name      string    `json:"name"`
+	Login     string    `json:"login"`
+	Email     string    `json:"email"`
+}
+
+type UserUpdated struct {
+	Timestamp time.Time `json:"timestamp"`
+	Id        int64     `json:"id"`
+	Name      string    `json:"name"`
+	Login     string    `json:"login"`
+	Email     string    `json:"email"`
+}

+ 30 - 0
pkg/events/events_test.go

@@ -0,0 +1,30 @@
+package events
+
+import (
+	"encoding/json"
+	"testing"
+	"time"
+
+	. "github.com/smartystreets/goconvey/convey"
+)
+
+type TestEvent struct {
+	Timestamp time.Time
+}
+
+func TestEventCreation(t *testing.T) {
+
+	Convey("Event to wire event", t, func() {
+		e := TestEvent{
+			Timestamp: time.Unix(1231421123, 223),
+		}
+
+		wire, _ := ToOnWriteEvent(e)
+		So(e.Timestamp.Unix(), ShouldEqual, wire.Timestamp.Unix())
+		So(wire.EventType, ShouldEqual, "TestEvent")
+
+		json, _ := json.Marshal(wire)
+		So(string(json), ShouldEqual, `{"event_type":"TestEvent","priority":"INFO","timestamp":"2009-01-08T14:25:23.000000223+01:00","payload":{"Timestamp":"2009-01-08T14:25:23.000000223+01:00"}}`)
+	})
+
+}

+ 149 - 0
pkg/services/eventpublisher/eventpublisher.go

@@ -0,0 +1,149 @@
+package eventpublisher
+
+import (
+	"encoding/json"
+	"fmt"
+	"log"
+	"time"
+
+	"github.com/streadway/amqp"
+	"github.com/torkelo/grafana-pro/pkg/bus"
+	"github.com/torkelo/grafana-pro/pkg/events"
+	"github.com/torkelo/grafana-pro/pkg/setting"
+)
+
+var (
+	url      string
+	exchange string
+	conn     *amqp.Connection
+	channel  *amqp.Channel
+)
+
+func getConnection() (*amqp.Connection, error) {
+	c, err := amqp.Dial(url)
+	if err != nil {
+		return nil, err
+	}
+	return c, err
+}
+
+func getChannel() (*amqp.Channel, error) {
+	ch, err := conn.Channel()
+	if err != nil {
+		return nil, err
+	}
+
+	err = ch.ExchangeDeclare(
+		exchange, // name
+		"topic",  // type
+		true,     // durable
+		false,    // auto-deleted
+		false,    // internal
+		false,    // no-wait
+		nil,      // arguments
+	)
+	if err != nil {
+		return nil, err
+	}
+	return ch, err
+}
+
+func Init() {
+	sec := setting.Cfg.Section("event_publisher")
+
+	if !sec.Key("enabled").MustBool(false) {
+		return
+	}
+
+	url = sec.Key("rabbitmq_url").String()
+	exchange = sec.Key("exchange").String()
+	bus.AddWildcardListener(eventListener)
+
+	if err := Setup(); err != nil {
+		log.Fatal(4, "Failed to connect to notification queue: %v", err)
+		return
+	}
+}
+
+// Every connection should declare the topology they expect
+func Setup() error {
+	c, err := getConnection()
+	if err != nil {
+		return err
+	}
+	conn = c
+	ch, err := getChannel()
+	if err != nil {
+		return err
+	}
+
+	channel = ch
+
+	// listen for close events so we can reconnect.
+	errChan := channel.NotifyClose(make(chan *amqp.Error))
+	go func() {
+		for e := range errChan {
+			fmt.Println("connection to rabbitmq lost.")
+			fmt.Println(e)
+			fmt.Println("attempting to create new rabbitmq channel.")
+			ch, err := getChannel()
+			if err == nil {
+				channel = ch
+				break
+			}
+
+			//could not create channel, so lets close the connection
+			// and re-create.
+			_ = conn.Close()
+
+			for err != nil {
+				time.Sleep(2 * time.Second)
+				fmt.Println("attempting to reconnect to rabbitmq.")
+				err = Setup()
+			}
+			fmt.Println("Connected to rabbitmq again.")
+		}
+	}()
+
+	return nil
+}
+
+func publish(routingKey string, msgString []byte) {
+	err := channel.Publish(
+		exchange,   //exchange
+		routingKey, // routing key
+		false,      // mandatory
+		false,      // immediate
+		amqp.Publishing{
+			ContentType: "application/json",
+			Body:        msgString,
+		},
+	)
+	if err != nil {
+		// failures are most likely because the connection was lost.
+		// the connection will be re-established, so just keep
+		// retrying every 2seconds until we successfully publish.
+		time.Sleep(2 * time.Second)
+		fmt.Println("publish failed, retrying.")
+		publish(routingKey, msgString)
+	}
+	return
+}
+
+func eventListener(event interface{}) error {
+	wireEvent, err := events.ToOnWriteEvent(event)
+	if err != nil {
+		return err
+	}
+
+	msgString, err := json.Marshal(wireEvent)
+	if err != nil {
+		return err
+	}
+
+	routingKey := fmt.Sprintf("%s.%s", wireEvent.Priority, wireEvent.EventType)
+	// this is run in a greenthread and we expect that publish will keep
+	// retrying until the message gets sent.
+	go publish(routingKey, msgString)
+	return nil
+}

+ 20 - 7
pkg/services/sqlstore/account.go

@@ -3,9 +3,8 @@ package sqlstore
 import (
 	"time"
 
-	"github.com/go-xorm/xorm"
-
 	"github.com/torkelo/grafana-pro/pkg/bus"
+	"github.com/torkelo/grafana-pro/pkg/events"
 	m "github.com/torkelo/grafana-pro/pkg/models"
 )
 
@@ -48,7 +47,7 @@ func GetAccountByName(query *m.GetAccountByNameQuery) error {
 }
 
 func CreateAccount(cmd *m.CreateAccountCommand) error {
-	return inTransaction(func(sess *xorm.Session) error {
+	return inTransaction2(func(sess *session) error {
 
 		account := m.Account{
 			Name:    cmd.Name,
@@ -60,7 +59,6 @@ func CreateAccount(cmd *m.CreateAccountCommand) error {
 			return err
 		}
 
-		// create inital admin account user
 		user := m.AccountUser{
 			AccountId: account.Id,
 			UserId:    cmd.UserId,
@@ -72,19 +70,34 @@ func CreateAccount(cmd *m.CreateAccountCommand) error {
 		_, err := sess.Insert(&user)
 		cmd.Result = account
 
+		sess.publishAfterCommit(&events.AccountCreated{
+			Timestamp: account.Created,
+			Id:        account.Id,
+			Name:      account.Name,
+		})
+
 		return err
 	})
 }
 
 func UpdateAccount(cmd *m.UpdateAccountCommand) error {
-	return inTransaction(func(sess *xorm.Session) error {
+	return inTransaction2(func(sess *session) error {
 
 		account := m.Account{
 			Name:    cmd.Name,
 			Updated: time.Now(),
 		}
 
-		_, err := sess.Id(cmd.AccountId).Update(&account)
-		return err
+		if _, err := sess.Id(cmd.AccountId).Update(&account); err != nil {
+			return err
+		}
+
+		sess.publishAfterCommit(&events.AccountUpdated{
+			Timestamp: account.Updated,
+			Id:        account.Id,
+			Name:      account.Name,
+		})
+
+		return nil
 	})
 }

+ 71 - 0
pkg/services/sqlstore/shared.go

@@ -0,0 +1,71 @@
+package sqlstore
+
+import (
+	"github.com/go-xorm/xorm"
+	"github.com/torkelo/grafana-pro/pkg/bus"
+	"github.com/torkelo/grafana-pro/pkg/log"
+)
+
+type dbTransactionFunc func(sess *xorm.Session) error
+type dbTransactionFunc2 func(sess *session) error
+
+type session struct {
+	*xorm.Session
+	events []interface{}
+}
+
+func (sess *session) publishAfterCommit(msg interface{}) {
+	sess.events = append(sess.events, msg)
+}
+
+func inTransaction(callback dbTransactionFunc) error {
+	var err error
+
+	sess := x.NewSession()
+	defer sess.Close()
+
+	if err = sess.Begin(); err != nil {
+		return err
+	}
+
+	err = callback(sess)
+
+	if err != nil {
+		sess.Rollback()
+		return err
+	} else if err = sess.Commit(); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func inTransaction2(callback dbTransactionFunc2) error {
+	var err error
+
+	sess := session{Session: x.NewSession()}
+
+	defer sess.Close()
+	if err = sess.Begin(); err != nil {
+		return err
+	}
+
+	err = callback(&sess)
+
+	if err != nil {
+		sess.Rollback()
+		return err
+	} else if err = sess.Commit(); err != nil {
+		return err
+	}
+
+	if len(sess.events) > 0 {
+		for _, e := range sess.events {
+			if err = bus.Publish(e); err != nil {
+				log.Error(3, "Failed to publish event after commit", err)
+			}
+		}
+	}
+
+	return nil
+}

+ 3 - 26
pkg/services/sqlstore/sqlstore.go

@@ -43,12 +43,13 @@ func EnsureAdminUser() {
 		cmd.IsAdmin = true
 
 		if err = bus.Dispatch(&cmd); err != nil {
-			log.Fatal(3, "Failed to create default admin user", err)
+			log.Error(3, "Failed to create default admin user", err)
+			return
 		}
 
 		log.Info("Created default admin user: %v", setting.AdminUser)
 	} else if err != nil {
-		log.Fatal(3, "Could not determine if admin user exists: %v", err)
+		log.Error(3, "Could not determine if admin user exists: %v", err)
 	}
 }
 
@@ -149,27 +150,3 @@ func LoadConfig() {
 	DbCfg.SslMode = sec.Key("ssl_mode").String()
 	DbCfg.Path = sec.Key("path").MustString("data/grafana.db")
 }
-
-type dbTransactionFunc func(sess *xorm.Session) error
-
-func inTransaction(callback dbTransactionFunc) error {
-	var err error
-
-	sess := x.NewSession()
-	defer sess.Close()
-
-	if err = sess.Begin(); err != nil {
-		return err
-	}
-
-	err = callback(sess)
-
-	if err != nil {
-		sess.Rollback()
-		return err
-	} else if err = sess.Commit(); err != nil {
-		return err
-	}
-
-	return nil
-}

+ 29 - 7
pkg/services/sqlstore/user.go

@@ -7,6 +7,7 @@ import (
 	"github.com/go-xorm/xorm"
 
 	"github.com/torkelo/grafana-pro/pkg/bus"
+	"github.com/torkelo/grafana-pro/pkg/events"
 	m "github.com/torkelo/grafana-pro/pkg/models"
 	"github.com/torkelo/grafana-pro/pkg/setting"
 	"github.com/torkelo/grafana-pro/pkg/util"
@@ -23,7 +24,7 @@ func init() {
 	bus.AddHandler("sql", GetUserAccounts)
 }
 
-func getAccountIdForNewUser(userEmail string, sess *xorm.Session) (int64, error) {
+func getAccountIdForNewUser(userEmail string, sess *session) (int64, error) {
 	var account m.Account
 
 	if setting.SingleAccountMode {
@@ -51,7 +52,7 @@ func getAccountIdForNewUser(userEmail string, sess *xorm.Session) (int64, error)
 }
 
 func CreateUser(cmd *m.CreateUserCommand) error {
-	return inTransaction(func(sess *xorm.Session) error {
+	return inTransaction2(func(sess *session) error {
 		accountId, err := getAccountIdForNewUser(cmd.Email, sess)
 		if err != nil {
 			return err
@@ -94,10 +95,20 @@ func CreateUser(cmd *m.CreateUserCommand) error {
 			accountUser.Role = m.RoleType(setting.DefaultAccountRole)
 		}
 
-		_, err = sess.Insert(&accountUser)
+		if _, err = sess.Insert(&accountUser); err != nil {
+			return err
+		}
+
+		sess.publishAfterCommit(&events.UserCreated{
+			Timestamp: user.Created,
+			Id:        user.Id,
+			Name:      user.Name,
+			Login:     user.Login,
+			Email:     user.Email,
+		})
 
 		cmd.Result = user
-		return err
+		return nil
 	})
 }
 
@@ -127,7 +138,7 @@ func GetUserByLogin(query *m.GetUserByLoginQuery) error {
 }
 
 func UpdateUser(cmd *m.UpdateUserCommand) error {
-	return inTransaction(func(sess *xorm.Session) error {
+	return inTransaction2(func(sess *session) error {
 
 		user := m.User{
 			Name:    cmd.Name,
@@ -136,8 +147,19 @@ func UpdateUser(cmd *m.UpdateUserCommand) error {
 			Updated: time.Now(),
 		}
 
-		_, err := sess.Id(cmd.UserId).Update(&user)
-		return err
+		if _, err := sess.Id(cmd.UserId).Update(&user); err != nil {
+			return err
+		}
+
+		sess.publishAfterCommit(&events.UserUpdated{
+			Timestamp: user.Created,
+			Id:        user.Id,
+			Name:      user.Name,
+			Login:     user.Login,
+			Email:     user.Email,
+		})
+
+		return nil
 	})
 }