Reactive Spring Cloud Stream in practice
The Spring Cloud Stream (SCS) framework provides an abstraction layer that lets event-driven systems communicate over asynchronous messages. It hides the underlying middleware from the application, so services can be implemented with a unified programming model.
Some advantages:
- Easy to change the underlying middleware (without code modification)
- Highly scalable
- Fast message exchange
- Asynchronous communication
- Support of backpressure
- Easy component separation
In this article we are going to overview the main capabilities of the SCS framework using the Kotlin programming language. Our message brokers will be Kafka and RabbitMQ. We are going to use MongoDB to persist data and Testcontainers for component tests.
SCS supports the reactive programming model. It uses the Reactor library, which allows us to write asynchronous, non-blocking, declarative code. Sometimes the declarative style can be a drawback - especially when the pipeline contains many instructions - because the business logic gets mixed with framework instructions. To get around this, we can use coroutines, which help us write asynchronous, non-blocking applications in an imperative way. At the end of this article we are going to compare those two approaches.
SCS in brief
The model consists of four main parts.
- Middleware, like Kafka or RabbitMQ.
- Vendor specific binders
- Internal pipelines
- Application logic
The application connects to the message broker middleware through a vendor-specific binder, and the binder routes the messages to the application through the vendor-neutral internal pipelines.
The documentation provides a detailed overview of the framework and the existing binders. Of course, we can create our own binder implementation when necessary.
In the next couple of chapters we are going to dig into different consuming and producing techniques. For the sake of simplicity, the application will communicate with itself, as depicted by the diagram.
We use a REST endpoint to post JSON messages to the application, which then publishes them to Kafka. After that, the messages will be consumed from the Kafka topic by three different processors, which process the messages and forward them to other middleware.
Configuration
First we have to provide connection settings for the middleware. In this example the whole environment runs in Docker containers on localhost. We can pass other settings, such as authentication, through application.yml as well.
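A minimal application.yml sketch could look like this (host names, ports, credentials and the database name are assumptions for a local Docker setup):

```yaml
spring:
  kafka:
    bootstrap-servers: localhost:9092   # Kafka broker running in Docker
  rabbitmq:
    host: localhost
    port: 5672
    username: guest                     # default RabbitMQ credentials, replace in real setups
    password: guest
  data:
    mongodb:
      uri: mongodb://localhost:27017/scs-demo
```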
Next we configure the Kafka binder settings. These will be applied to all bindings that use this binder.
The default-binder property specifies the binder to use when multiple binder implementations are available on the classpath and no binder is attached to a given binding. If the classpath contains only one binder, that will be the default one and this property can be omitted.
The consumers and producers are going to use a JSON serializer for values and a String serializer for keys. In the case of Kafka, however, it is recommended to use Avro in real applications because it provides a contract between the consumer and the producer.
The next common setting is the function definition. It is a semicolon-separated list containing the names of the processor methods (or the value of the name attribute of the method's @Bean annotation, when provided). The framework will bind those to the appropriate channels.
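Put together, these common settings might look something like this (the function names match the examples below; passing the serializer classes through the binder configuration is one possible way to set them up):

```yaml
spring:
  cloud:
    function:
      # names of the processor methods the framework should bind
      definition: produceMessage;consumeMessage;enrichAndSendToRabbitMq;multiInMultiOut
    stream:
      default-binder: kafka
      kafka:
        binder:
          configuration:
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
            value.serializer: org.springframework.kafka.support.serializer.JsonSerializer
```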
There are three Java functional interfaces we can use in bindings: Supplier, Consumer and Function.
Producer
Producers are able to send messages over the messaging middleware.
To do this, we have to create a processor method which returns a higher-order function (a Supplier). The method name (or the name configured in the @Bean annotation) and the name given in the function.definition property must be identical. The SCS framework then calls the get method of the Supplier returned by the processor method to obtain the stream and attaches it to the proper channel.
This returned stream is a reactive pipeline (Flux). Its generic type parameter represents the type of the message. The message type can be our domain model class itself, or we can wrap it into the Message class provided by the framework. In the latter case we can access the message headers.
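A minimal sketch of such a producer could look like this (ProductMessage and its fields are assumptions; the Sinks-based unicast processor is the mechanism used later for emitting messages):

```kotlin
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.messaging.Message
import org.springframework.messaging.support.MessageBuilder
import reactor.core.publisher.Flux
import reactor.core.publisher.Sinks
import java.util.function.Supplier

// Assumed domain class used throughout these sketches.
data class ProductMessage(val id: String, val name: String)

@Configuration
class ProducerConfiguration {

    // Unicast sink: everything emitted here flows through the Supplier's Flux.
    val unicastProcessor: Sinks.Many<ProductMessage> =
        Sinks.many().unicast().onBackpressureBuffer()

    // The bean name must match the entry in spring.cloud.function.definition.
    @Bean
    fun produceMessage(): Supplier<Flux<Message<ProductMessage>>> = Supplier {
        unicastProcessor.asFlux()
            .map { MessageBuilder.withPayload(it).build() } // wrap the payload so headers are accessible
    }
}
```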
The last step is binding the function to the appropriate channel and optionally providing other required options.
We bind the produceMessage function to Kafka. The framework knows how to connect to the message system from the common spring.kafka settings shown earlier. The first part of the binding name must match the name defined in function.definition. The full name format is fixed; it is built from three parts separated by dashes.
- Function name
- Message direction (in/out)
- Index
produceMessage-out-0 means that the stream described by the first (and, in the case of a Supplier, the only) generic parameter (Flux) of the produceMessage method's return value will be bound as an output. Later we are going to see examples with more than one generic parameter. The channel now connects to Kafka using the topic defined by the destination property. The binder here is optional because we set the Kafka binder as the default.
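The corresponding binding configuration might look like this (the topic name is an assumption):

```yaml
spring:
  cloud:
    stream:
      bindings:
        produceMessage-out-0:
          destination: product-topic   # Kafka topic to publish to
          binder: kafka                # optional, kafka is already the default binder
```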
Finally, the producer setup is finished; now we are able to send messages over Kafka.
For this we create a unicast processor (sink) and use its emitNext method to put a message into the Flux. This is the stream returned from the Supplier's get method. The unicastProcessor represents a data stream; the framework subscribes to this stream and, after serialization, publishes all emitted messages to the message broker.
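A hypothetical REST endpoint feeding the sink from the producer sketch above could look like this (the path and class names are assumptions):

```kotlin
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RestController
import reactor.core.publisher.Sinks

@RestController
class ProductController(private val producerConfiguration: ProducerConfiguration) {

    @PostMapping("/products")
    fun publish(@RequestBody message: ProductMessage) {
        // emitNext pushes the message into the Flux returned by the Supplier's get method
        producerConfiguration.unicastProcessor
            .emitNext(message, Sinks.EmitFailureHandler.FAIL_FAST)
    }
}
```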
Consumer
Creating a consumer is similar to creating a publisher.
In the case of a consumer we must subscribe to the stream manually, because the accept method of the Consumer interface has no return value, so the framework does not get a reference to the stream. Alternatively, we can use the Function interface as the return type of consumeMessage instead of Consumer and return a Mono<Void> from it.
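A minimal consumer sketch, reusing the assumed ProductMessage class from the producer example (the logging body stands in for real business logic, such as saving into MongoDB):

```kotlin
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import reactor.core.publisher.Flux
import java.util.function.Consumer

@Configuration
class ConsumerConfiguration {

    @Bean
    fun consumeMessage(): Consumer<Flux<ProductMessage>> = Consumer { stream ->
        stream
            .doOnNext { println("Received: $it") } // business logic / persistence goes here
            .subscribe() // Consumer.accept returns nothing, so we must subscribe ourselves
    }
}
```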
The properties are similar to the producer properties, but here we have to use the in keyword to indicate that we want to create an incoming channel. We set the consumer group with the help of the group property. This is used by Kafka to determine the offset from which it has to continue reading after a restart.
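A possible binding for it (the topic and group names are assumptions):

```yaml
spring:
  cloud:
    stream:
      bindings:
        consumeMessage-in-0:
          destination: product-topic        # same topic the producer writes to
          group: product-consumer-group     # consumer group used for offset tracking
          binder: kafka
```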
Processor
Processors can forward the incoming messages to output channels after processing them. In this example we will receive messages from Kafka and publish them to RabbitMQ.
We need to configure the general RabbitMQ properties first, as we did in the case of Kafka. The default message format is JSON, so we do not need to specify it.
The processor's return type is the Function interface, which has two generic parameters. The first one is the input data (a reactive stream in our case) and the second one is the output. In this example we will receive messages from Kafka represented by the Product class and, after processing, produce them as InventoryInfo messages towards RabbitMQ.
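A sketch of such a processor (the Product and InventoryInfo fields and the mapping itself are assumptions):

```kotlin
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import reactor.core.publisher.Flux
import java.util.function.Function

// Assumed domain classes for this sketch.
data class Product(val id: String, val name: String, val quantity: Int)
data class InventoryInfo(val productId: String, val quantity: Int)

@Configuration
class ProcessorConfiguration {

    // Consumes Product messages from Kafka and emits InventoryInfo messages towards RabbitMQ.
    @Bean
    fun enrichAndSendToRabbitMq(): Function<Flux<Product>, Flux<InventoryInfo>> = Function { products ->
        products.map { InventoryInfo(productId = it.id, quantity = it.quantity) }
    }
}
```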
We have to bind both the input and the output streams to the corresponding channels. We already know how to bind Kafka channels. RabbitMQ is similar, but brings in some special binding properties, like the type of the exchange and the routing key. With the direct exchange type, the routing key is used to route the message from the given exchange to the queue declared by the consumer. This is very useful for point-to-point communication.
Further exchange types are available on the official page.
The messages will arrive at the exchange named inventory.message.exchange. If there are queues bound to that exchange with the given routing key, the messages will be forwarded to those queues.
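The bindings might be configured along these lines (the Kafka topic, the group and the routing key are assumptions; the exchange name comes from the example above):

```yaml
spring:
  cloud:
    stream:
      bindings:
        enrichAndSendToRabbitMq-in-0:
          destination: product-topic
          group: inventory-processor-group
          binder: kafka
        enrichAndSendToRabbitMq-out-0:
          destination: inventory.message.exchange   # RabbitMQ exchange
          binder: rabbit
      rabbit:
        bindings:
          enrichAndSendToRabbitMq-out-0:
            producer:
              exchangeType: direct
              routingKeyExpression: "'inventory.routing.key'"   # SpEL, hence the inner quotes
```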
Multiple Consumer - Multiple Producer
We also have the opportunity to use multiple input and output streams with the Function interface. The interface, of course, has only one input and one output type parameter, so we have to wrap our streams into a TupleN class, where N is the number of wrapped streams.
The following method returns a Function which has two input streams and two output streams. The first input consumes from Kafka and the second one from RabbitMQ. Each receives a different type of message and produces data into a different output channel after mapping the messages to the given types.
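A sketch of such a method (the input types follow the Product and ProductMessage examples above; the output types and mappings are purely illustrative assumptions):

```kotlin
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import reactor.core.publisher.Flux
import reactor.util.function.Tuple2
import reactor.util.function.Tuples
import java.util.function.Function

// Illustrative output types.
data class EnrichedProduct(val source: Product)
data class AuditEntry(val source: ProductMessage)

@Configuration
class MultiProcessorConfiguration {

    // Input index 0 consumes from Kafka, input index 1 from RabbitMQ;
    // each stream is mapped and forwarded to its own output channel.
    @Bean
    fun multiInMultiOut(): Function<
            Tuple2<Flux<Product>, Flux<ProductMessage>>,
            Tuple2<Flux<EnrichedProduct>, Flux<AuditEntry>>> = Function { inputs ->
        Tuples.of(
            inputs.t1.map { EnrichedProduct(it) },
            inputs.t2.map { AuditEntry(it) }
        )
    }
}
```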
The settings of the channels are similar to the previous ones. We take advantage of the index numbers here: the index marks the position of the specific stream in the Tuple. The first input parameter's index is 0, the second one's is 1, and so on. Thus the full binding name of the Flux<ProductMessage> is multiInMultiOut-in-1, because it is the second parameter of the input Tuple2.
We have to specify some custom properties for the RabbitMQ consumer too, as we did in the case of the producer. The most important one is the bindingRoutingKey. This must be the same key we used with the enrichAndSendToRabbitMq output channel, so we can consume the messages sent by that producer, because the messages will be routed from the exchange to the queue created by this consumer. The queue creation and its binding to the exchange happen automatically. Of course, we can modify this behaviour through properties.
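The input bindings could look roughly like this (the destinations, the group and the routing key are assumptions; the two output bindings are configured the same way as before):

```yaml
spring:
  cloud:
    stream:
      bindings:
        multiInMultiOut-in-0:
          destination: product-topic               # Kafka input, index 0
          group: multi-processor-group
          binder: kafka
        multiInMultiOut-in-1:
          destination: inventory.message.exchange   # RabbitMQ input, index 1
          group: multi-processor-group
          binder: rabbit
      rabbit:
        bindings:
          multiInMultiOut-in-1:
            consumer:
              bindingRoutingKey: inventory.routing.key   # must match the producer's routing key
```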
Component test
To test our application we are going to use the Testcontainers library, which helps us set up our test infrastructure from Docker containers and remove it after all tests are finished. We are going to create a Docker Compose file containing all the necessary components…
… and pass it to Testcontainers.
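One way to wire the compose file in (the file path and the service names/ports are assumptions; the small subclass works around DockerComposeContainer's recursive generic type in Kotlin):

```kotlin
import org.testcontainers.containers.DockerComposeContainer
import java.io.File

// Helper needed in Kotlin because of DockerComposeContainer's self-referencing type parameter.
class KDockerComposeContainer(file: File) : DockerComposeContainer<KDockerComposeContainer>(file)

val testEnvironment: KDockerComposeContainer =
    KDockerComposeContainer(File("src/test/resources/docker-compose.yml"))
        .withExposedService("kafka", 9092)
        .withExposedService("rabbitmq", 5672)
        .withExposedService("mongodb", 27017)
        .also { it.start() }
```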
The example test sends a message over Kafka and checks whether it is saved into the database. The messaging system is asynchronous, so we use Awaitility here to wait until the message is processed and saved, or the predefined timeout expires, in which case the test fails. This is just a simple example where we assert the save against an empty database. Of course, we should create more thorough tests in real applications.
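The core of such a test might look like this (ProductRepository is an assumed reactive MongoDB repository, and the sink comes from the earlier producer sketch):

```kotlin
import org.assertj.core.api.Assertions.assertThat
import org.awaitility.Awaitility.await
import org.junit.jupiter.api.Test
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import reactor.core.publisher.Sinks
import java.time.Duration

@SpringBootTest
class ProductFlowComponentTest(
    @Autowired private val producerConfiguration: ProducerConfiguration,
    @Autowired private val productRepository: ProductRepository
) {

    @Test
    fun `message published to Kafka is saved into MongoDB`() {
        producerConfiguration.unicastProcessor
            .emitNext(ProductMessage("42", "keyboard"), Sinks.EmitFailureHandler.FAIL_FAST)

        // Messaging is asynchronous: poll until the record shows up or the timeout expires.
        await().atMost(Duration.ofSeconds(10)).untilAsserted {
            assertThat(productRepository.findAll().collectList().block()).hasSize(1)
        }
    }
}
```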
Coroutines and Reactive Streams
Consider the following example, where a Flux represents a reactive data stream.
Suppose we want to create Car objects from Vehicles and save them into the database, then fetch the type of each one from a web service and create the final model to be published over the message system.
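The pieces involved might look roughly like this (all class names and fields are assumptions):

```kotlin
import reactor.core.publisher.Flux

// Assumed domain classes for the comparison below.
data class Vehicle(val id: String)
data class Car(val id: String, val vehicleId: String)
data class CarType(val name: String)
data class CarModel(val car: Car, val type: CarType)

// The incoming reactive data stream.
val vehicles: Flux<Vehicle> = Flux.just(Vehicle("v-1"), Vehicle("v-2"))
```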
We are going to implement it in a declarative way with Reactor and with coroutines as well, and finally we will compare the two approaches.
The full example is available here.
Reactor
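A declarative sketch of the pipeline; the repository and web service calls are passed in as plain functions here to keep the example self-contained:

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

fun buildCarModels(
    vehicles: Flux<Vehicle>,
    saveCar: (Car) -> Mono<Car>,         // stands in for the repository call
    getType: (String) -> Mono<CarType>   // stands in for the web service call
): Flux<CarModel> =
    vehicles
        .map { Car(id = it.id, vehicleId = it.id) }                              // create Car from Vehicle
        .flatMap { saveCar(it) }                                                 // persist it
        .flatMap { car -> getType(car.id).map { type -> CarModel(car, type) } }  // build the final model
```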
Coroutine
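The same logic written imperatively inside a coroutine, using the mono builder for Reactor interop (with the same assumed collaborators as above):

```kotlin
import kotlinx.coroutines.reactive.awaitSingle
import kotlinx.coroutines.reactor.mono
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

fun buildCarModels(
    vehicles: Flux<Vehicle>,
    saveCar: (Car) -> Mono<Car>,
    getType: (String) -> Mono<CarType>
): Flux<CarModel> =
    vehicles.flatMap { vehicle ->
        mono {
            // imperative, sequential-looking business logic
            val car = saveCar(Car(id = vehicle.id, vehicleId = vehicle.id)).awaitSingle()
            val type = getType(car.id).awaitSingle()
            CarModel(car, type)
        }
    }
```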
mono is a coroutine builder which starts the coroutine and wraps it into a Mono object to interoperate with Reactor. The advantage of combining the two frameworks is that we keep all the benefits of Reactor (like exception handling, retry, …) while easily separating the business logic from the framework functionality, so we can write cleaner code.
The source code is available on GitHub.