Publish/subscribe
Reliable Pub/Sub¶
This feature is available only in Redisson PRO edition.
The Reliable PubSub Topic object is a specialized publish-subscribe implementation with FIFO message ordering, built on top of Valkey or Redis, that provides robust message processing and advanced topic management features.
Unlike the regular publish/subscribe built into Valkey and Redis, this implementation ensures message delivery and provides acknowledgment mechanisms, message grouping, seek-to-position and more. Moreover, Valkey and Redis persistence with synchronized replication significantly increases Pub/Sub reliability by maintaining multiple consistent copies of data across nodes, ensuring that messages remain available even during node failures or network disruptions.
Overview¶
The Reliable PubSub follows a topic-subscription-consumer model where:
- Topic stores and manages messages
- Subscriptions maintain independent offsets and track message consumption
- Consumers process messages within a subscription
The objects above are fully thread-safe.
Message flow example where two publishers send two messages, A and B, to a common Pub/Sub topic:
flowchart LR
%% Define nodes
P1["Publisher 1"]:::publisherClass
P2["Publisher 2"]:::publisherClass
subgraph SG1 ["Redisson"]
RF("Topic"):::queueClass
RQ1("Subscription 1"):::queueClass
RQ2("Subscription 2"):::queueClass
end
C1["Consumer 1"]:::consumerClass
C2["Consumer 2"]:::consumerClass
C3["Consumer 3"]:::consumerClass
%% Define connections with message labels
P1 -->|A| RF
P2 -->|B| RF
RF -->|A, B| RQ1
RF -->|A, B| RQ2
RQ1 -->|A| C1
RQ1 -->|B| C2
RQ2 -->|A, B| C3
%% Define styles with different colors
classDef fanoutClass fill:#29B6F6,stroke:#333,stroke-width:2px
classDef filterClass fill:#29B6F6,stroke:#333,stroke-width:1px,stroke-dasharray:3
classDef queueClass fill:#FFFFFF,stroke:#333,stroke-width:2px
classDef publisherClass fill:#FFFFFF,stroke:#333,stroke-width:2px
classDef consumerClass fill:#FFFFFF,stroke:#333,stroke-width:2px
classDef redissonApiStyle fill:#F5F5F5,stroke:#333,stroke-width:2px
class SG1 redissonApiStyle
linkStyle default stroke-width:2px
Pub/Sub patterns¶
Reliable PubSub allows the patterns below to be implemented.
Many-to-one pattern
Multiple publishers send messages to a single topic, and the messages are consumed by one subscriber through a single subscription (see the sketch after the diagram below). This pattern is useful for:
- Message aggregation from multiple sources
- Collecting logs or events from distributed systems
- Centralizing data processing from multiple producers
flowchart LR
%% Define nodes
P1["Publisher"]:::publisherClass
P2["Publisher"]:::publisherClass
subgraph SG1 ["Redisson"]
RF("Topic"):::queueClass
RQ1("Subscription"):::queueClass
end
C1["Consumer"]:::consumerClass
%% Define connections with message labels
P1 -->|A| RF
P2 -->|B| RF
RF --> RQ1
RQ1 -->|A, B| C1
%% Define styles with different colors
classDef fanoutClass fill:#29B6F6,stroke:#333,stroke-width:2px
classDef filterClass fill:#29B6F6,stroke:#333,stroke-width:1px,stroke-dasharray:3
classDef queueClass fill:#FFFFFF,stroke:#333,stroke-width:2px
classDef publisherClass fill:#FFFFFF,stroke:#333,stroke-width:2px
classDef consumerClass fill:#FFFFFF,stroke:#333,stroke-width:2px
classDef redissonApiStyle fill:#F5F5F5,stroke:#333,stroke-width:2px
class SG1 redissonApiStyle
linkStyle default stroke-width:2px
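A minimal sketch of this pattern, using the topic, subscription and pull consumer API described in the sections below; the names "events" and "aggregator" are illustrative:
RReliablePubSubTopic<String> topic = redisson.getReliablePubSubTopic("events");
// the subscription is created before publishing so that both messages are captured
Subscription<String> sub = topic.createSubscription(SubscriptionConfig.name("aggregator"));
PullConsumer<String> consumer = sub.createPullConsumer();
// messages from different publishers accumulate in the single subscription
topic.publish(PublishArgs.messages(MessageArgs.payload("A")));
topic.publish(PublishArgs.messages(MessageArgs.payload("B")));
// the single consumer processes the aggregated stream
List<Message<String>> batch = consumer.pullMany(PullArgs.defaults().count(10));
for (Message<String> m : batch) {
    consumer.acknowledge(MessageAckArgs.ids(m.getId()));
}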
Many-to-many pattern
Multiple publishers send messages to a single topic and subscription, but the messages are distributed across multiple consumers (see the sketch after the diagram below). This pattern is ideal for:
- Load balancing across multiple workers
- Parallel processing of messages
- Scaling consumption capacity horizontally
flowchart LR
%% Define nodes
P1["Publisher"]:::publisherClass
P2["Publisher"]:::publisherClass
subgraph SG1 ["Redisson"]
RF("Topic"):::queueClass
RQ1("Subscription"):::queueClass
end
C1["Consumer"]:::consumerClass
C2["Consumer"]:::consumerClass
C3["Consumer"]:::consumerClass
%% Define connections with message labels
P1 -->|A, B| RF
P2 -->|C| RF
RF --> RQ1
RQ1 -->|A| C1
RQ1 -->|B| C2
RQ1 -->|C| C3
%% Define styles with different colors
classDef fanoutClass fill:#29B6F6,stroke:#333,stroke-width:2px
classDef filterClass fill:#29B6F6,stroke:#333,stroke-width:1px,stroke-dasharray:3
classDef queueClass fill:#FFFFFF,stroke:#333,stroke-width:2px
classDef publisherClass fill:#FFFFFF,stroke:#333,stroke-width:2px
classDef consumerClass fill:#FFFFFF,stroke:#333,stroke-width:2px
classDef redissonApiStyle fill:#F5F5F5,stroke:#333,stroke-width:2px
class SG1 redissonApiStyle
linkStyle default stroke-width:2px
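A minimal sketch of this pattern; two pull consumers share one subscription, so each message is processed by only one of them. The names used here are illustrative:
RReliablePubSubTopic<String> topic = redisson.getReliablePubSubTopic("tasks");
Subscription<String> sub = topic.createSubscription(SubscriptionConfig.name("workers"));
// two workers attached to the same subscription split the message load
PullConsumer<String> worker1 = sub.createPullConsumer(ConsumerConfig.name("worker-1"));
PullConsumer<String> worker2 = sub.createPullConsumer(ConsumerConfig.name("worker-2"));
topic.publishMany(PublishArgs.messages(
        MessageArgs.payload("A"),
        MessageArgs.payload("B"),
        MessageArgs.payload("C")));
// each pull returns messages that are not currently held by the other worker
List<Message<String>> part1 = worker1.pullMany(PullArgs.defaults().count(2));
List<Message<String>> part2 = worker2.pullMany(PullArgs.defaults().count(2));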
One-to-many pattern
A single publisher sends messages to a topic, which then fans out to multiple subscriptions, each feeding a different consumer (see the sketch after the diagram below). This pattern supports:
- Broadcasting messages to multiple independent consumers
- Fan-out scenarios where each consumer needs all messages
- Creating multiple processing pipelines from one source
flowchart LR
%% Define nodes
P1["Publisher"]:::publisherClass
subgraph SG1 ["Redisson"]
RF("Topic"):::queueClass
RQ1("Subscription"):::queueClass
RQ2("Subscription"):::queueClass
end
C1["Consumer"]:::consumerClass
C2["Consumer"]:::consumerClass
%% Define connections with message labels
P1 -->|A, B, C| RF
RF --> RQ1
RF --> RQ2
RQ1 -->|A, B, C| C1
RQ2 -->|A, B, C| C2
%% Define styles with different colors
classDef fanoutClass fill:#29B6F6,stroke:#333,stroke-width:2px
classDef filterClass fill:#29B6F6,stroke:#333,stroke-width:1px,stroke-dasharray:3
classDef queueClass fill:#FFFFFF,stroke:#333,stroke-width:2px
classDef publisherClass fill:#FFFFFF,stroke:#333,stroke-width:2px
classDef consumerClass fill:#FFFFFF,stroke:#333,stroke-width:2px
classDef redissonApiStyle fill:#F5F5F5,stroke:#333,stroke-width:2px
class SG1 redissonApiStyle
linkStyle default stroke-width:2px
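A minimal sketch of this pattern; each subscription keeps its own offset, so every consumer receives the full message stream. The names used here are illustrative:
RReliablePubSubTopic<String> topic = redisson.getReliablePubSubTopic("notifications");
// two independent subscriptions - each receives every published message
Subscription<String> billing = topic.createSubscription(SubscriptionConfig.name("billing"));
Subscription<String> audit = topic.createSubscription(SubscriptionConfig.name("audit"));
topic.publish(PublishArgs.messages(MessageArgs.payload("A")));
// both consumers receive message A because they read from different subscriptions
Message<String> forBilling = billing.createPullConsumer().pull();
Message<String> forAudit = audit.createPullConsumer().pull();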
Features¶
- Message Ordering & Processing
    - FIFO Message Ordering: Messages delivered in first-in, first-out order
    - Bulk Operations: Process multiple messages in a single operation for improved throughput
    - Message Grouping: Sequential processing guarantees for messages with the same group ID
- Consumer Models
    - Pull and Push: Flexible message consumption patterns
    - Subscription Seek: Per-subscription seek operation for message replay or offset adjustment
    - Multiple Retention Modes: Control message retention based on subscription state and processing status
- Topic & Message Limits
    - Topic Size Limit: Set maximum capacity to prevent unbounded growth
    - Message Size Limit: Restrict message size to maintain performance
- Message Settings
    - Message Expiration Timeout: Automatic removal of messages after a specified duration
    - Message Visibility Timeout: Configurable time period during which a message is invisible to other consumers after being retrieved
    - Message Priority: Assign importance levels (0-9) to messages for prioritized processing
    - Message Delay: Schedule messages for future delivery with configurable delay periods
    - Message Delivery Limit: Maximum number of delivery attempts before moving to the Dead Letter Topic
    - Message Headers: Attach key-value metadata to messages
- Acknowledgment & Delivery
    - Message Acknowledgment: Confirm successful processing with explicit acknowledgments
    - Negative Acknowledgment: Redeliver failed messages or move rejected messages to the Dead Letter Topic
    - Automatic Redelivery: Unacknowledged messages are automatically redelivered after the visibility timeout
    - Dead Letter Topic (DLT): If a message reaches its delivery limit or is rejected, it is transferred to a Dead Letter Topic (DLT), a separate Reliable PubSub topic instance that stores messages for later analysis or reprocessing.
- Reliability & Data Integrity
    - Durability and Synchronous Replication: Data redundancy across Valkey or Redis nodes with forced synchronous replication per operation to prevent message loss during node failures.
    - Message Deduplication: Prevent duplicate processing by ID or payload hash within a configurable time interval
    - Atomic Operations: All PubSub operations are executed atomically. This ensures data consistency and prevents race conditions.
- Monitoring & Control
    - Event Listeners: Bind listeners to topic events for real-time notifications
    - Operation Control: Selectively disable or enable topic operations for maintenance
- Architecture
    - No Periodic Tasks: The PubSub operates without relying on periodic background tasks, reducing system overhead and improving reliability.
    - No Message Broker: Operates directly on Valkey or Redis without requiring a separate message-broker component, avoiding extra points of failure.
Configuration¶
Topic¶
Topic settings can be changed at runtime. All settings are optional. If not set, default values are used.
- `visibility` - Sets the duration for which a message becomes invisible to other consumers after being polled. This prevents multiple consumers from processing the same message simultaneously. Can be overridden in a subscription, when polling a message, or when defining a push listener. Default value is `30 seconds`.
- `timeToLive` - Sets the time-to-live duration for messages in the topic. Messages will be automatically removed after this duration expires. `0` value means expiration is not applied. Can be overridden when publishing a message. Default value is `0`.
- `maxMessageSize` - Sets the maximum allowed size (in bytes) for a single message. Messages exceeding this size will be rejected. `0` value means the size limit is not applied. Default value is `0`.
- `delay` - Sets the delay duration before a message becomes available for consumption after being added. `0` value means delay is not applied. Can be overridden when publishing a message. Default value is `0`.
- `maxSize` - Sets the maximum number of messages that can be stored in the topic. When reached, publish operations may be blocked or return an empty result. `0` value means no limit. Default value is `0`.
- `deliveryLimit` - Defines the maximum number of delivery attempts for a message. Once reached, the message may be moved to a dead letter topic if configured, otherwise deleted. Can be overridden in a subscription or when publishing a message. Default value is `10`.
- `retentionMode` - Defines the retention behavior for messages based on subscription state and processing status. Default value is `SUBSCRIPTION_OPTIONAL_RETAIN_ALL`.
    - `SUBSCRIPTION_REQUIRED_DELETE_PROCESSED` - Requires at least one subscriber to store messages. Messages are discarded when all subscriptions have acknowledged, reached the redelivery limit, or negatively acknowledged them.
    - `SUBSCRIPTION_REQUIRED_RETAIN_ALL` - Requires at least one subscriber to store messages. Messages are not discarded after processing.
    - `SUBSCRIPTION_OPTIONAL_RETAIN_ALL` - Default mode. Subscribers are not required. Messages are always stored regardless of subscription state.
Code example of the reliable pubsub topic config definition:
RReliablePubSubTopic<MyObject> topic = redisson.getReliablePubSubTopic("mytopic");
// or instance with a custom codec defined
RReliablePubSubTopic<MyObject> topic = redisson.getReliablePubSubTopic("mytopic", new CustomCodec());
// overrides the previously set configuration
topic.setConfig(TopicConfig.defaults()
.deliveryLimit(4)
.visibility(Duration.ofSeconds(60))
.timeToLive(Duration.ofHours(24))
.maxSize(10000)
.retentionMode(RetentionMode.SUBSCRIPTION_REQUIRED_DELETE_PROCESSED));
// applies the configuration only if no configuration has been set previously
topic.setConfigIfAbsent(TopicConfig.defaults()
.deliveryLimit(4)
.visibility(Duration.ofSeconds(60)));
RReliablePubSubTopicAsync<MyObject> topic = redisson.getReliablePubSubTopic("mytopic");
// overrides the previously set configuration
RFuture<Void> sfr = topic.setConfigAsync(TopicConfig.defaults()
.deliveryLimit(4)
.visibility(Duration.ofSeconds(60)));
// applies the configuration only if no configuration has been set previously
RFuture<Boolean> rtsf = topic.setConfigIfAbsentAsync(TopicConfig.defaults()
.deliveryLimit(4)
.visibility(Duration.ofSeconds(60)));
Subscription¶
Subscriptions maintain independent offsets and track message consumption independently of other subscriptions on the same topic. Each subscription can have multiple pull or push consumers that share the workload.
- `name` - Explicitly set subscription name.
- `generatedName` - Auto-generated subscription name.
- `deadLetterTopicName` - Name of the Dead Letter Topic for messages that reached the delivery limit or were rejected. Can be removed by setting `null`. Default value is `null`.
- `deliveryLimit` - Maximum number of delivery attempts for a message. Once reached, the message may be moved to the dead letter topic. Can be overridden when publishing a message. Default value is `10`.
- `visibility` - Duration for which a message becomes invisible after being polled. Can be overridden when polling a message or when defining a push listener. Default value is `30 seconds`.
- `position` - Initial position for message consumption. Default is `Position.latest()`.
- `retainAfterAck` - When enabled, messages are retained after acknowledgment.
Code example of subscription creation:
RReliablePubSubTopic<MyObject> topic = redisson.getReliablePubSubTopic("mytopic");
// create subscription with auto-generated name
Subscription<MyObject> sub1 = topic.createSubscription();
// create subscription with specific name
Subscription<MyObject> sub2 = topic.createSubscription(
SubscriptionConfig.name("my-subscription")
.deadLetterTopicName("mytopic-dlt")
.deliveryLimit(5)
.visibility(Duration.ofMinutes(2))
.position(Position.earliest()));
// get existing subscription
Subscription<MyObject> existing = topic.getSubscription("my-subscription");
// check if subscription exists
boolean exists = topic.hasSubscription("my-subscription");
// get all subscription names
Set<String> names = topic.getSubscriptions();
// remove subscription (also removes all consumers)
boolean removed = topic.removeSubscription("my-subscription");
RReliablePubSubTopicAsync<MyObject> topic = redisson.getReliablePubSubTopic("mytopic");
RFuture<Subscription<MyObject>> subFuture = topic.createSubscriptionAsync(
SubscriptionConfig.name("my-subscription")
.deadLetterTopicName("mytopic-dlt")
.deliveryLimit(5));
RFuture<Boolean> existsFuture = topic.hasSubscriptionAsync("my-subscription");
RFuture<Set<String>> namesFuture = topic.getSubscriptionsAsync();
RFuture<Boolean> removeFuture = topic.removeSubscriptionAsync("my-subscription");
Consumer¶
Consumers process messages from a subscription. Two types are available:
- Pull Consumer: On-demand message retrieval with manual control
- Push Consumer: Event-driven message processing via registered listeners
Each consumer can be configured with the settings below.
- `name` - Explicitly set consumer name.
- `generatedName` - Auto-generated consumer name.
- `groupIdClaimTimeout` - Timeout for reassigning message group ID ownership to a new consumer.
Code example of consumer creation:
Subscription<MyObject> subscription = topic.getSubscription("my-subscription");
// create pull consumer with auto-generated name
PullConsumer<MyObject> pullConsumer1 = subscription.createPullConsumer();
// create pull consumer with specific name and claim timeout
PullConsumer<MyObject> pullConsumer2 = subscription.createPullConsumer(
ConsumerConfig.name("my-pull-consumer")
.groupIdClaimTimeout(Duration.ofMinutes(5)));
// create push consumer
PushConsumer<MyObject> pushConsumer = subscription.createPushConsumer(
ConsumerConfig.name("my-push-consumer"));
// get all consumer names
Set<String> consumerNames = subscription.getConsumerNames();
// check if consumer exists
boolean exists = subscription.hasConsumer("my-pull-consumer");
// remove consumer
boolean removed = subscription.removeConsumer("my-pull-consumer");
Publishing Messages¶
Messages are published to the topic via the publish() (single message) and publishMany() (batch of messages) methods. Messages become available for consumption as soon as the method returns.
Message-Level Settings
- `payload` - The data to include in the message. Required setting.
- `deliveryLimit` - Maximum delivery attempts. Minimum value is `1`. Default uses the topic's setting or `10`.
- `timeToLive` - Message expiration timeout. `0` means no expiration. Default uses the topic's setting.
- `header` / `headers` - Attach key-value metadata to messages.
- `delay` - Schedule message for future delivery. `0` means immediate delivery.
- `priority` - Priority level from `0` (lowest) to `9` (highest). Default is `0`.
- `groupId` - Group ID for sequential processing by the same consumer.
- `deduplicationById` - Enable deduplication by custom ID for a specified interval.
- `deduplicationByHash` - Enable deduplication by payload hash for a specified interval.
Publish Arguments Settings
- `timeout` - Maximum time to wait when publishing to a full topic with limited size.
- `headersCodec` - Codec used to serialize message header values.
Code example of message publishing:
RReliablePubSubTopic<MyObject> topic = redisson.getReliablePubSubTopic("mytopic");
MyObject data = new MyObject();
// publish single message with full options
Message<MyObject> msg = topic.publish(PublishArgs.messages(MessageArgs.payload(data)
.deliveryLimit(10)
.timeToLive(Duration.ofDays(1))
.header("type", "order")
.header("priority", "high")
.delay(Duration.ofSeconds(30))
.priority(5)
.groupId("customer-123")
.deduplicationById("order-456", Duration.ofHours(1))));
String id = msg.getId();
MyObject payload = msg.getPayload();
Map<String, Object> headers = msg.getHeaders();
// publish multiple messages in a batch
List<Message<MyObject>> msgs = topic.publishMany(PublishArgs.messages(
MessageArgs.payload(data1),
MessageArgs.payload(data2),
MessageArgs.payload(data3)));
RReliablePubSubTopicAsync<MyObject> topic = redisson.getReliablePubSubTopic("mytopic");
RFuture<Message<MyObject>> msgFuture = topic.publishAsync(
PublishArgs.messages(MessageArgs.payload(data)
.priority(3)
.delay(Duration.ofMinutes(5))));
RFuture<List<Message<MyObject>>> batchFuture = topic.publishManyAsync(
PublishArgs.messages(MessageArgs.payload(data1), MessageArgs.payload(data2)));
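Deduplication by payload hash can be requested in the same way as deduplication by ID; a minimal sketch, assuming deduplicationByHash() takes only the deduplication interval (by analogy with deduplicationById() shown above):
// messages with an identical payload published within the interval are treated as duplicates
// (assumption: the method accepts just the interval, mirroring deduplicationById)
topic.publish(PublishArgs.messages(MessageArgs.payload(data)
        .deduplicationByHash(Duration.ofMinutes(10))));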
Publishing to a Bounded Topic
When the topic has a maximum size (maxSize > 0) and a timeout is defined, publish operations block while the topic is full:
RReliablePubSubTopic<MyObject> topic = redisson.getReliablePubSubTopic("mytopic");
// publish with timeout - waits up to 2 minutes for space
Message<MyObject> msg = topic.publish(PublishArgs.messages(MessageArgs.payload(data))
.timeout(Duration.ofMinutes(2)));
// returns null if timeout elapsed without space becoming available
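Since publish() returns null in that case, the caller can handle it explicitly; continuing the example above, a minimal sketch (the fallback reaction is illustrative):
if (msg == null) {
    // the topic was still full when the timeout elapsed; the reaction is application-specific,
    // e.g. retry later, shed load, or store the payload elsewhere
    System.err.println("Topic is full, message was not published");
}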
Receive messages¶
Pull Consumer¶
Pull consumers retrieve messages on-demand via pull() (single message) and pullMany() (batch of messages) methods, providing manual control over message consumption rate and timing.
Pull Arguments Settings
- `acknowledgeMode` - How messages are acknowledged after retrieval:
    - `AUTO` - Messages are automatically acknowledged after delivery
    - `MANUAL` - Messages must be explicitly acknowledged. Default value.
- `visibility` - Duration a message remains invisible after retrieval. Default uses the subscription's setting or `30 seconds`.
- `timeout` - Maximum time to wait for messages. `0` means wait indefinitely. Not set by default (non-blocking).
- `count` - Maximum number of messages to retrieve. Default is `1`.
- `headersCodec` - Codec used to serialize message header values.
Short Polling (Non-blocking)
Returns immediately with or without messages:
Subscription<MyObject> sub = topic.getSubscription("my-subscription");
PullConsumer<MyObject> consumer = sub.createPullConsumer();
// poll single message
Message<MyObject> msg = consumer.pull();
// poll with custom visibility
Message<MyObject> msg = consumer.pull(PullArgs.defaults()
.visibility(Duration.ofSeconds(60))
.acknowledgeMode(AcknowledgeMode.MANUAL));
// poll multiple messages
List<Message<MyObject>> msgs = consumer.pullMany(PullArgs.defaults()
.count(10)
.acknowledgeMode(AcknowledgeMode.AUTO));
Long Polling (Blocking)
Waits for messages to arrive:
PullConsumer<MyObject> consumer = subscription.createPullConsumer();
// wait up to 2 minutes for a message
Message<MyObject> msg = consumer.pull(PullArgs.defaults()
.timeout(Duration.ofMinutes(2))
.visibility(Duration.ofSeconds(30)));
// wait up to 1 minute for batch of messages
List<Message<MyObject>> msgs = consumer.pullMany(PullArgs.defaults()
.timeout(Duration.ofMinutes(1))
.count(50));
Push Consumer¶
Push consumers receive one message per invocation of the listener's onMessage() method.
MessageListenerArgs Settings
- `listener` - The message listener to register. Required.
- `acknowledgeMode` - How messages are acknowledged after retrieval:
    - `AUTO` - Messages are automatically acknowledged after delivery
    - `MANUAL` - Messages must be explicitly acknowledged. Default value.
- `visibility` - Duration a message remains invisible. Default uses the subscription's setting.
- `headersCodec` - Codec used to deserialize message header values.
Code example:
Subscription<MyObject> sub = topic.getSubscription("my-subscription");
PushConsumer<MyObject> consumer = sub.createPushConsumer();
// register listener with manual acknowledgment
consumer.registerListener(MessageListenerArgs.listener((message, acknowledgment) -> {
try {
// process message
MyObject data = message.getPayload();
String id = message.getId();
Map<String, Object> headers = message.getHeaders();
// ... business logic ...
// acknowledge successful processing
acknowledgment.acknowledge(MessageAckArgs.ids(id));
} catch (Exception e) {
// mark as failed for redelivery
acknowledgment.negativeAcknowledge(
MessageNegativeAckArgs.failed(message.getId())
.delay(Duration.ofSeconds(30)));
}
})
.acknowledgeMode(AcknowledgeMode.MANUAL)
.visibility(Duration.ofMinutes(1)));
// register listener with auto acknowledgment
consumer.registerListener(MessageListenerArgs.listener((message, acknowledgment) -> {
// process message - automatically acknowledged after method returns
processMessage(message.getPayload());
})
.acknowledgeMode(AcknowledgeMode.AUTO));
Message Grouping¶
Messages with the same groupId are guaranteed to be processed by the same consumer, ensuring sequential processing order for related messages. The order of redelivered messages with the same groupId is not guaranteed.
Code example of message publishing with the same group:
RReliablePubSubTopic<OrderEvent> topic = redisson.getReliablePubSubTopic("orders");
// publish messages with group ID - all events for same customer go to same consumer
topic.publish(PublishArgs.messages(
MessageArgs.payload(new OrderEvent("order-1", "created"))
.groupId("group-123")));
topic.publish(PublishArgs.messages(
MessageArgs.payload(new OrderEvent("order-1", "paid"))
.groupId("group-123")));
topic.publish(PublishArgs.messages(
MessageArgs.payload(new OrderEvent("order-1", "shipped"))
.groupId("group-123")));
// all three messages will be processed by the same consumer in order
Configure consumer with groupIdClaimTimeout to handle stalled consumers:
PullConsumer<OrderEvent> consumer = subscription.createPullConsumer(
ConsumerConfig.name("order-processor")
.groupIdClaimTimeout(Duration.ofMinutes(5)));
Message group ownership is reassigned to a new consumer when both conditions are met:
- The current owner has not received the current pending message for this group id within the timeout period.
- The current owner has been inactive (no acknowledge(), negativeAcknowledge(), pull(), or push listener invocations) for longer than this timeout.
Acknowledging Messages¶
Message acknowledgment is required for messages retrieved with AcknowledgeMode.MANUAL.
Positive Acknowledgment
Confirms successful processing. The message is deleted from the subscription:
PullConsumer<MyObject> consumer = subscription.createPullConsumer();
Message<MyObject> msg = consumer.pull(PullArgs.defaults()
.acknowledgeMode(AcknowledgeMode.MANUAL));
// process message...
// acknowledge single message
consumer.acknowledge(MessageAckArgs.ids(msg.getId()));
// acknowledge multiple messages
List<Message<MyObject>> msgs = consumer.pullMany(PullArgs.defaults().count(10));
String[] ids = msgs.stream().map(Message::getId).toArray(String[]::new);
consumer.acknowledge(MessageAckArgs.ids(ids));
Negative Acknowledgment
Explicitly marks messages as failed or rejected:
- `Failed` - Message is redelivered. Optionally specify a delay before redelivery.
- `Rejected` - Message is removed and moved to the Dead Letter Topic if configured.
PullConsumer<MyObject> consumer = subscription.createPullConsumer();
Message<MyObject> msg = consumer.pull();
try {
// attempt processing...
} catch (TemporaryException e) {
// mark as failed with 30 second delay before redelivery
consumer.negativeAcknowledge(
MessageNegativeAckArgs.failed(msg.getId())
.delay(Duration.ofSeconds(30)));
} catch (PermanentException e) {
// mark as rejected - moves to DLT if configured
consumer.negativeAcknowledge(
MessageNegativeAckArgs.rejected(msg.getId()));
}
// batch negative acknowledgment
String[] failedIds = getFailedIds();
consumer.negativeAcknowledge(
MessageNegativeAckArgs.failed(failedIds)
.delay(Duration.ofMinutes(1)));
String[] rejectedIds = getRejectedIds();
consumer.negativeAcknowledge(
MessageNegativeAckArgs.rejected(rejectedIds));
Seek and Replay¶
Subscriptions maintain an independent offset that tracks the current position in the message stream. The seek operation allows repositioning this offset for message replay from a specific point or skipping ahead to newer messages.
Code usage example:
Subscription<MyObject> sub = topic.getSubscription("my-subscription");
// seek to latest (newest) messages
sub.seek(Position.latest());
// seek to earliest (oldest) available messages
sub.seek(Position.earliest());
// seek to specific message ID (inclusive)
sub.seek(Position.messageId("0a0c0127ddec23ff3496960478ae3304"));
// seek to after specific message ID (exclusive)
sub.seek(Position.messageIdExclusive("0a0c0127ddec23ff3496960478ae3304"));
// seek to specific timestamp (inclusive)
sub.seek(Position.timestamp(Instant.parse("2024-01-15T10:00:00Z")));
// seek to after specific timestamp (exclusive)
sub.seek(Position.timestampExclusive(Instant.parse("2024-01-15T10:00:00Z")));
Dead Letter Topic (DLT)¶
A Dead Letter Topic stores messages that cannot be processed or delivered due to errors. Instead of losing or endlessly retrying problematic messages, they are moved to the DLT for later inspection and handling.
Messages are routed to a DLT when:
- Message delivery attempts exceed the maximum delivery limit
- Message was negatively acknowledged with rejected status
The DLT is a separate RReliablePubSubTopic instance with the same capabilities.
Code example of DLT definition:
// configure subscription with dead letter topic
RReliablePubSubTopic<MyObject> topic = redisson.getReliablePubSubTopic("mytopic");
Subscription<MyObject> sub = topic.createSubscription(
SubscriptionConfig.name("my-subscription")
.deadLetterTopicName("mytopic-dlt")
.deliveryLimit(3));
// access the dead letter topic
RReliablePubSubTopic<MyObject> dlt = redisson.getReliablePubSubTopic("mytopic-dlt");
// list source topics using this as DLT
Set<String> sources = dlt.getDeadLetterTopicSources();
// returns: ["mytopic"]
// process failed messages from DLT
Subscription<MyObject> dltSub = dlt.createSubscription();
PullConsumer<MyObject> dltConsumer = dltSub.createPullConsumer();
List<Message<MyObject>> failedMsgs = dltConsumer.pullMany(PullArgs.defaults().count(100));
for (Message<MyObject> msg : failedMsgs) {
// analyze or reprocess failed messages
analyzeFailedMessage(msg);
dltConsumer.acknowledge(MessageAckArgs.ids(msg.getId()));
}
Management and Monitoring¶
Messages can be inspected or managed as needed:
- `size()` - Returns the total number of messages ready for polling (excludes delayed and unacknowledged)
- `isEmpty()` - Checks if the topic is empty
- `clear()` - Removes all messages from the topic
- `contains()` - Checks if a message with the specified ID exists
- `containsMany()` - Returns the count of matching messages
- `get()` - Returns a message by ID
- `getAll()` - Returns messages by IDs
- `listAll()` - Returns all messages ready for polling
Operations Control
Operations can be disabled or enabled for maintenance:
- `disableOperation(PubSubOperation.PUBLISH)` - Prevents new messages while processing existing ones
- `disableOperation(PubSubOperation.PULL)` - Allows publishing while preventing consumption
- `enableOperation()` - Re-enables a disabled operation
Code example of topic inspection:
RReliablePubSubTopic<MyObject> topic = redisson.getReliablePubSubTopic("mytopic");
// get topic size
int size = topic.size();
boolean empty = topic.isEmpty();
// check for specific messages
boolean exists = topic.contains("msg-id-123");
int count = topic.containsMany("id1", "id2", "id3");
// retrieve messages
Message<MyObject> msg = topic.get("msg-id-123");
List<Message<MyObject>> msgs = topic.getAll("id1", "id2", "id3");
// list all ready messages
List<Message<MyObject>> allMsgs = topic.listAll();
// clear all messages
boolean cleared = topic.clear();
// disable publishing for maintenance
topic.disableOperation(PubSubOperation.PUBLISH);
// re-enable
topic.enableOperation(PubSubOperation.PUBLISH);
Listeners¶
Redisson provides reliable event listener binding for RReliablePubSubTopic objects. Listeners receive events that occur during connection interruptions and failovers.
Topic-Level Listeners
| Listener Interface | Event Description |
|---|---|
| `PublishedEventListener` | Messages added to topic |
| `TopicConfigEventListener` | Topic config is set |
| `DisabledOperationEventListener` | Operation switched to disabled state |
| `EnabledOperationEventListener` | Operation switched to enabled state |
| `TopicFullEventListener` | Topic is full |
Consumer-Level Listeners (via subscription)
| Listener Interface | Event Description |
|---|---|
| `AcknowledgedEventListener` | Messages acknowledged |
| `NegativelyAcknowledgedEventListener` | Messages negatively acknowledged |
| `PulledEventListener` | Messages polled from subscription |
Code example of listeners usage:
RReliablePubSubTopic<MyObject> topic = redisson.getReliablePubSubTopic("mytopic");
// published event listener
String publishedListenerId = topic.addListener(new PublishedEventListener() {
@Override
public void onPublished(List<String> ids) {
System.out.println("Messages published: " + ids);
}
});
// topic full listener
String fullListenerId = topic.addListener(new TopicFullEventListener() {
@Override
public void onFull(String topicName) {
System.out.println("Topic is full: " + topicName);
}
});
// config change listener
String configListenerId = topic.addListener(new TopicConfigEventListener() {
@Override
public void onSet(String topicName) {
System.out.println("Config set for: " + topicName);
}
});
// operation state listeners
String disabledListenerId = topic.addListener(new DisabledOperationEventListener() {
@Override
public void onDisabled(String topicName, PubSubOperation operation) {
System.out.println("Operation disabled: " + operation);
}
});
String enabledListenerId = topic.addListener(new EnabledOperationEventListener() {
@Override
public void onEnabled(String topicName, PubSubOperation operation) {
System.out.println("Operation enabled: " + operation);
}
});
// remove listeners
topic.removeListener(publishedListenerId);
topic.removeListener(fullListenerId);
RReliablePubSubTopicAsync<MyObject> topic = redisson.getReliablePubSubTopic("mytopic");
RFuture<String> listenerFuture = topic.addListenerAsync(new PublishedEventListener() {
@Override
public void onPublished(List<String> ids) {
// handle published messages
}
});
// remove listener using the id returned by listenerFuture
String listenerId = listenerFuture.toCompletableFuture().join();
RFuture<Void> removeFuture = topic.removeListenerAsync(listenerId);
Statistics¶
Reliable PubSub provides comprehensive statistics at topic, subscription, and consumer levels.
Topic¶
Topic statistics provide an overview of the topic's state and activity.
| TopicStatistics Method | Description |
|---|---|
| `getTopicName()` | Name of the topic |
| `getDelayedMessagesCount()` | Number of messages scheduled for future delivery |
| `getSubscriptionsCount()` | Number of subscriptions |
| `getPublishedMessagesCount()` | Total number of messages published to the topic |
RReliablePubSubTopic<MyObject> topic = redisson.getReliablePubSubTopic("mytopic");
TopicStatistics stats = topic.getStatistics();
String topicName = stats.getTopicName();
long delayedCount = stats.getDelayedMessagesCount();
long subscriptionsCount = stats.getSubscriptionsCount();
long publishedCount = stats.getPublishedMessagesCount();
Subscription¶
Subscription statistics track message processing progress and delivery patterns.
| SubscriptionStatistics Method | Description |
|---|---|
| `getSubscriptionName()` | Name of the subscription |
| `getConsumersCount()` | Number of active consumers |
| `getUnacknowledgedMessagesCount()` | Messages currently being processed (not yet acknowledged) |
| `getAcknowledgedMessagesCount()` | Total messages successfully processed |
| `getNegativelyAcknowledgedMessagesCount()` | Total messages marked as failed or rejected |
| `getRedeliveryAttemptsCount()` | Number of message redelivery attempts |
| `getDeadLetteredMessagesCount()` | Messages moved to the Dead Letter Topic |
Subscription<MyObject> sub = topic.getSubscription("my-subscription");
SubscriptionStatistics stats = sub.getStatistics();
String subscriptionName = stats.getSubscriptionName();
long consumersCount = stats.getConsumersCount();
long redeliveryAttempts = stats.getRedeliveryAttemptsCount();
long unackedCount = stats.getUnacknowledgedMessagesCount();
long ackedCount = stats.getAcknowledgedMessagesCount();
long nackedCount = stats.getNegativelyAcknowledgedMessagesCount();
long deadLetteredCount = stats.getDeadLetteredMessagesCount();
Consumer¶
Consumer statistics provide per-consumer metrics for workload distribution analysis.
| ConsumerStatistics Method | Description |
|---|---|
| `getConsumerName()` | Name of the consumer |
| `getUnacknowledgedMessagesCount()` | Messages retrieved but not yet acknowledged by this consumer |
| `getAcknowledgedMessagesCount()` | Total messages successfully acknowledged by this consumer |
| `getNegativelyAcknowledgedMessagesCount()` | Total messages negatively acknowledged by this consumer |
PullConsumer<MyObject> consumer = subscription.createPullConsumer();
ConsumerStatistics stats = consumer.getStatistics();
String consumerName = stats.getConsumerName();
long unackedCount = stats.getUnacknowledgedMessagesCount();
long ackedCount = stats.getAcknowledgedMessagesCount();
long nackedCount = stats.getNegativelyAcknowledgedMessagesCount();
Async
RFuture<TopicStatistics> topicStatsFuture = topic.getStatisticsAsync();
RFuture<SubscriptionStatistics> subStatsFuture = subscription.getStatisticsAsync();
RFuture<ConsumerStatistics> consumerStatsFuture = consumer.getStatisticsAsync();
Durability and Synchronous Replication¶
The synchronization mechanism allows PubSub modification operations to be propagated to replica nodes in a controlled manner, ensuring data consistency across a Valkey or Redis cluster. Additionally, Valkey and Redis persistence options, such as append-only files (AOF) and RDB snapshots, significantly increase PubSub reliability by preventing data loss during server restarts or failures, allowing recovery of queued messages that would otherwise be lost when using only in-memory storage.
Valkey and Redis use asynchronous replication. Reliable PubSub introduces per-operation synchronous replication modes to address the limitations of asynchronous replication. Combined with properly configured Valkey or Redis persistence, this capability transforms the storage from a purely volatile in-memory store into a more robust message broker suitable for applications where message delivery guarantees are critical. This is particularly important for mission-critical applications where data loss is unacceptable.
Each PubSub modification operation can be configured with specific synchronization parameters:
- `syncMode` - Sets the synchronization mode to be used for the current operation. Default value is `AUTO`. Three synchronization strategies are available:
    - `AUTO` - Ensures data durability by blocking until write operations are confirmed as persisted to memory and the Append-Only File (AOF) on the primary Valkey or Redis node and replicas, if the AOF persistence feature is enabled. If AOF persistence is unavailable, falls back to blocking until replica nodes acknowledge that write operations have been applied to memory. If neither durability mechanism is available, proceeds without synchronization guarantees.
    - `ACK` - Ensures data durability by blocking until replica nodes acknowledge that write operations have been applied to memory.
    - `ACK_AOF` - Ensures data durability by blocking until write operations are confirmed as persisted to memory and the Append-Only File (AOF) on the primary Valkey or Redis node and replicas. Requires Redis 7.2.0+ or any Valkey version.
- `syncFailureMode` - Sets the behavior when synchronization with replica nodes fails. Default value is `LOG_WARNING`. Two failure handling modes are available:
    - `THROW_EXCEPTION` - Throw an exception to the caller. This mode is useful in scenarios where synchronization failures should be immediately visible and handled by the application code.
    - `LOG_WARNING` - Log a warning message but continue execution. This mode is suitable for non-critical synchronization operations where the application can continue functioning despite synchronization issues.
- `syncTimeout` - Sets the timeout duration for synchronization of the current operation. Defines how long the caller will wait for acknowledgment from replica nodes. Default value is `1 second`.
Code examples of synchronization parameters usage:
RReliablePubSubTopic<MyObject> topic = redisson.getReliablePubSubTopic("test");
// Publishing
topic.publish(PublishArgs.messages(MessageArgs.payload(data))
.syncMode(SyncMode.ACK_AOF)
.syncTimeout(Duration.ofSeconds(15))
.syncFailureMode(SyncFailureMode.LOG_WARNING));
// Get subscription and consumer for pull/acknowledge operations
Subscription<MyObject> sub = topic.getSubscription("my-subscription");
PullConsumer<MyObject> consumer = sub.createPullConsumer();
// Pulling
Message<MyObject> msg = consumer.pull(PullArgs.defaults()
.syncMode(SyncMode.ACK)
.syncTimeout(Duration.ofSeconds(10))
.syncFailureMode(SyncFailureMode.THROW_EXCEPTION));
// Acknowledge
consumer.acknowledge(MessageAckArgs.ids(msg.getId())
.syncMode(SyncMode.AUTO)
.syncTimeout(Duration.ofSeconds(20)));
// Negative acknowledge
consumer.negativeAcknowledge(MessageNegativeAckArgs.rejected(msg.getId())
.syncMode(SyncMode.ACK)
.syncTimeout(Duration.ofSeconds(15)));
RReliableQueue<MyObject> rq = redisson.getReliableQueue("test");
RFuture<Message<MyObject>> arf = rq.addAsync(QueueAddArgs.messages(MessageArgs.payload(data))
.syncMode(SyncMode.ACK_AOF)
.syncTimeout(Duration.ofSeconds(15))
.syncFailureMode(SyncFailureMode.LOG_WARNING));
RFuture<Message<MyObject>> prf = rq.pollAsync(QueuePollArgs.defaults()
.syncMode(SyncMode.ACK)
.syncTimeout(Duration.ofSeconds(10))
.syncFailureMode(SyncFailureMode.THROW_EXCEPTION));
RFuture<Void> ack = rq.acknowledgeAsync(QueueAckArgs.ids(msg.getId())
.syncMode(SyncMode.AUTO)
.syncTimeout(Duration.ofSeconds(20)));
RFuture<Void> nack = rq.negativeAcknowledgeAsync(QueueNegativeAckArgs.rejected(msg.getId())
.syncMode(SyncMode.ACK)
.syncTimeout(Duration.ofSeconds(15)));
RedissonReactiveClient redisson = redissonClient.reactive();
RReliableQueueReactive<MyObject> rq = redisson.getReliableQueue("test");
Mono<Message<MyObject>> arf = rq.add(QueueAddArgs.messages(MessageArgs.payload(data))
.syncMode(SyncMode.ACK_AOF)
.syncTimeout(Duration.ofSeconds(15))
.syncFailureMode(SyncFailureMode.LOG_WARNING));
Mono<Message<MyObject>> prf = rq.poll(QueuePollArgs.defaults()
.syncMode(SyncMode.ACK)
.syncTimeout(Duration.ofSeconds(10))
.syncFailureMode(SyncFailureMode.THROW_EXCEPTION));
Mono<Void> ack = rq.acknowledge(QueueAckArgs.ids(msg.getId())
.syncMode(SyncMode.AUTO)
.syncTimeout(Duration.ofSeconds(20)));
Mono<Void> nack = rq.negativeAcknowledge(QueueNegativeAckArgs.rejected(msg.getId())
.syncMode(SyncMode.ACK)
.syncTimeout(Duration.ofSeconds(15)));
RedissonRxClient redisson = redissonClient.rxJava();
RReliableQueueRx<MyObject> rq = redisson.getReliableQueue("test");
Maybe<Message<MyObject>> arf = rq.add(QueueAddArgs.messages(MessageArgs.payload(data))
.syncMode(SyncMode.ACK_AOF)
.syncTimeout(Duration.ofSeconds(15))
.syncFailureMode(SyncFailureMode.LOG_WARNING));
Maybe<Message<MyObject>> prf = rq.poll(QueuePollArgs.defaults()
.syncMode(SyncMode.ACK)
.syncTimeout(Duration.ofSeconds(10))
.syncFailureMode(SyncFailureMode.THROW_EXCEPTION));
Completable ack = rq.acknowledge(QueueAckArgs.ids(msg.getId())
.syncMode(SyncMode.AUTO)
.syncTimeout(Duration.ofSeconds(20)));
Completable nack = rq.negativeAcknowledge(QueueNegativeAckArgs.rejected(msg.getId())
.syncMode(SyncMode.ACK)
.syncTimeout(Duration.ofSeconds(15)));
Topic¶
The Java RTopic object implements a Publish/Subscribe mechanism based on Redis Pub/Sub or Valkey Pub/Sub. It allows subscribing to events published by multiple instances of the RTopic object with the same name.
Listeners are re-subscribed automatically after reconnection or failover. All messages sent while the connection was absent are lost.
Note
For applications requiring reliable delivery and advanced processing capabilities, consider using Reliable PubSub Topic.
Code example:
RTopic topic = redisson.getTopic("myTopic");
int listenerId = topic.addListener(SomeObject.class, new MessageListener<SomeObject>() {
@Override
public void onMessage(CharSequence channel, SomeObject message) {
//...
}
});
// in other thread or JVM
RTopic topic = redisson.getTopic("myTopic");
long clientsReceivedMessage = topic.publish(new SomeObject());
RTopicAsync topic = redisson.getTopic("myTopic");
RFuture<Integer> listenerFuture = topic.addListenerAsync(SomeObject.class, new MessageListener<SomeObject>() {
@Override
public void onMessage(CharSequence channel, SomeObject message) {
//...
}
});
// in other thread or JVM
RTopicAsync topic = redisson.getTopic("myTopic");
RFuture<Long> publishFuture = topic.publishAsync(new SomeObject());
RedissonReactiveClient redisson = redissonClient.reactive();
RTopicReactive topic = redisson.getTopic("myTopic");
Mono<Integer> listenerMono = topic.addListener(SomeObject.class, new MessageListener<SomeObject>() {
@Override
public void onMessage(CharSequence channel, SomeObject message) {
//...
}
});
// in other thread or JVM
RTopicReactive topic = redisson.getTopic("myTopic");
Mono<Long> publishMono = topic.publish(new SomeObject());
RedissonRxClient redisson = redissonClient.rxJava();
RTopicRx topic = redisson.getTopic("myTopic");
Single<Integer> listenerSingle = topic.addListener(SomeObject.class, new MessageListener<SomeObject>() {
@Override
public void onMessage(CharSequence channel, SomeObject message) {
//...
}
});
// in other thread or JVM
RTopicRx topic = redisson.getTopic("myTopic");
Single<Long> publishSingle = topic.publish(new SomeObject());
Partitioning¶
This feature is available only in Redisson PRO edition.
Although each Topic instance is cluster-compatible, it can be connected only to a single Redis or Valkey node which owns the topic name. That may cause the following issues:
- CPU overload on a single node.
- Overload of network or data traffic to a single node.
- Full interruption of the message flow during failover.
Topic partitioning addresses these challenges by enabling connections to all nodes in the cluster and distributing messages effectively. It brings the following benefits:
- Increases throughput of the topic.
- Minimizes interruptions during failover.
- Lowers CPU and network load on Valkey or Redis nodes.
- Scales the message flow to multiple Valkey or Redis nodes.
The number of partitions is defined through the global topicSlots setting, or per instance through the ClusteredTopicOptions.slots() setting, which overrides the global setting.
Slots definition per instance:
RClusteredTopic topic = redisson.getClusteredTopic(ClusteredTopicOptions.name("myTopic").slots(15));
Usage example:
RClusteredTopic topic = redisson.getClusteredTopic("myTopic");
int listenerId = topic.addListener(MyObject.class, new MessageListener<MyObject>() {
@Override
public void onMessage(CharSequence channel, MyObject message) {
//...
}
});
// in other thread or JVM
RClusteredTopic topic = redisson.getClusteredTopic("myTopic");
long clientsReceivedMessage = topic.publish(new MyObject());
Topic pattern¶
Java implementation of the Redis or Valkey based RPatternTopic object. It allows subscribing to multiple topics that match a specified glob-style pattern.
Listeners are re-subscribed automatically after reconnection to a server or failover.
Pattern examples:
- `topic?` subscribes to `topic1`, `topicA`, ...
- `topic?_my` subscribes to `topic_my`, `topic123_my`, `topicTEST_my`, ...
- `topic[ae]` subscribes to `topica` and `topice` only
Code example:
// subscribe to all topics by `topic*` pattern
RPatternTopic patternTopic = redisson.getPatternTopic("topic*");
int listenerId = patternTopic.addListener(Message.class, new PatternMessageListener<Message>() {
@Override
public void onMessage(CharSequence pattern, CharSequence channel, Message msg) {
//...
}
});
RedissonReactiveClient redisson = redissonClient.reactive();
RPatternTopicReactive patternTopic = redisson.getPatternTopic("topic*");
Mono<Integer> listenerMono = patternTopic.addListener(Message.class, new PatternMessageListener<Message>() {
@Override
public void onMessage(CharSequence pattern, CharSequence channel, Message msg) {
//...
}
});
RedissonRxClient redisson = redissonClient.rxJava();
RPatternTopicRx patternTopic = redisson.getPatternTopic("topic*");
Single<Integer> listenerSingle = patternTopic.addListener(Message.class, new PatternMessageListener<Message>() {
@Override
public void onMessage(CharSequence pattern, CharSequence channel, Message msg) {
//...
}
});
Sharded topic¶
The Java RShardedTopic object, based on Redis or Valkey, implements a Sharded Publish/Subscribe mechanism. It allows subscribing to events published by multiple instances of the RShardedTopic object with the same name. Subscribe/publish operations are executed only on the Redis or Valkey node in the cluster that is bound to the specific topic name. Messages published via RShardedTopic aren't broadcast across all nodes, as they are for the RTopic object, which reduces network bandwidth usage between Redis or Valkey nodes as well as their CPU load.
Listeners are re-subscribed automatically after reconnection to a server or failover. All messages sent while the connection was absent are lost. Use Reliable Topic for reliable delivery.
Code example:
RShardedTopic topic = redisson.getShardedTopic("myTopic");
int listenerId = topic.addListener(SomeObject.class, new MessageListener<SomeObject>() {
@Override
public void onMessage(CharSequence channel, SomeObject message) {
//...
}
});
// in other thread or JVM
RShardedTopic topic = redisson.getShardedTopic("myTopic");
long clientsReceivedMessage = topic.publish(new SomeObject());
RShardedTopicAsync topic = redisson.getShardedTopic("myTopic");
RFuture<Integer> listenerFuture = topic.addListenerAsync(SomeObject.class, new MessageListener<SomeObject>() {
@Override
public void onMessage(CharSequence channel, SomeObject message) {
//...
}
});
// in other thread or JVM
RShardedTopicAsync topic = redisson.getShardedTopic("myTopic");
RFuture<Long> publishFuture = topic.publishAsync(new SomeObject());
RedissonReactiveClient redisson = redissonClient.reactive();
RShardedTopicReactive topic = redisson.getShardedTopic("myTopic");
Mono<Integer> listenerMono = topic.addListener(SomeObject.class, new MessageListener<SomeObject>() {
@Override
public void onMessage(CharSequence channel, SomeObject message) {
//...
}
});
// in other thread or JVM
RShardedTopicReactive topic = redisson.getShardedTopic("myTopic");
Mono<Long> publishMono = topic.publish(new SomeObject());
RedissonRxClient redisson = redissonClient.rxJava();
RShardedTopicRx topic = redisson.getShardedTopic("myTopic");
Single<Integer> listenerSingle = topic.addListener(SomeObject.class, new MessageListener<SomeObject>() {
@Override
public void onMessage(CharSequence channel, SomeObject message) {
//...
}
});
// in other thread or JVM
RShardedTopicRx topic = redisson.getShardedTopic("myTopic");
Single<Long> publishSingle = topic.publish(new SomeObject());
Partitioning¶
This feature is available only in Redisson PRO edition.
Although each ShardedTopic instance is cluster-compatible, it can be connected only to a single Redis or Valkey node which owns the topic name. That may cause the following issues:
- CPU overload on a single node.
- Overload of network or data traffic to a single node.
- Full interruption of the message flow during failover.
ShardedTopic partitioning addresses these challenges by enabling connections to all nodes in the cluster and distributing messages effectively. It brings the following benefits:
- Increases throughput of the topic.
- Minimizes interruptions during failover.
- Lowers CPU and network load on Valkey or Redis nodes.
- Scales the message flow to multiple Valkey or Redis nodes.
The number of partitions is defined through the global topicSlots setting, or per instance through the ClusteredTopicOptions.slots() setting, which overrides the global setting.
Slots definition per instance:
RClusteredTopic topic = redisson.getClusteredTopic(ClusteredTopicOptions.name("myTopic").slots(15));
Usage example:
RClusteredTopic topic = redisson.getClusteredTopic("myTopic");
int listenerId = topic.addListener(MyObject.class, new MessageListener<MyObject>() {
@Override
public void onMessage(CharSequence channel, MyObject message) {
//...
}
});
// in other thread or JVM
RClusteredTopic topic = redisson.getClusteredTopic("myTopic");
long clientsReceivedMessage = topic.publish(new MyObject());
Reliable Topic¶
The Java RReliableTopic object, based on Redis or Valkey, implements a Publish/Subscribe mechanism with reliable delivery of messages. In case of a Redis or Valkey connection interruption, all missed messages are delivered after reconnection. A message is considered delivered when it has been received by Redisson and submitted for processing by topic listeners.
Each RReliableTopic object instance (subscriber) has its own watchdog, which is started when the first listener is registered. The subscriber expires after the org.redisson.config.Config#reliableTopicWatchdogTimeout timeout if the watchdog does not extend it to the next timeout interval. This prevents unbounded growth of stored messages in the topic when a subscriber is unable to consume messages, for example due to a Redisson client crash.
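For reference, the watchdog timeout can be adjusted through the Redisson configuration; a minimal sketch assuming a single-server setup (the address and the timeout value are illustrative):
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
// a subscriber expires if its watchdog has not renewed it within this interval (milliseconds)
config.setReliableTopicWatchdogTimeout(600_000);
RedissonClient redisson = Redisson.create(config);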
Topic listeners are resubscribed automatically after reconnection to a server or failover.
Superseded by Reliable PubSub Topic.
Code example:
RReliableTopic topic = redisson.getReliableTopic("anyTopic");
topic.addListener(SomeObject.class, new MessageListener<SomeObject>() {
@Override
public void onMessage(CharSequence channel, SomeObject message) {
//...
}
});
// in other thread or JVM
RReliableTopic topic = redisson.getReliableTopic("anyTopic");
long subscribersReceivedMessage = topic.publish(new SomeObject());
RReliableTopicAsync topic = redisson.getReliableTopic("anyTopic");
RFuture<String> listenerFuture = topic.addListenerAsync(SomeObject.class, new MessageListener<SomeObject>() {
@Override
public void onMessage(CharSequence channel, SomeObject message) {
//...
}
});
// in other thread or JVM
RReliableTopicAsync topic = redisson.getReliableTopic("anyTopic");
RFuture<Long> future = topic.publishAsync(new SomeObject());
RedissonReactiveClient redisson = redissonClient.reactive();
RReliableTopicReactive topic = redisson.getReliableTopic("anyTopic");
Mono<String> listenerMono = topic.addListener(SomeObject.class, new MessageListener<SomeObject>() {
@Override
public void onMessage(CharSequence channel, SomeObject message) {
//...
}
});
// in other thread or JVM
RReliableTopicReactive topic = redisson.getReliableTopic("anyTopic");
Mono<Long> publishMono = topic.publish(new SomeObject());
RedissonRxClient redisson = redissonClient.rxJava();
RReliableTopicRx topic = redisson.getReliableTopic("anyTopic");
Single<String> listenerRx = topic.addListener(SomeObject.class, new MessageListener<SomeObject>() {
@Override
public void onMessage(CharSequence channel, SomeObject message) {
//...
}
});
// in other thread or JVM
RReliableTopicRx topic = redisson.getReliableTopic("anyTopic");
Single<Long> publishRx = topic.publish(new SomeObject());
Partitioning¶
This feature is available only in Redisson PRO edition.
Although each ReliableTopic instance is cluster-compatible, it can be connected only to a single Redis or Valkey node which owns the topic name. That may cause the following issues:
- CPU overload on a single node.
- Overload of network or data traffic to a single node.
- Full interruption of the message flow during failover.
ReliableTopic partitioning addresses these challenges by enabling connections to all nodes in the cluster and distributing messages effectively. It brings the following benefits:
- Increases throughput of the topic.
- Minimizes interruptions during failover.
- Lowers CPU and network load on Valkey or Redis nodes.
- Scales the message flow to multiple Valkey or Redis nodes.
The number of partitions is defined through the global topicSlots setting, or per instance through the ClusteredTopicOptions.slots() setting, which overrides the global setting.
Slots definition per instance:
RClusteredReliableTopic topic
= redisson.getClusteredReliableTopic(ClusteredTopicOptions.name("myTopic").slots(15));
Usage example:
RClusteredReliableTopic topic = redisson.getClusteredReliableTopic("myTopic");
int listenerId = topic.addListener(MyObject.class, new MessageListener<MyObject>() {
@Override
public void onMessage(CharSequence channel, MyObject message) {
//...
}
});
// in other thread or JVM
RClusteredReliableTopic topic = redisson.getClusteredReliableTopic("myTopic");
long clientsReceivedMessage = topic.publish(new MyObject());