bus.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. package bus
  2. import (
  3. "fmt"
  4. "reflect"
  5. )
  6. type HandlerFunc interface{}
  7. type Msg interface{}
  8. type Bus interface {
  9. Dispatch(msg Msg) error
  10. Publish(msg Msg) error
  11. AddHandler(handler HandlerFunc)
  12. AddEventListener(handler HandlerFunc)
  13. AddWildcardListener(handler HandlerFunc)
  14. }
  15. type InProcBus struct {
  16. handlers map[string]HandlerFunc
  17. listeners map[string][]HandlerFunc
  18. wildcardListeners []HandlerFunc
  19. }
  20. // temp stuff, not sure how to handle bus instance, and init yet
  21. var globalBus = New()
  22. func New() Bus {
  23. bus := &InProcBus{}
  24. bus.handlers = make(map[string]HandlerFunc)
  25. bus.listeners = make(map[string][]HandlerFunc)
  26. bus.wildcardListeners = make([]HandlerFunc, 0)
  27. return bus
  28. }
  29. func (b *InProcBus) Dispatch(msg Msg) error {
  30. var msgName = reflect.TypeOf(msg).Elem().Name()
  31. var handler = b.handlers[msgName]
  32. if handler == nil {
  33. return fmt.Errorf("handler not found for %s", msgName)
  34. }
  35. var params = make([]reflect.Value, 1)
  36. params[0] = reflect.ValueOf(msg)
  37. ret := reflect.ValueOf(handler).Call(params)
  38. err := ret[0].Interface()
  39. if err == nil {
  40. return nil
  41. } else {
  42. return err.(error)
  43. }
  44. }
  45. func (b *InProcBus) Publish(msg Msg) error {
  46. var msgName = reflect.TypeOf(msg).Elem().Name()
  47. var listeners = b.listeners[msgName]
  48. var params = make([]reflect.Value, 1)
  49. params[0] = reflect.ValueOf(msg)
  50. for _, listenerHandler := range listeners {
  51. ret := reflect.ValueOf(listenerHandler).Call(params)
  52. err := ret[0].Interface()
  53. if err != nil {
  54. return err.(error)
  55. }
  56. }
  57. for _, listenerHandler := range b.wildcardListeners {
  58. ret := reflect.ValueOf(listenerHandler).Call(params)
  59. err := ret[0].Interface()
  60. if err != nil {
  61. return err.(error)
  62. }
  63. }
  64. return nil
  65. }
  66. func (b *InProcBus) AddWildcardListener(handler HandlerFunc) {
  67. b.wildcardListeners = append(b.wildcardListeners, handler)
  68. }
  69. func (b *InProcBus) AddHandler(handler HandlerFunc) {
  70. handlerType := reflect.TypeOf(handler)
  71. queryTypeName := handlerType.In(0).Elem().Name()
  72. b.handlers[queryTypeName] = handler
  73. }
  74. func (b *InProcBus) AddEventListener(handler HandlerFunc) {
  75. handlerType := reflect.TypeOf(handler)
  76. eventName := handlerType.In(0).Elem().Name()
  77. _, exists := b.listeners[eventName]
  78. if !exists {
  79. b.listeners[eventName] = make([]HandlerFunc, 0)
  80. }
  81. b.listeners[eventName] = append(b.listeners[eventName], handler)
  82. }
  83. // Package level functions
  84. func AddHandler(implName string, handler HandlerFunc) {
  85. globalBus.AddHandler(handler)
  86. }
  87. // Package level functions
  88. func AddEventListener(handler HandlerFunc) {
  89. globalBus.AddEventListener(handler)
  90. }
  91. func AddWildcardListener(handler HandlerFunc) {
  92. globalBus.AddWildcardListener(handler)
  93. }
  94. func Dispatch(msg Msg) error {
  95. return globalBus.Dispatch(msg)
  96. }
  97. func Publish(msg Msg) error {
  98. return globalBus.Publish(msg)
  99. }
  100. func ClearBusHandlers() {
  101. globalBus = New()
  102. }