ソースを参照

Added on wire event format

Torkel Ödegaard 11 年 前
コミット
525179eb85

+ 56 - 1
pkg/events/events.go

@@ -1,8 +1,63 @@
 package events
 
+import (
+	"reflect"
+	"time"
+)
+
 // Events can be passed to external systems via for example AMPQ
 // Treat these events as basically DTOs so changes has to be backward compatible
 
+type Priority string
+
+const (
+	PRIO_DEBUG Priority = "DEBUG"
+	PRIO_INFO  Priority = "INFO"
+	PRIO_ERROR Priority = "ERROR"
+)
+
+type Event struct {
+	Timestamp time.Time `json:"timestamp"`
+}
+
+type OnTheWireEvent struct {
+	EventType string      `json:"event_type"`
+	Priority  Priority    `json:"priority"`
+	Timestamp time.Time   `json:"timestamp"`
+	Payload   interface{} `json:"payload"`
+}
+
+type EventBase interface {
+	ToOnWriteEvent() *OnTheWireEvent
+}
+
+func ToOnWriteEvent(event interface{}) (*OnTheWireEvent, error) {
+	eventType := reflect.TypeOf(event)
+
+	wireEvent := OnTheWireEvent{
+		Priority:  PRIO_INFO,
+		EventType: eventType.Name(),
+		Payload:   event,
+	}
+
+	baseField := reflect.ValueOf(event).FieldByName("Timestamp")
+	if baseField.IsValid() {
+		wireEvent.Timestamp = baseField.Interface().(time.Time)
+	} else {
+		wireEvent.Timestamp = time.Now()
+	}
+
+	return &wireEvent, nil
+}
+
 type AccountCreated struct {
-	Name string `json:"name"`
+	Timestamp time.Time `json:"timestamp"`
+	Id        int64     `json:"id"`
+	Name      string    `json:"name"`
+}
+
+type AccountUpdated struct {
+	Timestamp time.Time `json:"timestamp"`
+	Id        int64     `json:"id"`
+	Name      string    `json:"name"`
 }

+ 16 - 4
pkg/events/events_test.go

@@ -1,18 +1,30 @@
 package events
 
 import (
+	"encoding/json"
 	"testing"
+	"time"
 
 	. "github.com/smartystreets/goconvey/convey"
 )
 
+type TestEvent struct {
+	Timestamp time.Time
+}
+
 func TestEventCreation(t *testing.T) {
 
-	Convey("When generating slug", t, func() {
-		dashboard := NewDashboard("Grafana Play Home")
-		dashboard.UpdateSlug()
+	Convey("Event to wire event", t, func() {
+		e := TestEvent{
+			Timestamp: time.Unix(1231421123, 223),
+		}
+
+		wire, _ := ToOnWriteEvent(e)
+		So(e.Timestamp.Unix(), ShouldEqual, wire.Timestamp.Unix())
+		So(wire.EventType, ShouldEqual, "TestEvent")
 
-		So(dashboard.Slug, ShouldEqual, "grafana-play-home")
+		json, _ := json.Marshal(wire)
+		So(string(json), ShouldEqual, `{"event_type":"TestEvent","priority":"INFO","timestamp":"2009-01-08T14:25:23.000000223+01:00","payload":{"Timestamp":"2009-01-08T14:25:23.000000223+01:00"}}`)
 	})
 
 }

+ 7 - 4
pkg/services/eventpublisher/eventpublisher.go

@@ -4,11 +4,11 @@ import (
 	"encoding/json"
 	"fmt"
 	"log"
-	"reflect"
 	"time"
 
 	"github.com/streadway/amqp"
 	"github.com/torkelo/grafana-pro/pkg/bus"
+	"github.com/torkelo/grafana-pro/pkg/events"
 	"github.com/torkelo/grafana-pro/pkg/setting"
 )
 
@@ -131,14 +131,17 @@ func publish(routingKey string, msgString []byte) {
 }
 
 func eventListener(event interface{}) error {
-	msgString, err := json.Marshal(event)
+	wireEvent, err := events.ToOnWriteEvent(event)
 	if err != nil {
 		return err
 	}
 
-	eventType := reflect.TypeOf(event)
+	msgString, err := json.Marshal(wireEvent)
+	if err != nil {
+		return err
+	}
 
-	routingKey := fmt.Sprintf("%s.%s", "INFO", eventType.Name())
+	routingKey := fmt.Sprintf("%s.%s", wireEvent.Priority, wireEvent.EventType)
 	// this is run in a greenthread and we expect that publish will keep
 	// retrying until the message gets sent.
 	go publish(routingKey, msgString)

+ 13 - 22
pkg/services/sqlstore/account.go

@@ -3,8 +3,6 @@ package sqlstore
 import (
 	"time"
 
-	"github.com/go-xorm/xorm"
-
 	"github.com/torkelo/grafana-pro/pkg/bus"
 	"github.com/torkelo/grafana-pro/pkg/events"
 	m "github.com/torkelo/grafana-pro/pkg/models"
@@ -73,15 +71,9 @@ func CreateAccount(cmd *m.CreateAccountCommand) error {
 		cmd.Result = account
 
 		sess.publishAfterCommit(&events.AccountCreated{
-			Name: account.Name,
-		})
-
-		// silently ignore failures to publish events.
-		_ = bus.Publish(&m.Notification{
-			EventType: "account.create",
 			Timestamp: account.Created,
-			Priority:  m.PRIO_INFO,
-			Payload:   account,
+			Id:        account.Id,
+			Name:      account.Name,
 		})
 
 		return err
@@ -89,24 +81,23 @@ func CreateAccount(cmd *m.CreateAccountCommand) error {
 }
 
 func UpdateAccount(cmd *m.UpdateAccountCommand) error {
-	return inTransaction(func(sess *xorm.Session) error {
+	return inTransaction2(func(sess *session) error {
 
 		account := m.Account{
 			Name:    cmd.Name,
 			Updated: time.Now(),
 		}
 
-		_, err := sess.Id(cmd.AccountId).Update(&account)
-		if err == nil {
-			// silently ignore failures to publish events.
-			account.Id = cmd.AccountId
-			_ = bus.Publish(&m.Notification{
-				EventType: "account.update",
-				Timestamp: account.Updated,
-				Priority:  m.PRIO_INFO,
-				Payload:   account,
-			})
+		if _, err := sess.Id(cmd.AccountId).Update(&account); err != nil {
+			return err
 		}
-		return err
+
+		sess.publishAfterCommit(events.AccountUpdated{
+			Timestamp: account.Updated,
+			Id:        account.Id,
+			Name:      account.Name,
+		})
+
+		return nil
 	})
 }

+ 0 - 11
pkg/setting/setting.go

@@ -219,17 +219,6 @@ func NewConfigContext() {
 
 	LogRootPath = Cfg.Section("log").Key("root_path").MustString(path.Join(WorkDir, "/data/log"))
 
-	// Notifications
-	NotificationsEnabled = Cfg.Section("notifications").Key("enabled").MustBool(false)
-	RabbitmqUrl = Cfg.Section("notifications").Key("rabbitmq_url").MustString("amqp://localhost/")
-
-	// validate rabbitmqUrl.
-	_, err = url.Parse(RabbitmqUrl)
-	if err != nil {
-		log.Fatal(4, "Invalid rabbitmq_url(%s): %s", RabbitmqUrl, err)
-	}
-	NotificationsExchange = Cfg.Section("notifications").Key("notifications_exchange").MustString("notifications")
-
 	readSessionConfig()
 }