Spring Cloud Stream With Redis
Stream processing is a popular way to transform data in real time as it continuously flows into a system or database. By processing data as soon as it arrives, apps can perform immediate analyses or take pre-defined actions. Spring Cloud Stream is one of the most popular frameworks for building stream processing applications. And since Redis is a high-performance in-memory data store that can handle large volumes of data, it makes sense to use it as a message broker for Spring Cloud Stream.
However, Spring Cloud Stream does not have a native integration with Redis. Fortunately, Redisson PRO provides a Spring Cloud Stream binder that is easy to add to Java code. Its implementation is based on Redisson's Reliable Queue object.
Spring Cloud Stream and Redis: The Perfect Pair for Stream Processing
Developers turn to Spring Cloud Stream because it provides a powerful yet easy-to-use API for building stream processing applications. Spring Cloud Stream is also known for its scalability, as it can handle extremely large volumes of data. It's also flexible and can be used with a variety of messaging systems.
While standard Redis queues provide basic FIFO functionality, Redisson's Reliable Queue adds the durability, acknowledgment mechanisms, and advanced features needed for enterprise-grade message processing. As applications grow, the combination of Redis and Spring Cloud Stream can be scaled horizontally. And since Redis can be configured to persist data to disk, messages can be recovered after an unforeseen outage.
Setting Up Spring Cloud Stream with Redis
Redisson PRO provides a simple implementation of a Spring Cloud Stream binder that uses Redis as the message broker. (A Spring Cloud Stream binder is a library that connects a Spring Cloud Stream app to a messaging system.) Here are the basic steps to take with Redisson PRO:
- Add the Spring Cloud Stream Binder library to your classpath.
- Define a bean for receiving messages. This bean should implement the Consumer interface.
- In the configuration file application.properties, define a channel ID to specify the channel on which the consumer bean should listen.
- Define a bean for publishing messages. This bean should implement the Supplier interface.
- In the configuration file application.properties, define a channel ID to specify the channel to which the supplier bean should publish messages.
For example, here's how to add the binder library to your classpath using Maven:
<dependency>
    <groupId>pro.redisson</groupId>
    <artifactId>spring-cloud-stream-binder-redisson</artifactId>
    <version>3.46.0</version>
</dependency>
Using Gradle:
implementation 'pro.redisson:spring-cloud-stream-binder-redisson:3.46.0'
Available consumer settings:
- pollBatchSize: Sets the maximum number of messages to retrieve in a single poll operation. Default value is 10.
- visibilityTimeout: Sets the visibility timeout for retrieved messages. Default value is 30 seconds.
- negativeAcknowledgeDelay: Specifies the delay before a message whose handler threw an exception becomes eligible for redelivery. Default value is 15 seconds.
And here's some sample code for registering the input binder for receiving messages:
@Bean
public Consumer<MyObject> receiveMessage() {
    // MyObject is the payload type; replace it with your own message class
    return obj -> {
        // consume received object
    };
}
You define the channel ID in the configuration file application.properties. To connect the receiveMessage bean defined above to the channel called my-channel:
spring.cloud.stream.bindings.receiveMessage-in-0.destination=my-channel
spring.cloud.stream.redisson.bindings.receiveMessage-in-0.consumer.pollBatchSize=15
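The binding name receiveMessage-in-0 is not arbitrary. Spring Cloud Stream derives it from the bean name, followed by -in- for an input or -out- for an output, followed by a zero-based index. The following is a minimal sketch of that convention in plain Java. The BindingNames class and its methods are hypothetical helpers written for illustration and are not part of Spring Cloud Stream or Redisson.

```java
class BindingNames {

    // Builds a binding name such as "receiveMessage-in-0" from a bean name,
    // a direction flag, and a zero-based index.
    static String bindingName(String functionName, boolean input, int index) {
        return functionName + (input ? "-in-" : "-out-") + index;
    }

    // Builds the property key that assigns a destination (channel ID) to a binding.
    static String destinationProperty(String bindingName) {
        return "spring.cloud.stream.bindings." + bindingName + ".destination";
    }

    public static void main(String[] args) {
        // Key for the consumer bean shown in this article
        System.out.println(destinationProperty(bindingName("receiveMessage", true, 0)));
        // Key for a supplier bean named feedSupplier
        System.out.println(destinationProperty(bindingName("feedSupplier", false, 0)));
    }
}
```

Renaming a bean changes its binding name, so the property keys in application.properties must be updated to match.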
Now register the output binder for publishing messages as follows:
@Bean
public Supplier<MyObject> feedSupplier() {
    // The framework calls this supplier to obtain each message to publish
    return () -> {
        // ...
        return new MyObject();
    };
}
Again, define the channel ID in the configuration file application.properties. To connect the feedSupplier bean defined above to the my-channel channel:
spring.cloud.stream.bindings.feedSupplier-out-0.destination=my-channel
spring.cloud.stream.bindings.feedSupplier-out-0.producer.useNativeEncoding=true
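Because both beans point at the same destination, objects returned by the supplier end up in the consumer. The sketch below imitates that hand-off in plain Java so the data flow can be followed without a running Redis instance. It makes several assumptions: ChannelSketch is a hypothetical class, the in-memory queue is only a stand-in for the Redis-backed channel, and String payloads replace MyObject. It does not reproduce the binder's acknowledgment, visibility timeout, or redelivery behavior.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;
import java.util.function.Supplier;

class ChannelSketch {

    // Hypothetical in-memory stand-in for the "my-channel" destination.
    // The real binder keeps messages in Redis; this queue only mimics FIFO hand-off.
    private final Queue<String> channel = new ArrayDeque<>();

    // Mimics the output side: call the supplier once and enqueue its result.
    void publishFrom(Supplier<String> supplier) {
        channel.add(supplier.get());
    }

    // Mimics the input side: drain queued messages to the consumer in FIFO order.
    void deliverTo(Consumer<String> consumer) {
        String message;
        while ((message = channel.poll()) != null) {
            consumer.accept(message);
        }
    }

    public static void main(String[] args) {
        ChannelSketch sketch = new ChannelSketch();

        int[] counter = {0};
        Supplier<String> feedSupplier = () -> "message-" + counter[0]++;

        List<String> received = new ArrayList<>();
        Consumer<String> receiveMessage = received::add;

        sketch.publishFrom(feedSupplier);
        sketch.publishFrom(feedSupplier);
        sketch.deliverTo(receiveMessage);

        System.out.println(received); // prints [message-0, message-1]
    }
}
```

In a real application you never write this plumbing yourself. The binder performs it once the two destination properties name the same channel.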
Learn more about Spring Cloud Stream and Redis integration in the documentation.