From 6367896096cd7f18be17bd736e34694d9135cf79 Mon Sep 17 00:00:00 2001 From: Robert Laszczak Date: Mon, 29 Apr 2019 00:18:42 +0200 Subject: [PATCH] More flexible CQRS config and PoisonQueueWithFilter middleware (#67) --- UPGRADE-0.4.md | 23 +++ components/cqrs/command_bus.go | 27 ++- components/cqrs/command_bus_test.go | 55 +++-- components/cqrs/command_processor.go | 92 ++++++--- components/cqrs/command_processor_test.go | 35 ++-- components/cqrs/cqrs.go | 130 ++++++++---- components/cqrs/cqrs_test.go | 76 ++++++- components/cqrs/event_bus.go | 26 ++- components/cqrs/event_bus_test.go | 28 ++- components/cqrs/event_processor.go | 84 +++++--- components/cqrs/event_processor_test.go | 34 ++- components/cqrs/object.go | 2 + message/infrastructure/gochannel/pubsub.go | 29 ++- message/message.go | 2 + message/router/middleware/correlation.go | 34 +-- message/router/middleware/correlation_test.go | 11 + message/router/middleware/poison.go | 87 +++++--- message/router/middleware/poison_test.go | 194 +++++++++++++----- 18 files changed, 692 insertions(+), 277 deletions(-) create mode 100644 UPGRADE-0.4.md diff --git a/UPGRADE-0.4.md b/UPGRADE-0.4.md new file mode 100644 index 000000000..6d8a02823 --- /dev/null +++ b/UPGRADE-0.4.md @@ -0,0 +1,23 @@ +# UPGRADE FROM 0.3.x to 0.4 + +# `watermill/components/cqrs` + +### `CommandHandler.HandlerName` and `EventHandler.HandlerName` was added to the interface. + +If you are using metrics component, you may want to keep backward capability with handler names. In other case you can implement your own method of generating handler name. + +Keeping backward capability for **event handlers**: + +``` +func (h CommandHandler) HandlerName() string { + return fmt.Sprintf("command_processor-%s", h) +} +``` + +Keeping backward capability for **command handlers**: + +``` +func (h EventHandler) HandlerName() string { + return fmt.Sprintf("event_processor-%s", ObjectName(h)) +} +``` diff --git a/components/cqrs/command_bus.go b/components/cqrs/command_bus.go index 8de7123a7..8c4d6e7ea 100644 --- a/components/cqrs/command_bus.go +++ b/components/cqrs/command_bus.go @@ -3,32 +3,34 @@ package cqrs import ( "context" + "github.com/pkg/errors" + "github.com/ThreeDotsLabs/watermill/message" ) // CommandBus transports commands to command handlers. type CommandBus struct { - publisher message.Publisher - topic string - marshaler CommandEventMarshaler + publisher message.Publisher + generateTopic func(commandName string) string + marshaler CommandEventMarshaler } func NewCommandBus( publisher message.Publisher, - topic string, + generateTopic func(commandName string) string, marshaler CommandEventMarshaler, -) *CommandBus { +) (*CommandBus, error) { if publisher == nil { - panic("missing publisher") + return nil, errors.New("missing publisher") } - if topic == "" { - panic("missing topic") + if generateTopic == nil { + return nil, errors.New("missing generateTopic") } if marshaler == nil { - panic("missing marshaler") + return nil, errors.New("missing marshaler") } - return &CommandBus{publisher, topic, marshaler} + return &CommandBus{publisher, generateTopic, marshaler}, nil } // Send sends command to the command bus. @@ -38,7 +40,10 @@ func (c CommandBus) Send(ctx context.Context, cmd interface{}) error { return err } + commandName := c.marshaler.Name(cmd) + topicName := c.generateTopic(commandName) + msg.SetContext(ctx) - return c.publisher.Publish(c.topic, msg) + return c.publisher.Publish(topicName, msg) } diff --git a/components/cqrs/command_bus_test.go b/components/cqrs/command_bus_test.go index ee651eed9..9fdafed23 100644 --- a/components/cqrs/command_bus_test.go +++ b/components/cqrs/command_bus_test.go @@ -1,50 +1,45 @@ -package cqrs +package cqrs_test import ( "context" - "sync" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/ThreeDotsLabs/watermill/message" + "github.com/ThreeDotsLabs/watermill/components/cqrs" ) -type publisherStub struct { - messages map[string]message.Messages - - mu sync.Mutex -} - -func newPublisherStub() *publisherStub { - return &publisherStub{ - messages: make(map[string]message.Messages), - } -} - -func (*publisherStub) Close() error { - return nil -} - -func (p *publisherStub) Publish(topic string, messages ...*message.Message) error { - p.mu.Lock() - defer p.mu.Unlock() - - p.messages[topic] = append(p.messages[topic], messages...) - - return nil -} - func TestCommandBus_Send_ContextPropagation(t *testing.T) { publisher := newPublisherStub() - commandBus := NewCommandBus(publisher, "whatever", JSONMarshaler{}) + commandBus, err := cqrs.NewCommandBus( + publisher, + func(commandName string) string { + return "whatever" + }, + cqrs.JSONMarshaler{}, + ) + require.NoError(t, err) ctx := context.WithValue(context.Background(), "key", "value") - err := commandBus.Send(ctx, "message") + err = commandBus.Send(ctx, "message") require.NoError(t, err) assert.Equal(t, ctx, publisher.messages["whatever"][0].Context()) } + +func TestCommandBus_Send_topic_name(t *testing.T) { + cb, err := cqrs.NewCommandBus( + assertPublishTopicPublisher{ExpectedTopic: "cqrs_test.TestCommand", T: t}, + func(commandName string) string { + return commandName + }, + cqrs.JSONMarshaler{}, + ) + require.NoError(t, err) + + err = cb.Send(context.Background(), TestCommand{}) + require.NoError(t, err) +} diff --git a/components/cqrs/command_processor.go b/components/cqrs/command_processor.go index 93e4fd1a7..67c4a6865 100644 --- a/components/cqrs/command_processor.go +++ b/components/cqrs/command_processor.go @@ -15,38 +15,53 @@ import ( // // In contrast to EvenHandler, every Command must have only one CommandHandler. type CommandHandler interface { + // HandlerName is the name used in message.Router while creating handler. + // + // It will be also passed to CommandsSubscriberConstructor. + // May be useful, for example, to create a consumer group per each handler. + // + // WARNING: If HandlerName was changed and is used for generating consumer groups, + // it may result with **reconsuming all messages**! + HandlerName() string + NewCommand() interface{} + Handle(ctx context.Context, cmd interface{}) error } +// CommandsSubscriberConstructor creates subscriber for CommandHandler. +// It allows you to create a separate customized Subscriber for every command handler. +type CommandsSubscriberConstructor func(handlerName string) (message.Subscriber, error) + // CommandProcessor determines which CommandHandler should handle the command received from the command bus. type CommandProcessor struct { handlers []CommandHandler - commandsTopic string + generateTopic func(commandName string) string - subscriber message.Subscriber - marshaler CommandEventMarshaler - logger watermill.LoggerAdapter + subscriberConstructor CommandsSubscriberConstructor + + marshaler CommandEventMarshaler + logger watermill.LoggerAdapter } func NewCommandProcessor( handlers []CommandHandler, - commandsTopic string, - subscriber message.Subscriber, + generateTopic func(commandName string) string, + subscriberConstructor CommandsSubscriberConstructor, marshaler CommandEventMarshaler, logger watermill.LoggerAdapter, -) *CommandProcessor { +) (*CommandProcessor, error) { if len(handlers) == 0 { - panic("missing handlers") + return nil, errors.New("missing handlers") } - if commandsTopic == "" { - panic("empty commandsTopic name") + if generateTopic == nil { + return nil, errors.New("missing generateTopic") } - if subscriber == nil { - panic("missing subscriber") + if subscriberConstructor == nil { + return nil, errors.New("missing subscriberConstructor") } if marshaler == nil { - panic("missing marshaler") + return nil, errors.New("missing marshaler") } if logger == nil { logger = watermill.NopLogger{} @@ -54,32 +69,56 @@ func NewCommandProcessor( return &CommandProcessor{ handlers, - commandsTopic, - subscriber, + generateTopic, + subscriberConstructor, marshaler, logger, - } + }, nil +} + +type DuplicateCommandHandlerError struct { + CommandName string +} + +func (d DuplicateCommandHandlerError) Error() string { + return fmt.Sprintf("command handler for command %s already exists", d.CommandName) } func (p CommandProcessor) AddHandlersToRouter(r *message.Router) error { + handledCommands := map[string]struct{}{} + for i := range p.Handlers() { handler := p.handlers[i] + handlerName := handler.HandlerName() commandName := p.marshaler.Name(handler.NewCommand()) + topicName := p.generateTopic(commandName) + + if _, ok := handledCommands[commandName]; ok { + return DuplicateCommandHandlerError{commandName} + } + handledCommands[commandName] = struct{}{} + + logger := p.logger.With(watermill.LogFields{ + "command_handler_name": handlerName, + "topic": topicName, + }) - handlerFunc, err := p.RouterHandlerFunc(handler) + handlerFunc, err := p.routerHandlerFunc(handler, logger) if err != nil { return err } - handlerName := fmt.Sprintf("command_processor-%s", commandName) - p.logger.Debug("Adding CQRS handler to router", watermill.LogFields{ - "handler_name": handlerName, - }) + logger.Debug("Adding CQRS command handler to router", nil) + + subscriber, err := p.subscriberConstructor(handlerName) + if err != nil { + return errors.Wrap(err, "cannot create subscriber for command processor") + } r.AddNoPublisherHandler( handlerName, - p.commandsTopic, - p.subscriber, + topicName, + subscriber, handlerFunc, ) } @@ -91,7 +130,7 @@ func (p CommandProcessor) Handlers() []CommandHandler { return p.handlers } -func (p CommandProcessor) RouterHandlerFunc(handler CommandHandler) (message.HandlerFunc, error) { +func (p CommandProcessor) routerHandlerFunc(handler CommandHandler, logger watermill.LoggerAdapter) (message.HandlerFunc, error) { cmd := handler.NewCommand() cmdName := p.marshaler.Name(cmd) @@ -104,7 +143,7 @@ func (p CommandProcessor) RouterHandlerFunc(handler CommandHandler) (message.Han messageCmdName := p.marshaler.NameFromMessage(msg) if messageCmdName != cmdName { - p.logger.Trace("Received different command type than expected, ignoring", watermill.LogFields{ + logger.Trace("Received different command type than expected, ignoring", watermill.LogFields{ "message_uuid": msg.UUID, "expected_command_type": cmdName, "received_command_type": messageCmdName, @@ -112,7 +151,7 @@ func (p CommandProcessor) RouterHandlerFunc(handler CommandHandler) (message.Han return nil, nil } - p.logger.Debug("Handling command", watermill.LogFields{ + logger.Debug("Handling command", watermill.LogFields{ "message_uuid": msg.UUID, "received_command_type": messageCmdName, }) @@ -122,6 +161,7 @@ func (p CommandProcessor) RouterHandlerFunc(handler CommandHandler) (message.Han } if err := handler.Handle(msg.Context(), cmd); err != nil { + logger.Debug("Error when handling command", watermill.LogFields{"err": err}) return nil, err } diff --git a/components/cqrs/command_processor_test.go b/components/cqrs/command_processor_test.go index 310b0fbce..14d3e709c 100644 --- a/components/cqrs/command_processor_test.go +++ b/components/cqrs/command_processor_test.go @@ -16,6 +16,10 @@ import ( type nonPointerCommandHandler struct { } +func (nonPointerCommandHandler) HandlerName() string { + return "nonPointerCommandHandler" +} + func (nonPointerCommandHandler) NewCommand() interface{} { return TestCommand{} } @@ -27,13 +31,18 @@ func (nonPointerCommandHandler) Handle(ctx context.Context, cmd interface{}) err func TestCommandProcessor_non_pointer_command(t *testing.T) { ts := NewTestServices() - commandProcessor := cqrs.NewCommandProcessor( + commandProcessor, err := cqrs.NewCommandProcessor( []cqrs.CommandHandler{nonPointerCommandHandler{}}, - "commands", - ts.CommandsPubSub, + func(commandName string) string { + return "commands" + }, + func(handlerName string) (message.Subscriber, error) { + return ts.CommandsPubSub, nil + }, ts.Marshaler, ts.Logger, ) + require.NoError(t, err) router, err := message.NewRouter(message.RouterConfig{}, ts.Logger) require.NoError(t, err) @@ -46,25 +55,25 @@ func TestCommandProcessor_non_pointer_command(t *testing.T) { func TestCommandProcessor_multiple_same_command_handlers(t *testing.T) { ts := NewTestServices() - commandProcessor := cqrs.NewCommandProcessor( + commandProcessor, err := cqrs.NewCommandProcessor( []cqrs.CommandHandler{ &CaptureCommandHandler{}, &CaptureCommandHandler{}, }, - "commands", - ts.CommandsPubSub, + func(commandName string) string { + return "commands" + }, + func(handlerName string) (message.Subscriber, error) { + return ts.CommandsPubSub, nil + }, ts.Marshaler, ts.Logger, ) + require.NoError(t, err) router, err := message.NewRouter(message.RouterConfig{}, ts.Logger) require.NoError(t, err) - assert.PanicsWithValue(t, - message.DuplicateHandlerNameError{HandlerName: "command_processor-cqrs_test.TestCommand"}, - func() { - err := commandProcessor.AddHandlersToRouter(router) - require.NoError(t, err) - }, - ) + err = commandProcessor.AddHandlersToRouter(router) + assert.EqualValues(t, cqrs.DuplicateCommandHandlerError{CommandName: "cqrs_test.TestCommand"}, err) } diff --git a/components/cqrs/cqrs.go b/components/cqrs/cqrs.go index 4b2858e8e..2c4bd2a3b 100644 --- a/components/cqrs/cqrs.go +++ b/components/cqrs/cqrs.go @@ -3,41 +3,77 @@ package cqrs import ( "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" - multierror "github.com/hashicorp/go-multierror" + "github.com/hashicorp/go-multierror" "github.com/pkg/errors" ) type FacadeConfig struct { - CommandsTopic string + // GenerateCommandsTopic generates topic name based on the command name. + // Command name is generated by CommandEventMarshaler's Name method. + // + // It allows you to use topic per command or one topic for every command. [todo - add to doc] + GenerateCommandsTopic func(commandName string) string + + // CommandHandlers return command handlers which should be executed. CommandHandlers func(commandBus *CommandBus, eventBus *EventBus) []CommandHandler - CommandsPubSub message.PubSub - EventsTopic string + // CommandsPublisher is Publisher used to publish commands. + CommandsPublisher message.Publisher + + // CommandsSubscriberConstructor is constructor for subscribers which will subscribe for messages. + // It will be called for every command handler. + // It allows you to create separated customized Subscriber for every command handler. + CommandsSubscriberConstructor CommandsSubscriberConstructor + + // GenerateEventsTopic generates topic name based on the event name. + // Event name is generated by CommandEventMarshaler's Name method. + // + // It allows you to use topic per command or one topic for every command. [todo - add to doc] + GenerateEventsTopic func(eventName string) string + + // EventHandlers return event handlers which should be executed. EventHandlers func(commandBus *CommandBus, eventBus *EventBus) []EventHandler - EventsPubSub message.PubSub - Router *message.Router - Logger watermill.LoggerAdapter + // EventsPublisher is Publisher used to publish commands. + EventsPublisher message.Publisher + + // EventsSubscriberConstructor is constructor for subscribers which will subscribe for messages. + // It will be called for every event handler. + // It allows you to create separated customized Subscriber for every event handler. + EventsSubscriberConstructor EventsSubscriberConstructor + + // Router is a Watermill router, which will be used to handle events and commands. + // Router handlers will be automatically generated by AddHandlersToRouter of Command and Event handlers. + Router *message.Router + CommandEventMarshaler CommandEventMarshaler + + Logger watermill.LoggerAdapter } func (c FacadeConfig) Validate() error { var err error if c.CommandsEnabled() { - if c.CommandsTopic == "" { - err = multierror.Append(err, errors.New("CommandsTopic is empty")) + if c.GenerateCommandsTopic == nil { + err = multierror.Append(err, errors.New("GenerateCommandsTopic is nil")) + } + if c.CommandsSubscriberConstructor == nil { + err = multierror.Append(err, errors.New("CommandsSubscriberConstructor is nil")) } - if c.CommandsPubSub == nil { - err = multierror.Append(err, errors.New("CommandsPubSub is nil")) + if c.CommandsPublisher == nil { + err = multierror.Append(err, errors.New("CommandsPublisher is nil")) } } if c.EventsEnabled() { - if c.EventsTopic == "" { - err = multierror.Append(err, errors.New("EventsTopic is empty")) + if c.GenerateEventsTopic == nil { + err = multierror.Append(err, errors.New("GenerateEventsTopic is nil")) + } + if c.EventsSubscriberConstructor == nil { + err = multierror.Append(err, errors.New("EventsSubscriberConstructor is nil")) } - if c.EventsPubSub == nil { - err = multierror.Append(err, errors.New("EventsPubSub is nil")) + if c.EventsPublisher == nil { + err = multierror.Append(err, errors.New("EventsPublisher is nil")) } } @@ -55,38 +91,30 @@ func (c FacadeConfig) Validate() error { } func (c FacadeConfig) EventsEnabled() bool { - return c.EventsTopic != "" || c.EventsPubSub != nil + return c.GenerateEventsTopic != nil || c.EventsPublisher != nil || c.EventsSubscriberConstructor != nil } func (c FacadeConfig) CommandsEnabled() bool { - return c.CommandsTopic != "" || c.CommandsPubSub != nil + return c.GenerateCommandsTopic != nil || c.CommandsPublisher != nil || c.CommandsSubscriberConstructor != nil } // Facade is a facade for creating the Command and Event buses and processors. // It was created to avoid boilerplate, when using CQRS in the standard way. // You can also create buses and processors manually, drawing inspiration from how it's done in NewFacade. type Facade struct { - commandsTopic string + commandsTopic func(commandName string) string commandBus *CommandBus - eventsTopic string + eventsTopic func(eventName string) string eventBus *EventBus commandEventMarshaler CommandEventMarshaler } -func (f Facade) CommandsTopic() string { - return f.commandsTopic -} - func (f Facade) CommandBus() *CommandBus { return f.commandBus } -func (f Facade) EventsTopic() string { - return f.eventsTopic -} - func (f Facade) EventBus() *EventBus { return f.eventBus } @@ -101,47 +129,63 @@ func NewFacade(config FacadeConfig) (*Facade, error) { } c := &Facade{ - commandsTopic: config.CommandsTopic, - eventsTopic: config.EventsTopic, + commandsTopic: config.GenerateCommandsTopic, + eventsTopic: config.GenerateEventsTopic, commandEventMarshaler: config.CommandEventMarshaler, } if config.CommandsEnabled() { - c.commandBus = NewCommandBus(config.CommandsPubSub, config.CommandsTopic, config.CommandEventMarshaler) + var err error + c.commandBus, err = NewCommandBus( + config.CommandsPublisher, + config.GenerateCommandsTopic, + config.CommandEventMarshaler, + ) + if err != nil { + return nil, errors.Wrap(err, "cannot create command bus") + } } else { - config.Logger.Info("Empty CommandsTopic, command bus will be not created", nil) + config.Logger.Info("Empty GenerateCommandsTopic, command bus will be not created", nil) } if config.EventsEnabled() { - c.eventBus = NewEventBus(config.EventsPubSub, config.EventsTopic, config.CommandEventMarshaler) + var err error + c.eventBus, err = NewEventBus(config.EventsPublisher, config.GenerateEventsTopic, config.CommandEventMarshaler) + if err != nil { + return nil, errors.Wrap(err, "cannot create event bus") + } } else { - config.Logger.Info("Empty EventsTopic, event bus will be not created", nil) + config.Logger.Info("Empty GenerateEventsTopic, event bus will be not created", nil) } if config.CommandHandlers != nil { - commandProcessor := NewCommandProcessor( + commandProcessor, err := NewCommandProcessor( config.CommandHandlers(c.commandBus, c.eventBus), - config.CommandsTopic, - config.CommandsPubSub, + config.GenerateCommandsTopic, + config.CommandsSubscriberConstructor, config.CommandEventMarshaler, config.Logger, ) - - err := commandProcessor.AddHandlersToRouter(config.Router) if err != nil { + return nil, errors.Wrap(err, "cannot create command processor") + } + + if err := commandProcessor.AddHandlersToRouter(config.Router); err != nil { return nil, err } } if config.EventHandlers != nil { - eventProcessor := NewEventProcessor( + eventProcessor, err := NewEventProcessor( config.EventHandlers(c.commandBus, c.eventBus), - config.EventsTopic, - config.EventsPubSub, + config.GenerateEventsTopic, + config.EventsSubscriberConstructor, config.CommandEventMarshaler, config.Logger, ) - - err := eventProcessor.AddHandlersToRouter(config.Router) if err != nil { + return nil, errors.Wrap(err, "cannot create event processor") + } + + if err := eventProcessor.AddHandlersToRouter(config.Router); err != nil { return nil, err } } diff --git a/components/cqrs/cqrs_test.go b/components/cqrs/cqrs_test.go index b84b22ce1..09340c1cb 100644 --- a/components/cqrs/cqrs_test.go +++ b/components/cqrs/cqrs_test.go @@ -2,6 +2,7 @@ package cqrs_test import ( "context" + "sync" "testing" "time" @@ -53,8 +54,16 @@ func createRouterAndFacade(ts TestServices, t *testing.T, commandHandler *Captur require.NoError(t, err) c, err := cqrs.NewFacade(cqrs.FacadeConfig{ - CommandsTopic: "commands", - EventsTopic: "events", + GenerateCommandsTopic: func(commandName string) string { + assert.Equal(t, "cqrs_test.TestCommand", commandName) + + return commandName + }, + GenerateEventsTopic: func(eventName string) string { + assert.Equal(t, "cqrs_test.TestEvent", eventName) + + return eventName + }, CommandHandlers: func(cb *cqrs.CommandBus, eb *cqrs.EventBus) []cqrs.CommandHandler { require.NotNil(t, cb) require.NotNil(t, eb) @@ -67,9 +76,19 @@ func createRouterAndFacade(ts TestServices, t *testing.T, commandHandler *Captur return []cqrs.EventHandler{eventHandler} }, - Router: router, - CommandsPubSub: ts.CommandsPubSub, - EventsPubSub: ts.EventsPubSub, + Router: router, + CommandsPublisher: ts.CommandsPubSub, + CommandsSubscriberConstructor: func(handlerName string) (message.Subscriber, error) { + assert.Equal(t, "CaptureCommandHandler", handlerName) + + return ts.CommandsPubSub, nil + }, + EventsPublisher: ts.EventsPubSub, + EventsSubscriberConstructor: func(handlerName string) (message.Subscriber, error) { + assert.Equal(t, "CaptureEventHandler", handlerName) + + return ts.EventsPubSub, nil + }, Logger: ts.Logger, CommandEventMarshaler: ts.Marshaler, }) @@ -116,6 +135,10 @@ type CaptureCommandHandler struct { handledCommands []interface{} } +func (h CaptureCommandHandler) HandlerName() string { + return "CaptureCommandHandler" +} + func (h CaptureCommandHandler) HandledCommands() []interface{} { return h.handledCommands } @@ -142,6 +165,10 @@ type CaptureEventHandler struct { handledEvents []interface{} } +func (h CaptureEventHandler) HandlerName() string { + return "CaptureEventHandler" +} + func (h CaptureEventHandler) HandledEvents() []interface{} { return h.handledEvents } @@ -158,3 +185,42 @@ func (h *CaptureEventHandler) Handle(ctx context.Context, event interface{}) err h.handledEvents = append(h.handledEvents, event.(*TestEvent)) return nil } + +type assertPublishTopicPublisher struct { + ExpectedTopic string + T *testing.T +} + +func (a assertPublishTopicPublisher) Publish(topic string, messages ...*message.Message) error { + assert.Equal(a.T, a.ExpectedTopic, topic) + return nil +} + +func (assertPublishTopicPublisher) Close() error { + return nil +} + +type publisherStub struct { + messages map[string]message.Messages + + mu sync.Mutex +} + +func newPublisherStub() *publisherStub { + return &publisherStub{ + messages: make(map[string]message.Messages), + } +} + +func (*publisherStub) Close() error { + return nil +} + +func (p *publisherStub) Publish(topic string, messages ...*message.Message) error { + p.mu.Lock() + defer p.mu.Unlock() + + p.messages[topic] = append(p.messages[topic], messages...) + + return nil +} diff --git a/components/cqrs/event_bus.go b/components/cqrs/event_bus.go index ae766d1bb..5a720ebaa 100644 --- a/components/cqrs/event_bus.go +++ b/components/cqrs/event_bus.go @@ -4,31 +4,32 @@ import ( "context" "github.com/ThreeDotsLabs/watermill/message" + "github.com/pkg/errors" ) // EventBus transports events to event handlers. type EventBus struct { - publisher message.Publisher - topic string - marshaler CommandEventMarshaler + publisher message.Publisher + generateTopic func(eventName string) string + marshaler CommandEventMarshaler } func NewEventBus( publisher message.Publisher, - topic string, + generateTopic func(eventName string) string, marshaler CommandEventMarshaler, -) *EventBus { +) (*EventBus, error) { if publisher == nil { - panic("missing publisher") + return nil, errors.New("missing publisher") } - if topic == "" { - panic("missing topic") + if generateTopic == nil { + return nil, errors.New("missing generateTopic") } if marshaler == nil { - panic("missing marshaler") + return nil, errors.New("missing marshaler") } - return &EventBus{publisher, topic, marshaler} + return &EventBus{publisher, generateTopic, marshaler}, nil } // Publish sends event to the event bus. @@ -38,7 +39,10 @@ func (c EventBus) Publish(ctx context.Context, event interface{}) error { return err } + eventName := c.marshaler.Name(event) + topicName := c.generateTopic(eventName) + msg.SetContext(ctx) - return c.publisher.Publish(c.topic, msg) + return c.publisher.Publish(topicName, msg) } diff --git a/components/cqrs/event_bus_test.go b/components/cqrs/event_bus_test.go index 405b93f04..dfdbe1b0f 100644 --- a/components/cqrs/event_bus_test.go +++ b/components/cqrs/event_bus_test.go @@ -1,9 +1,10 @@ -package cqrs +package cqrs_test import ( "context" "testing" + "github.com/ThreeDotsLabs/watermill/components/cqrs" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -11,12 +12,33 @@ import ( func TestEventBus_Send_ContextPropagation(t *testing.T) { publisher := newPublisherStub() - eventBus := NewEventBus(publisher, "whatever", JSONMarshaler{}) + eventBus, err := cqrs.NewEventBus( + publisher, + func(eventName string) string { + return "whatever" + }, + cqrs.JSONMarshaler{}, + ) + require.NoError(t, err) ctx := context.WithValue(context.Background(), "key", "value") - err := eventBus.Publish(ctx, "message") + err = eventBus.Publish(ctx, "message") require.NoError(t, err) assert.Equal(t, ctx, publisher.messages["whatever"][0].Context()) } + +func TestEventBus_Send_topic_name(t *testing.T) { + cb, err := cqrs.NewEventBus( + assertPublishTopicPublisher{ExpectedTopic: "cqrs_test.TestEvent", T: t}, + func(commandName string) string { + return commandName + }, + cqrs.JSONMarshaler{}, + ) + require.NoError(t, err) + + err = cb.Publish(context.Background(), TestEvent{}) + require.NoError(t, err) +} diff --git a/components/cqrs/event_processor.go b/components/cqrs/event_processor.go index 02a30c678..d63d23b18 100644 --- a/components/cqrs/event_processor.go +++ b/components/cqrs/event_processor.go @@ -2,7 +2,6 @@ package cqrs import ( "context" - "fmt" "github.com/pkg/errors" @@ -10,44 +9,59 @@ import ( "github.com/ThreeDotsLabs/watermill/message" ) -// EventHandler receive event defined by NewEvent and handle it with Handle method. +// EventHandler receives events defined by NewEvent and handles them with its Handle method. // If using DDD, CommandHandler may modify and persist the aggregate. -// It can also invoke process manager, saga or just build a read model. +// It can also invoke a process manager, a saga or just build a read model. // // In contrast to CommandHandler, every Event can have multiple EventHandlers. type EventHandler interface { + // HandlerName is the name used in message.Router while creating handler. + // + // It will be also passed to EventsSubscriberConstructor. + // May be useful, for example, to create a consumer group per each handler. + // + // WARNING: If HandlerName was changed and is used for generating consumer groups, + // it may result with **reconsuming all messages** !!! + HandlerName() string + NewEvent() interface{} + Handle(ctx context.Context, event interface{}) error } +// EventsSubscriberConstructor creates a subscriber for EventHandler. +// It allows you to create separated customized Subscriber for every command handler. +type EventsSubscriberConstructor func(handlerName string) (message.Subscriber, error) + // EventProcessor determines which EventHandler should handle event received from event bus. type EventProcessor struct { - handlers []EventHandler - eventsTopic string + handlers []EventHandler + generateTopic func(eventName string) string - subscriber message.Subscriber - marshaler CommandEventMarshaler - logger watermill.LoggerAdapter + subscriberConstructor EventsSubscriberConstructor + + marshaler CommandEventMarshaler + logger watermill.LoggerAdapter } func NewEventProcessor( handlers []EventHandler, - eventsTopic string, - subscriber message.Subscriber, + generateTopic func(eventName string) string, + subscriberConstructor EventsSubscriberConstructor, marshaler CommandEventMarshaler, logger watermill.LoggerAdapter, -) *EventProcessor { +) (*EventProcessor, error) { if len(handlers) == 0 { - panic("missing handlers") + return nil, errors.New("missing handlers") } - if eventsTopic == "" { - panic("empty eventsTopic") + if generateTopic == nil { + return nil, errors.New("nil generateTopic") } - if subscriber == nil { - panic("missing subscriber") + if subscriberConstructor == nil { + return nil, errors.New("missing subscriberConstructor") } if marshaler == nil { - panic("missing marshaler") + return nil, errors.New("missing marshaler") } if logger == nil { logger = watermill.NopLogger{} @@ -55,26 +69,41 @@ func NewEventProcessor( return &EventProcessor{ handlers, - eventsTopic, - subscriber, + generateTopic, + subscriberConstructor, marshaler, logger, - } + }, nil } func (p EventProcessor) AddHandlersToRouter(r *message.Router) error { for i := range p.Handlers() { handler := p.handlers[i] + handlerName := handler.HandlerName() + eventName := p.marshaler.Name(handler.NewEvent()) + topicName := p.generateTopic(eventName) - handlerFunc, err := p.RouterHandlerFunc(handler) + logger := p.logger.With(watermill.LogFields{ + "event_handler_name": handlerName, + "topic": topicName, + }) + + handlerFunc, err := p.routerHandlerFunc(handler, logger) if err != nil { return err } + logger.Debug("Adding CQRS event handler to router", nil) + + subscriber, err := p.subscriberConstructor(handlerName) + if err != nil { + return errors.Wrap(err, "cannot create subscriber for event processor") + } + r.AddNoPublisherHandler( - fmt.Sprintf("event_processor-%s", ObjectName(handler)), - p.eventsTopic, - p.subscriber, + handlerName, + topicName, + subscriber, handlerFunc, ) } @@ -86,7 +115,7 @@ func (p EventProcessor) Handlers() []EventHandler { return p.handlers } -func (p EventProcessor) RouterHandlerFunc(handler EventHandler) (message.HandlerFunc, error) { +func (p EventProcessor) routerHandlerFunc(handler EventHandler, logger watermill.LoggerAdapter) (message.HandlerFunc, error) { initEvent := handler.NewEvent() expectedEventName := p.marshaler.Name(initEvent) @@ -99,7 +128,7 @@ func (p EventProcessor) RouterHandlerFunc(handler EventHandler) (message.Handler messageEventName := p.marshaler.NameFromMessage(msg) if messageEventName != expectedEventName { - p.logger.Trace("Received different event type than expected, ignoring", watermill.LogFields{ + logger.Trace("Received different event type than expected, ignoring", watermill.LogFields{ "message_uuid": msg.UUID, "expected_event_type": expectedEventName, "received_event_type": messageEventName, @@ -107,7 +136,7 @@ func (p EventProcessor) RouterHandlerFunc(handler EventHandler) (message.Handler return nil, nil } - p.logger.Debug("Handling event", watermill.LogFields{ + logger.Debug("Handling event", watermill.LogFields{ "message_uuid": msg.UUID, "received_event_type": messageEventName, }) @@ -117,6 +146,7 @@ func (p EventProcessor) RouterHandlerFunc(handler EventHandler) (message.Handler } if err := handler.Handle(msg.Context(), event); err != nil { + logger.Debug("Error when handling event", watermill.LogFields{"err": err}) return nil, err } diff --git a/components/cqrs/event_processor_test.go b/components/cqrs/event_processor_test.go index 4e4321f47..b857520a2 100644 --- a/components/cqrs/event_processor_test.go +++ b/components/cqrs/event_processor_test.go @@ -16,6 +16,10 @@ import ( type nonPointerEventProcessor struct { } +func (nonPointerEventProcessor) HandlerName() string { + return "nonPointerEventProcessor" +} + func (nonPointerEventProcessor) NewEvent() interface{} { return TestEvent{} } @@ -27,13 +31,18 @@ func (nonPointerEventProcessor) Handle(ctx context.Context, cmd interface{}) err func TestEventProcessor_non_pointer_event(t *testing.T) { ts := NewTestServices() - eventProcessor := cqrs.NewEventProcessor( + eventProcessor, err := cqrs.NewEventProcessor( []cqrs.EventHandler{nonPointerEventProcessor{}}, - "events", - ts.EventsPubSub, + func(eventName string) string { + return "events" + }, + func(handlerName string) (message.Subscriber, error) { + return ts.EventsPubSub, nil + }, ts.Marshaler, ts.Logger, ) + require.NoError(t, err) router, err := message.NewRouter(message.RouterConfig{}, ts.Logger) require.NoError(t, err) @@ -44,6 +53,10 @@ func TestEventProcessor_non_pointer_event(t *testing.T) { type duplicateTestEventHandler1 struct{} +func (h duplicateTestEventHandler1) HandlerName() string { + return "duplicateTestEventHandler1" +} + func (duplicateTestEventHandler1) NewEvent() interface{} { return &TestEvent{} } @@ -52,6 +65,10 @@ func (h *duplicateTestEventHandler1) Handle(ctx context.Context, event interface type duplicateTestEventHandler2 struct{} +func (h duplicateTestEventHandler2) HandlerName() string { + return "duplicateTestEventHandler2" +} + func (duplicateTestEventHandler2) NewEvent() interface{} { return &TestEvent{} } @@ -61,16 +78,21 @@ func (h *duplicateTestEventHandler2) Handle(ctx context.Context, event interface func TestEventProcessor_multiple_same_event_handlers(t *testing.T) { ts := NewTestServices() - eventProcessor := cqrs.NewEventProcessor( + eventProcessor, err := cqrs.NewEventProcessor( []cqrs.EventHandler{ &duplicateTestEventHandler1{}, &duplicateTestEventHandler2{}, }, - "events", - ts.EventsPubSub, + func(eventName string) string { + return "events" + }, + func(handlerName string) (message.Subscriber, error) { + return ts.EventsPubSub, nil + }, ts.Marshaler, ts.Logger, ) + require.NoError(t, err) router, err := message.NewRouter(message.RouterConfig{}, ts.Logger) require.NoError(t, err) diff --git a/components/cqrs/object.go b/components/cqrs/object.go index 8d9531889..538f870f1 100644 --- a/components/cqrs/object.go +++ b/components/cqrs/object.go @@ -24,6 +24,8 @@ func (e NonPointerError) Error() string { return "non-pointer command: " + e.Type.String() + ", handler.NewCommand() should return pointer to the command" } +// ObjectName name returns object name in format [package].[type name]. +// It ignores if the value is a pointer or not. func ObjectName(v interface{}) string { s := fmt.Sprintf("%T", v) s = strings.TrimLeft(s, "*") diff --git a/message/infrastructure/gochannel/pubsub.go b/message/infrastructure/gochannel/pubsub.go index 2a7a051c3..b3912a243 100644 --- a/message/infrastructure/gochannel/pubsub.go +++ b/message/infrastructure/gochannel/pubsub.go @@ -141,15 +141,18 @@ func (g *GoChannel) sendMessage(topic string, message *message.Message) (<-chan subscribers := g.topicSubscribers(topic) ackedBySubscribers := make(chan struct{}) + logFields := watermill.LogFields{"message_uuid": message.UUID, "topic": topic} + if len(subscribers) == 0 { close(ackedBySubscribers) + g.logger.Info("No subscribers to send message", logFields) return ackedBySubscribers, nil } go func(subscribers []*subscriber) { for i := range subscribers { subscriber := subscribers[i] - subscriber.sendMessageToSubscriber(message) + subscriber.sendMessageToSubscriber(message, logFields) } close(ackedBySubscribers) }(subscribers) @@ -221,8 +224,9 @@ func (g *GoChannel) Subscribe(ctx context.Context, topic string) (<-chan *messag if ok { for i := range messages { msg := g.persistedMessages[topic][i] + logFields := watermill.LogFields{"message_uuid": msg.UUID, "topic": topic} - go s.sendMessageToSubscriber(msg) + go s.sendMessageToSubscriber(msg, logFields) } } @@ -313,15 +317,10 @@ func (s *subscriber) Close() { close(s.outputChannel) } -func (s *subscriber) sendMessageToSubscriber(msg *message.Message) { +func (s *subscriber) sendMessageToSubscriber(msg *message.Message, logFields watermill.LogFields) { s.sending.Lock() defer s.sending.Unlock() - subscriberLogFields := watermill.LogFields{ - "message_uuid": msg.UUID, - "pubsub_uuid": s.uuid, - } - ctx, cancelCtx := context.WithCancel(s.ctx) defer cancelCtx() @@ -332,30 +331,30 @@ SendToSubscriber: msgToSend := msg.Copy() msgToSend.SetContext(ctx) - s.logger.Trace("Sending msg to subscriber", subscriberLogFields) + s.logger.Trace("Sending msg to subscriber", logFields) if s.closed { - s.logger.Info("Pub/Sub closed, discarding msg", subscriberLogFields) + s.logger.Info("Pub/Sub closed, discarding msg", logFields) return } select { case s.outputChannel <- msgToSend: - s.logger.Trace("Sent message to subscriber", subscriberLogFields) + s.logger.Trace("Sent message to subscriber", logFields) case <-s.closing: - s.logger.Trace("Closing, message discarded", subscriberLogFields) + s.logger.Trace("Closing, message discarded", logFields) return } select { case <-msgToSend.Acked(): - s.logger.Trace("Message acked", subscriberLogFields) + s.logger.Trace("Message acked", logFields) return case <-msgToSend.Nacked(): - s.logger.Trace("Nack received, resending message", subscriberLogFields) + s.logger.Trace("Nack received, resending message", logFields) continue SendToSubscriber case <-s.closing: - s.logger.Trace("Closing, message discarded", subscriberLogFields) + s.logger.Trace("Closing, message discarded", logFields) return } } diff --git a/message/message.go b/message/message.go index 6382bcf99..d15fdaa16 100644 --- a/message/message.go +++ b/message/message.go @@ -25,6 +25,8 @@ type Message struct { // // Can be used to store data which doesn't require unmarshaling entire payload. // It is something similar to HTTP request's headers. + // + // Metadata is marshaled and will be saved to PubSub. Metadata Metadata // Payload is message's payload. diff --git a/message/router/middleware/correlation.go b/message/router/middleware/correlation.go index 40344dafb..bac756588 100644 --- a/message/router/middleware/correlation.go +++ b/message/router/middleware/correlation.go @@ -6,6 +6,28 @@ import ( const CorrelationIDMetadataKey = "correlation_id" +// SetCorrelationID sets a correlation ID for the message. +// +// SetCorrelationID should be called when the message enters the system. +// When message is produced in a request (for example HTTP), +// message correlation ID should be the same as the request's correlation ID. +func SetCorrelationID(id string, msg *message.Message) { + if MessageCorrelationID(msg) != "" { + return + } + + msg.Metadata.Set(CorrelationIDMetadataKey, id) +} + +// MessageCorrelationID returns correlation ID from the message. +func MessageCorrelationID(message *message.Message) string { + return message.Metadata.Get(CorrelationIDMetadataKey) +} + +// CorrelationID adds correlation ID to all messages produced by the handler. +// ID is based on ID from message received by handler. +// +// To make CorrelationID working correctly, SetCorrelationID must be called to first message entering the system. func CorrelationID(h message.HandlerFunc) message.HandlerFunc { return func(message *message.Message) ([]*message.Message, error) { producedMessages, err := h(message) @@ -18,15 +40,3 @@ func CorrelationID(h message.HandlerFunc) message.HandlerFunc { return producedMessages, err } } - -func MessageCorrelationID(message *message.Message) string { - return message.Metadata.Get(CorrelationIDMetadataKey) -} - -func SetCorrelationID(id string, msg *message.Message) { - if MessageCorrelationID(msg) != "" { - return - } - - msg.Metadata.Set(CorrelationIDMetadataKey, id) -} diff --git a/message/router/middleware/correlation_test.go b/message/router/middleware/correlation_test.go index 9ed9868b0..2fea078a0 100644 --- a/message/router/middleware/correlation_test.go +++ b/message/router/middleware/correlation_test.go @@ -4,6 +4,7 @@ import ( "testing" "github.com/pkg/errors" + "github.com/stretchr/testify/assert" "github.com/ThreeDotsLabs/watermill/message/router/middleware" @@ -23,6 +24,16 @@ func TestCorrelationID(t *testing.T) { producedMsgs, err := handler(msg) + assert.Equal(t, "2", producedMsgs[0].UUID) assert.Equal(t, middleware.MessageCorrelationID(producedMsgs[0]), "correlation_id") assert.Equal(t, handlerErr, err) } + +func TestSetCorrelationID_already_set(t *testing.T) { + msg := message.NewMessage("", nil) + + middleware.SetCorrelationID("foo", msg) + middleware.SetCorrelationID("bar", msg) + + assert.Equal(t, "foo", middleware.MessageCorrelationID(msg)) +} diff --git a/message/router/middleware/poison.go b/message/router/middleware/poison.go index cf605d436..41d0d10fc 100644 --- a/message/router/middleware/poison.go +++ b/message/router/middleware/poison.go @@ -2,24 +2,57 @@ package middleware import ( "github.com/ThreeDotsLabs/watermill/message" + "github.com/hashicorp/go-multierror" "github.com/pkg/errors" ) // ErrInvalidPoisonQueueTopic occurs when the topic supplied to the PoisonQueue constructor is invalid. var ErrInvalidPoisonQueueTopic = errors.New("invalid poison queue topic") +// ReasonForPoisonedKey is the metadata key which marks the reason (error) why the message was deemed poisoned. +var ReasonForPoisonedKey = "reason_poisoned" + +type poisonQueue struct { + topic string + pub message.Publisher + + shouldGoToPoisonQueue func(err error) bool +} + // PoisonQueue provides a middleware that salvages unprocessable messages and published them on a separate topic. // The main middleware chain then continues on, business as usual. -type PoisonQueue struct { - topic string - pub message.Publisher - Middleware message.HandlerMiddleware +func PoisonQueue(pub message.Publisher, topic string) (message.HandlerMiddleware, error) { + if topic == "" { + return nil, ErrInvalidPoisonQueueTopic + } + + pq := poisonQueue{ + topic: topic, + pub: pub, + shouldGoToPoisonQueue: func(err error) bool { + return true + }, + } + + return pq.Middleware, nil } -// ReasonForPoisonedKey is the metadata key which marks the reason (error) why the message was deemed poisoned. -var ReasonForPoisonedKey = "reason_poisoned" +func PoisonQueueWithFilter(pub message.Publisher, topic string, shouldGoToPoisonQueue func(err error) bool) (message.HandlerMiddleware, error) { + if topic == "" { + return nil, ErrInvalidPoisonQueueTopic + } + + pq := poisonQueue{ + topic: topic, + pub: pub, + + shouldGoToPoisonQueue: shouldGoToPoisonQueue, + } + + return pq.Middleware, nil +} -func (pq PoisonQueue) publishPoisonMessage(msg *message.Message, err error) error { +func (pq poisonQueue) publishPoisonMessage(msg *message.Message, err error) error { // no problems encountered, carry on if err == nil { return nil @@ -32,28 +65,28 @@ func (pq PoisonQueue) publishPoisonMessage(msg *message.Message, err error) erro return pq.pub.Publish(pq.topic, msg) } -func NewPoisonQueue(pub message.Publisher, topic string) (PoisonQueue, error) { - if topic == "" { - return PoisonQueue{}, ErrInvalidPoisonQueueTopic - } - - pq := PoisonQueue{ - topic: topic, - pub: pub, - } +func (pq poisonQueue) Middleware(h message.HandlerFunc) message.HandlerFunc { + return func(msg *message.Message) (events []*message.Message, err error) { + defer func() { + if err != nil { + if !pq.shouldGoToPoisonQueue(err) { + return + } - pq.Middleware = func(h message.HandlerFunc) message.HandlerFunc { - return func(msg *message.Message) (events []*message.Message, err error) { - defer func() { - if err != nil { - // handler didn't cope with the message; publish it on the poison topic and carry on as usual - err = pq.publishPoisonMessage(msg, err) + // handler didn't cope with the message; publish it on the poison topic and carry on as usual + publishErr := pq.publishPoisonMessage(msg, err) + if publishErr != nil { + publishErr = errors.Wrap(publishErr, "cannot publish message to poison queue") + err = multierror.Append(err, publishErr) + return } - }() - // if h fails, the deferred function will salvage all that it can - return h(msg) - } + err = nil + return + } + }() + + // if h fails, the deferred function will salvage all that it can + return h(msg) } - return pq, nil } diff --git a/message/router/middleware/poison_test.go b/message/router/middleware/poison_test.go index 45102b87b..d2cac562f 100644 --- a/message/router/middleware/poison_test.go +++ b/message/router/middleware/poison_test.go @@ -3,6 +3,8 @@ package middleware_test import ( "testing" + "github.com/hashicorp/go-multierror" + "github.com/ThreeDotsLabs/watermill/message" "github.com/ThreeDotsLabs/watermill/message/router/middleware" @@ -17,78 +19,174 @@ const topic = "testing_poison_queue_topic" // We expect that all messages pass through the middleware unaffected and the poison queue catches no messages. func TestPoisonQueue_handler_ok(t *testing.T) { poisonPublisher := mockPublisher{behaviour: BehaviourAlwaysOK} - poisonQueue, err := middleware.NewPoisonQueue(&poisonPublisher, topic) + + poisonQueue, err := middleware.PoisonQueue(&poisonPublisher, topic) require.NoError(t, err) - produced, err := poisonQueue.Middleware(handlerFuncAlwaysOK)( - message.NewMessage("uuid", nil), - ) + poisonQueueWithFilter, err := middleware.PoisonQueueWithFilter(&poisonPublisher, topic, func(err error) bool { + return true + }) + require.NoError(t, err) - assert.NoError(t, err) - assert.Equal(t, handlerFuncAlwaysOKMessages, produced) - assert.Empty(t, poisonPublisher.PopMessages()) + testCases := []struct { + Name string + Middleware message.HandlerMiddleware + }{ + { + Name: "PoisonQueue", + Middleware: poisonQueue, + }, + { + Name: "PoisonQueueWithFilter", + Middleware: poisonQueueWithFilter, + }, + } + + for _, c := range testCases { + t.Run(c.Name, func(t *testing.T) { + produced, err := c.Middleware(handlerFuncAlwaysOK)( + message.NewMessage("uuid", nil), + ) + + assert.NoError(t, err) + assert.Equal(t, handlerFuncAlwaysOKMessages, produced) + assert.Empty(t, poisonPublisher.PopMessages()) + }) + } } func TestPoisonQueue_handler_failing(t *testing.T) { poisonPublisher := mockPublisher{behaviour: BehaviourAlwaysOK} - poisonQueue, err := middleware.NewPoisonQueue(&poisonPublisher, topic) - require.NoError(t, err) - - msg := message.NewMessage("uuid", []byte("payload")) - produced, err := poisonQueue.Middleware(handlerFuncAlwaysFailing)( - msg, - ) - // the middleware itself should not fail; the publisher is working OK, so no error is passed down the chain - assert.NoError(t, err) - - // but no messages should be passed - assert.Empty(t, produced) - - // the original message should end up in the poison queue - poisonMsgs := poisonPublisher.PopMessages() - require.Len(t, poisonMsgs, 1) + poisonQueue, err := middleware.PoisonQueue(&poisonPublisher, topic) + require.NoError(t, err) - assert.Equal(t, msg.Payload, poisonMsgs[0].Payload) + poisonQueueWithFilter, err := middleware.PoisonQueueWithFilter(&poisonPublisher, topic, func(err error) bool { + return true + }) + require.NoError(t, err) - // there should be additional metadata telling why the message was poisoned - // it should be the error that the handler failed with - assert.Equal(t, errFailed.Error(), poisonMsgs[0].Metadata.Get(middleware.ReasonForPoisonedKey)) + testCases := []struct { + Name string + Middleware message.HandlerMiddleware + }{ + { + Name: "PoisonQueue", + Middleware: poisonQueue, + }, + { + Name: "PoisonQueueWithFilter", + Middleware: poisonQueueWithFilter, + }, + } + + for _, c := range testCases { + t.Run(c.Name, func(t *testing.T) { + msg := message.NewMessage("uuid", []byte("payload")) + produced, err := c.Middleware(handlerFuncAlwaysFailing)( + msg, + ) + + // the middleware itself should not fail; the publisher is working OK, so no error is passed down the chain + assert.NoError(t, err) + + // but no messages should be passed + assert.Empty(t, produced) + + // the original message should end up in the poison queue + poisonMsgs := poisonPublisher.PopMessages() + require.Len(t, poisonMsgs, 1) + + assert.Equal(t, msg.Payload, poisonMsgs[0].Payload) + + // there should be additional metadata telling why the message was poisoned + // it should be the error that the handler failed with + assert.Equal(t, errFailed.Error(), poisonMsgs[0].Metadata.Get(middleware.ReasonForPoisonedKey)) + }) + } } func TestPoisonQueue_handler_failing_publisher_failing(t *testing.T) { poisonPublisher := mockPublisher{behaviour: BehaviourAlwaysFail} - poisonQueue, err := middleware.NewPoisonQueue(&poisonPublisher, topic) + poisonQueue, err := middleware.PoisonQueue(&poisonPublisher, topic) + require.NoError(t, err) + + poisonQueueWithFilter, err := middleware.PoisonQueueWithFilter(&poisonPublisher, topic, func(err error) bool { + return true + }) require.NoError(t, err) - msg := message.NewMessage("uuid", nil) - produced, err := poisonQueue.Middleware(handlerFuncAlwaysFailing)( - msg, - ) + testCases := []struct { + Name string + Middleware message.HandlerMiddleware + }{ + { + Name: "PoisonQueue", + Middleware: poisonQueue, + }, + { + Name: "PoisonQueueWithFilter", + Middleware: poisonQueueWithFilter, + }, + } + + for _, c := range testCases { + t.Run(c.Name, func(t *testing.T) { + msg := message.NewMessage("uuid", nil) + produced, err := poisonQueue(handlerFuncAlwaysFailing)( + msg, + ) + + require.IsType(t, &multierror.Error{}, err) + multierr := err.(*multierror.Error) + + // publisher failed, can't hide the error anymore + assert.Equal(t, errFailed, errors.Cause(multierr.WrappedErrors()[1])) + + // can't really expect any produced messages + assert.Empty(t, produced) + + // nor poison messages + assert.Empty(t, poisonPublisher.PopMessages()) + }) + } +} + +func TestPoisonQueueWithFilter_poison_queue(t *testing.T) { + poisonPublisher := mockPublisher{behaviour: BehaviourAlwaysOK} - // publisher failed, can't hide the error anymore - assert.Equal(t, errFailed, errors.Cause(err)) + poisonQueueErr := errors.New("poison queue err") + msg := message.NewMessage("uuid", []byte("payload")) - // can't really expect any produced messages - assert.Empty(t, produced) + poisonQueue, err := middleware.PoisonQueueWithFilter(&poisonPublisher, topic, func(err error) bool { + return err == poisonQueueErr + }) + require.NoError(t, err) - // nor poison messages - assert.Empty(t, poisonPublisher.PopMessages()) + _, err = poisonQueue(func(msg *message.Message) (messages []*message.Message, e error) { + return nil, poisonQueueErr + })(msg) + + assert.NoError(t, err) + require.Len(t, poisonPublisher.PopMessages(), 1) } -func TestPoisonQueue_handler_failing_publisher_panicking(t *testing.T) { - poisonPublisher := mockPublisher{behaviour: BehaviourAlwaysPanic} +func TestPoisonQueueWithFilter_non_poison_queue(t *testing.T) { + poisonPublisher := mockPublisher{behaviour: BehaviourAlwaysOK} - poisonQueue, err := middleware.NewPoisonQueue(&poisonPublisher, topic) + nonPoisonQueueErr := errors.New("non poison queue err") + msg := message.NewMessage("uuid", []byte("payload")) + + poisonQueue, err := middleware.PoisonQueueWithFilter(&poisonPublisher, topic, func(err error) bool { + return err != nonPoisonQueueErr + }) require.NoError(t, err) - msg := message.NewMessage("uuid", nil) + _, err = poisonQueue(func(msg *message.Message) (messages []*message.Message, e error) { + return nil, nonPoisonQueueErr + })(msg) - // if the publisher panics, we're in deep shit - better not catch that panic here - assert.Panics(t, func() { - poisonQueue.Middleware(handlerFuncAlwaysFailing)( - msg, - ) - }) + assert.Error(t, err) + require.Len(t, poisonPublisher.PopMessages(), 0) }