|
@@ -44,6 +44,7 @@ func (b *InProcBus) InTransaction(ctx context.Context, fn func(ctx context.Conte
|
|
|
|
|
|
|
|
type InProcBus struct {
|
|
type InProcBus struct {
|
|
|
handlers map[string]HandlerFunc
|
|
handlers map[string]HandlerFunc
|
|
|
|
|
+ handlersWithCtx map[string]HandlerFunc
|
|
|
listeners map[string][]HandlerFunc
|
|
listeners map[string][]HandlerFunc
|
|
|
wildcardListeners []HandlerFunc
|
|
wildcardListeners []HandlerFunc
|
|
|
txMng TransactionManager
|
|
txMng TransactionManager
|
|
@@ -55,6 +56,7 @@ var globalBus = New()
|
|
|
func New() Bus {
|
|
func New() Bus {
|
|
|
bus := &InProcBus{}
|
|
bus := &InProcBus{}
|
|
|
bus.handlers = make(map[string]HandlerFunc)
|
|
bus.handlers = make(map[string]HandlerFunc)
|
|
|
|
|
+ bus.handlersWithCtx = make(map[string]HandlerFunc)
|
|
|
bus.listeners = make(map[string][]HandlerFunc)
|
|
bus.listeners = make(map[string][]HandlerFunc)
|
|
|
bus.wildcardListeners = make([]HandlerFunc, 0)
|
|
bus.wildcardListeners = make([]HandlerFunc, 0)
|
|
|
bus.txMng = &noopTransactionManager{}
|
|
bus.txMng = &noopTransactionManager{}
|
|
@@ -74,14 +76,26 @@ func (b *InProcBus) SetTransactionManager(tm TransactionManager) {
|
|
|
func (b *InProcBus) DispatchCtx(ctx context.Context, msg Msg) error {
|
|
func (b *InProcBus) DispatchCtx(ctx context.Context, msg Msg) error {
|
|
|
var msgName = reflect.TypeOf(msg).Elem().Name()
|
|
var msgName = reflect.TypeOf(msg).Elem().Name()
|
|
|
|
|
|
|
|
- var handler = b.handlers[msgName]
|
|
|
|
|
|
|
+ // we prefer to use the handler that support context.Context
|
|
|
|
|
+ var handler = b.handlersWithCtx[msgName]
|
|
|
|
|
+ var withCtx = true
|
|
|
|
|
+
|
|
|
|
|
+ // fallback to use classic handlers
|
|
|
|
|
+ if handler == nil {
|
|
|
|
|
+ withCtx = false
|
|
|
|
|
+ handler = b.handlers[msgName]
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
if handler == nil {
|
|
if handler == nil {
|
|
|
return ErrHandlerNotFound
|
|
return ErrHandlerNotFound
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- var params = make([]reflect.Value, 2)
|
|
|
|
|
- params[0] = reflect.ValueOf(ctx)
|
|
|
|
|
- params[1] = reflect.ValueOf(msg)
|
|
|
|
|
|
|
+ var params = []reflect.Value{}
|
|
|
|
|
+ if withCtx {
|
|
|
|
|
+ params = append(params, reflect.ValueOf(ctx))
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ params = append(params, reflect.ValueOf(msg))
|
|
|
|
|
|
|
|
ret := reflect.ValueOf(handler).Call(params)
|
|
ret := reflect.ValueOf(handler).Call(params)
|
|
|
err := ret[0].Interface()
|
|
err := ret[0].Interface()
|
|
@@ -149,7 +163,7 @@ func (b *InProcBus) AddHandler(handler HandlerFunc) {
|
|
|
func (b *InProcBus) AddHandlerCtx(handler HandlerFunc) {
|
|
func (b *InProcBus) AddHandlerCtx(handler HandlerFunc) {
|
|
|
handlerType := reflect.TypeOf(handler)
|
|
handlerType := reflect.TypeOf(handler)
|
|
|
queryTypeName := handlerType.In(1).Elem().Name()
|
|
queryTypeName := handlerType.In(1).Elem().Name()
|
|
|
- b.handlers[queryTypeName] = handler
|
|
|
|
|
|
|
+ b.handlersWithCtx[queryTypeName] = handler
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (b *InProcBus) AddEventListener(handler HandlerFunc) {
|
|
func (b *InProcBus) AddEventListener(handler HandlerFunc) {
|