Pragmatic architecture 4: CQRS and Messaging

Dev Diary

In the previous post we took a look on how to create objects from requests, validate them and if they are valid pass them as arguments directly on to a controller action as an argument. Now we want to really process the request or in our case the command or query object.
If you are not yet familiar with the Symfony Messenger component and the way it can be used for CQRS have a look at their documentation here and here. In this article we will just go briefly over the way we configured it.


What we want to have is the possibility to put commands and queries (and later on also events) on a message bus and let it be handled by a handler automatically. We also want to have access to the busses e.g. in controllers like here:

Autor
Michael Zangerle
Datum
4. März 2021
Lesedauer
4 Minuten
<?php namespace App\Customer\Controller; // ... final class CustomerController extends AbstractController { use ControllerResponseTrait; private MessageBusInterface $queryBus; private MessageBusInterface $commandBus; public function __construct( MessageBusInterface $queryBus, MessageBusInterface $commandBus, SerializerInterface $serializer ) { $this->queryBus = $queryBus; $this->commandBus = $commandBus; $this->serializer = $serializer; } //...

Adding the messenger component

So let's get to the configuration. We configure the three busses (command-, query- and event-bus) and enable the doctrine transaction middleware for the command bus (that’s not needed for the example app, but for a real-world example it’s quite useful). This middleware will relieve us from calling Doctrine’s flush() somewhere in the code and will call it at the end of the command processing and rollback if an error occurs.

Failed

We also configure the failed transport for failed messages which basically tells the messenger component what to do with failed messages. In this case it will be stored in a queue named failed in the database.

Sync

We have one transport called sync configured which is a synchronous transport and results in a synchronous execution of commands, queries and events directed to this transport.

Async

We also have an async transport which processes messages asynchronously. This is for example very useful if you want to send mails as a result of certain action but also want to return an immediate response to the customer and not wait for the mail actually being sent.

We define an async transport, configure it's retry strategy and define a queue name. Auto setup is enabled as we don’t use doctrine migrations in the example project but you might want to disable this in a real application and have those database changes reflected in the migrations instead.

To handle a specific message async we added a new interface called AsyncMessageInterface. All messages implementing this interface will be directed to the async transport and therefore also processed async (this is configured here).

Important: Just be careful and do not put multiple things that could fail in one async message handler. For example if two mails should be sent as a result of a specific action don't send both in one event handler. Because if the second mail fails to be sent, the message will be rescheduled and the first mail will be sent again before the second one will be tried.

framework: messenger: failure_transport: failed transports: async: dsn: '%env(MESSENGER_TRANSPORT_DSN)%' retry_strategy: max_retries: 3 delay: 1000 multiplier: 10 max_delay: 0 options: queue_name: "async" auto_setup: true # part of migrations in real app failed: dsn: '%env(MESSENGER_TRANSPORT_DSN)%' options: queue_name: 'failed' auto_setup: true # part of migrations in real app sync: 'sync://' routing: 'App\Message\AsyncMessageInterface': async default_bus: command.bus buses: command.bus: middleware: - doctrine_transaction query.bus: ~ event.bus: ~

Handlers

Handlers might look very different depending on the rest of your application. They might call domain services or work with you domain models directly. The latter has the benefit of one delegation less but it really depends on your needs and the complexity of your application. In the case of our example application it looks like this:

<?php namespace App\Customer\Message\CommandHandler; // .. final class ActivateCustomerCommandHandler implements MessageHandlerInterface { // .. public function __construct( CustomerRepositoryInterface $repository, MessageBusInterface $eventBus ) { $this->repository = $repository; $this->eventBus = $eventBus; } public function __invoke(ActivateCustomerCommand $command): CustomerResponse { $customer = $this->repository->findById($command->getId()); if (!$customer) { throw new \RuntimeException('Customer with id '.$command->getId().' not found!'); } $customer->activate(); $event = new CustomerActivatedEvent($customer->getId()); $this->eventBus->dispatch($event)->with(new DispatchAfterCurrentBusStamp()); return new CustomerResponse($customer); } }

Nothing special is happening here. If the customer is found, the activation is processed. After that an event is created and dispatched (we will get to this in a second). If the customer is not found an exception is thrown. You might want to add some exception handling for a proper response to the consumer of the API.

As you can see in the example above, a response is returned to make our life a bit easier and it prevents us from having the client polling for a result. But it also works without response and polling if that fits your needs better or you want to stick more to CQRS. 
Those responses are plain php objects like the CustomerResponse which wraps the model and get their public methods serialized for the JSON response. This makes it very easy to create response objects which can be refactored easily and are specific for the request if needed. Also they can be used to generate the api documentation later on and for bigger projects they are way easier to handle than multiple serialization groups in one and the same class.

Our customers API provides a get request for the collection of all customers and we also have some filtering query parameters (first name, last name) which can be applied. To show this information also in the documentation we have to add the parameter annotation and define which object should be used for the generation.

<?php namespace App\Customer\Message\Response; use App\Entity\Customer; final class CustomerResponse { private Customer $entity; public function __construct(Customer $entity) { $this->entity = $entity; } public function getId(): int { return $this->entity->getId(); } public function getFirstName(): string { return $this->entity->getFirstName(); } // ... }

As mentioned above in this example we don’t only want to activate a customer but also notify them about their activation. Therefore an event is created and put on the eventbus as well. 

$event = new CustomerActivatedEvent($customer->getId()); $this->eventBus->dispatch($event)->with(new DispatchAfterCurrentBusStamp());

The event will be handled after the command and async as it implements the AsyncMessageInterface. The command itself gets handled right after the event in sync and sends an email to the customer (or in our example it just logs some message).

Register the handlers

The handlers are registered automatically with Symfony’s default configuration for services. But you might want to exclude the queries, events, commands and responses in the services.yaml.

services: # ... App\: resource: '../src/' exclude: # ... - '../src/*/Message/Command/*' - '../src/*/Message/Query/*' - '../src/*/Message/Event/*' - '../src/*/Message/Response/*'

Next step

The other handlers work in a very similar way and this should also show how the same structure can be used everywhere. As we now have a fully working example and structure to work with, the only thing we are really missing is a documentation for the API. Let's see how this can be accomplished in the last part.

Mehr davon?

Pragmatic architecture 5_Documentation_B
Dev Diary
Pragmatic architecture 5: Documentation of the API
5. März 2021 | 8 Min.
Pragmatic architecture 3_argument-value-resolvers_B
Dev Diary
Pragmatic architecture 3: Argument value resolvers
3. März 2021 | 4 Min.

Kontaktformular

*Pflichtfeld
*Pflichtfeld
*Pflichtfeld
*Pflichtfeld

Wir schützen deine Daten

Wir bewahren deine persönlichen Daten sicher auf und geben sie nicht an Dritte weiter. Mehr dazu erfährst du in unseren Datenschutzbestimmungen.