bus.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243
  1. package bus
  2. import (
  3. "context"
  4. "errors"
  5. "reflect"
  6. )
  7. // HandlerFunc defines a handler function interface.
  8. type HandlerFunc interface{}
  9. // CtxHandlerFunc defines a context handler function.
  10. type CtxHandlerFunc func()
  11. // Msg defines a message interface.
  12. type Msg interface{}
  13. // ErrHandlerNotFound defines an error if a handler is not found
  14. var ErrHandlerNotFound = errors.New("handler not found")
  15. // TransactionManager defines a transaction interface
  16. type TransactionManager interface {
  17. InTransaction(ctx context.Context, fn func(ctx context.Context) error) error
  18. }
  19. // Bus type defines the bus interface structure
  20. type Bus interface {
  21. Dispatch(msg Msg) error
  22. DispatchCtx(ctx context.Context, msg Msg) error
  23. Publish(msg Msg) error
  24. // InTransaction starts a transaction and store it in the context.
  25. // The caller can then pass a function with multiple DispatchCtx calls that
  26. // all will be executed in the same transaction. InTransaction will rollback if the
  27. // callback returns an error.
  28. InTransaction(ctx context.Context, fn func(ctx context.Context) error) error
  29. AddHandler(handler HandlerFunc)
  30. AddHandlerCtx(handler HandlerFunc)
  31. AddEventListener(handler HandlerFunc)
  32. AddWildcardListener(handler HandlerFunc)
  33. // SetTransactionManager allows the user to replace the internal
  34. // noop TransactionManager that is responsible for manageing
  35. // transactions in `InTransaction`
  36. SetTransactionManager(tm TransactionManager)
  37. }
  38. // InTransaction defines an in transaction function
  39. func (b *InProcBus) InTransaction(ctx context.Context, fn func(ctx context.Context) error) error {
  40. return b.txMng.InTransaction(ctx, fn)
  41. }
  42. // InProcBus defines the bus structure
  43. type InProcBus struct {
  44. handlers map[string]HandlerFunc
  45. handlersWithCtx map[string]HandlerFunc
  46. listeners map[string][]HandlerFunc
  47. wildcardListeners []HandlerFunc
  48. txMng TransactionManager
  49. }
  50. // temp stuff, not sure how to handle bus instance, and init yet
  51. var globalBus = New()
  52. // New initialize the bus
  53. func New() Bus {
  54. bus := &InProcBus{}
  55. bus.handlers = make(map[string]HandlerFunc)
  56. bus.handlersWithCtx = make(map[string]HandlerFunc)
  57. bus.listeners = make(map[string][]HandlerFunc)
  58. bus.wildcardListeners = make([]HandlerFunc, 0)
  59. bus.txMng = &noopTransactionManager{}
  60. return bus
  61. }
  62. // Want to get rid of global bus
  63. func GetBus() Bus {
  64. return globalBus
  65. }
  66. // SetTransactionManager function assign a transaction manager to the bus.
  67. func (b *InProcBus) SetTransactionManager(tm TransactionManager) {
  68. b.txMng = tm
  69. }
  70. // DispatchCtx function dispatch a message to the bus context.
  71. func (b *InProcBus) DispatchCtx(ctx context.Context, msg Msg) error {
  72. var msgName = reflect.TypeOf(msg).Elem().Name()
  73. var handler = b.handlersWithCtx[msgName]
  74. if handler == nil {
  75. return ErrHandlerNotFound
  76. }
  77. var params = []reflect.Value{}
  78. params = append(params, reflect.ValueOf(ctx))
  79. params = append(params, reflect.ValueOf(msg))
  80. ret := reflect.ValueOf(handler).Call(params)
  81. err := ret[0].Interface()
  82. if err == nil {
  83. return nil
  84. }
  85. return err.(error)
  86. }
  87. // Dispatch function dispatch a message to the bus.
  88. func (b *InProcBus) Dispatch(msg Msg) error {
  89. var msgName = reflect.TypeOf(msg).Elem().Name()
  90. var handler = b.handlersWithCtx[msgName]
  91. withCtx := true
  92. if handler == nil {
  93. withCtx = false
  94. handler = b.handlers[msgName]
  95. }
  96. if handler == nil {
  97. return ErrHandlerNotFound
  98. }
  99. var params = []reflect.Value{}
  100. if withCtx {
  101. params = append(params, reflect.ValueOf(context.Background()))
  102. }
  103. params = append(params, reflect.ValueOf(msg))
  104. ret := reflect.ValueOf(handler).Call(params)
  105. err := ret[0].Interface()
  106. if err == nil {
  107. return nil
  108. }
  109. return err.(error)
  110. }
  111. // Publish function publish a message to the bus listener.
  112. func (b *InProcBus) Publish(msg Msg) error {
  113. var msgName = reflect.TypeOf(msg).Elem().Name()
  114. var listeners = b.listeners[msgName]
  115. var params = make([]reflect.Value, 1)
  116. params[0] = reflect.ValueOf(msg)
  117. for _, listenerHandler := range listeners {
  118. ret := reflect.ValueOf(listenerHandler).Call(params)
  119. err := ret[0].Interface()
  120. if err != nil {
  121. return err.(error)
  122. }
  123. }
  124. for _, listenerHandler := range b.wildcardListeners {
  125. ret := reflect.ValueOf(listenerHandler).Call(params)
  126. err := ret[0].Interface()
  127. if err != nil {
  128. return err.(error)
  129. }
  130. }
  131. return nil
  132. }
  133. func (b *InProcBus) AddWildcardListener(handler HandlerFunc) {
  134. b.wildcardListeners = append(b.wildcardListeners, handler)
  135. }
  136. func (b *InProcBus) AddHandler(handler HandlerFunc) {
  137. handlerType := reflect.TypeOf(handler)
  138. queryTypeName := handlerType.In(0).Elem().Name()
  139. b.handlers[queryTypeName] = handler
  140. }
  141. func (b *InProcBus) AddHandlerCtx(handler HandlerFunc) {
  142. handlerType := reflect.TypeOf(handler)
  143. queryTypeName := handlerType.In(1).Elem().Name()
  144. b.handlersWithCtx[queryTypeName] = handler
  145. }
  146. func (b *InProcBus) AddEventListener(handler HandlerFunc) {
  147. handlerType := reflect.TypeOf(handler)
  148. eventName := handlerType.In(0).Elem().Name()
  149. _, exists := b.listeners[eventName]
  150. if !exists {
  151. b.listeners[eventName] = make([]HandlerFunc, 0)
  152. }
  153. b.listeners[eventName] = append(b.listeners[eventName], handler)
  154. }
  155. // AddHandler attach a handler function to the global bus
  156. // Package level function
  157. func AddHandler(implName string, handler HandlerFunc) {
  158. globalBus.AddHandler(handler)
  159. }
  160. // AddHandlerCtx attach a handler function to the global bus context
  161. // Package level functions
  162. func AddHandlerCtx(implName string, handler HandlerFunc) {
  163. globalBus.AddHandlerCtx(handler)
  164. }
  165. // AddEventListener attach a handler function to the event listener
  166. // Package level functions
  167. func AddEventListener(handler HandlerFunc) {
  168. globalBus.AddEventListener(handler)
  169. }
  170. // AddWildcardListener attach a handler function to the wildcard listener
  171. func AddWildcardListener(handler HandlerFunc) {
  172. globalBus.AddWildcardListener(handler)
  173. }
  174. func Dispatch(msg Msg) error {
  175. return globalBus.Dispatch(msg)
  176. }
  177. func DispatchCtx(ctx context.Context, msg Msg) error {
  178. return globalBus.DispatchCtx(ctx, msg)
  179. }
  180. func Publish(msg Msg) error {
  181. return globalBus.Publish(msg)
  182. }
  183. // InTransaction starts a transaction and store it in the context.
  184. // The caller can then pass a function with multiple DispatchCtx calls that
  185. // all will be executed in the same transaction. InTransaction will rollback if the
  186. // callback returns an error.
  187. func InTransaction(ctx context.Context, fn func(ctx context.Context) error) error {
  188. return globalBus.InTransaction(ctx, fn)
  189. }
  190. func ClearBusHandlers() {
  191. globalBus = New()
  192. }
  193. type noopTransactionManager struct{}
  194. func (*noopTransactionManager) InTransaction(ctx context.Context, fn func(ctx context.Context) error) error {
  195. return fn(ctx)
  196. }