Skip to content

Distributed queues

Reliable Queue

This feature is available only in Redisson PRO edition.

The Reliable Queue is a specialized distributed FIFO queue implementation built on top of Valkey or Redis that provides robust message processing and advanced queue management features. The implementation doesn't use a standard Valkey or Redis queue, but a complex data structure that includes stream. This object is fully thread-safe.

Unlike standard Valkey and Redis queues, the Reliable Queue ensures message delivery even in failure scenarios, provides acknowledgment mechanisms, and offers fine-grained control over message handling. Moreover, Valkey and Redis persistence with synchronized replication significantly increases queue reliability by maintaining multiple consistent copies of data across nodes, ensuring that messages remain available even during node failures or network disruptions.

Features

  • Queue Size Limit: Ability to set maximum capacity to prevent unbounded growth and resource exhaustion.

  • Message Visibility Timeout: Configurable time period during which a message is invisible to other consumers after being retrieved, preventing duplicate processing while allowing recovery if processing fails.

  • Message Delivery Limit: Maximum number of delivery attempts for a message before it's considered undeliverable and deleted or moved to Dead Letter Queue (DLQ) if configured.

  • Message Size Limit: Ability to restrict the size of messages to maintain performance.

  • Message Headers: Each message can have attached key-value metadata.

  • Message Expiration Timeout: Automatic removal of messages from the queue after a specified time period.

  • Message Priorities: Ability to prioritize certain messages for faster processing.

  • Message Delay: Schedule messages for future delivery with configurable delay periods.

  • Message Deduplication: Prevent duplicate message processing by object value used as ID or message payload hash within a configurable time interval.

  • Acknowledgments: Ability to acknowledge messages after successful processing, ensuring messages aren't lost during processing.

  • Negative Acknowledgments: Ability to negatively acknowledge messages that cannot be processed as failed or rejected, allowing for custom error handling strategies. Failed messages are redelivered and rejected are deleted or moved to Dead Letter Queue (DLQ) if configured.

  • Bulk Operations: Supports processing multiple messages in a single operation to improve throughput, including operations for adding, polling, moving, removing, acknowledging, and negatively acknowledging messages.

  • Short Polling and Long Polling: Support both immediate return and wait-for-message polling.

  • Operation Control: Ability to selectively disable or enable queue operations (add, poll) for maintenance, troubleshooting, or load management.

  • Event Listeners: Supports binding event listeners to key queue events such as message addition, polling, acknowledgment, negative acknowledgment, and removal.

  • Durability and Synchronous Replication: Data redundancy across Valkey or Redis nodes with forced synchronous replication per operation to prevent message loss during node failures.

  • Dead-Letter Queue (DLQ): If a message reached delivery limit or rejected it's transferred to a Dead Letter Queue (DLQ). It's a separate instance of Reliable Queue which stores messages for later analysis or reprocessing.

  • Atomic Operations: All queue operations are executed atomically. This ensures data consistency and prevents race conditions.

  • No Periodic Tasks: The queue operates without relying on periodic background tasks, reducing system overhead and improving reliability.

  • No Message Broker: Operates directly on Valkey or Redis without requiring a separate message-broker component and avoiding extra points of failure.

Configuration

Queue-Level Settings

Queue settings can be changed at runtime. All settings are optional. If not set, default values are used.

  • deliveryLimit - Defines the maximum number of delivery attempts for a message. Once this limit is reached, the message may be moved to a dead letter queue if it's configured, otherwise it will be deleted. Can be overridden when adding a message. Default value is 10.

  • visibility - Sets the duration for which a message becomes invisible to other consumers after being polled. This prevents multiple consumers from processing the same message simultaneously. Can be overridden when pooling a message. Default value is 30 seconds.

  • timeToLive - Sets the time-to-live duration for messages in the queue. Messages will be automatically removed from the queue after this duration expires. 0 value means expiration is not applied. Can be overridden when adding a message. Default value is 0.

  • deadLetterQueueName - Sets the name of the Dead Letter Queue (DLQ) to which messages that have reached the delivery limit or have been rejected are sent. Dead letter queue can be removed by setting null value. Default value is null.

  • maxMessageSize - Sets the maximum allowed size (in bytes) for a single message in the queue. Messages exceeding this size will be rejected. 0 value means size limit is not applied. Default value is 0.

  • delay - Sets the delay duration before a message becomes available for consumption after being added to the queue.0 value means delay duration is not applied. Can be overridden when adding a message. Default value is 0.

  • maxSize - Sets the maximum number of messages that can be stored in the queue. When the queue reaches this size, add messages operation may be blocked and/or return empty result. 0 value means queue size limit is not applied. Default value is 0.

  • processingMode - Sets the processing mode for the queue which determines how messages are processed by consumers:

    • SEQUENTIAL - messages are consumed strictly in the order they arrive in the queue. This mode enforces that only one message is actively processed at a time, preventing subsequent messages from becoming available until the current message is either acknowledged or its visibility timeout expires.
    • PARALLEL - multiple messages can be consumed concurrently. This mode allows to distribute the messages processing load across multiple consumers simultaneously.

    Default value is PARALLEL.

Code example of the reliable queue config definition:

RReliableQueue<MyObject> rq = redisson.getReliableQueue("myqueue");

// or instance with a custom codec defined
RReliableQueue<MyObject> rq = redisson.getReliableQueue("myqueue", new CustomCodec());


// overrides the previously set configuration
rq.setConfig(QueueConfig.defaults()
                   .deliveryLimit(4)
                   .visibilityTimeout(Duration.ofSeconds(60))
                   .timeToLive());

// applies the configuration only if no configuration has been set previously
rq.setConfigIfAbsent(QueueConfig.defaults()
                   .deliveryLimit(4)
                   .visibilityTimeout(Duration.ofSeconds(60))
                   .timeToLive());
RReliableQueueAsync<MyObject> rq = redisson.getReliableQueue("myqueue");

// or instance with a custom codec defined
RReliableQueueAsync<MyObject> rq = redisson.getReliableQueue("myqueue", new CustomCodec());


// overrides the previously set configuration
RFuture<Void> sfr = rq.setConfigAsync(QueueConfig.defaults()
                                       .deliveryLimit(4)
                                       .visibilityTimeout(Duration.ofSeconds(60))
                                       .timeToLive());

// applies the configuration only if no configuration has been set previously
RFuture<Boolean> rtsf = rq.setConfigIfAbsentAsync(QueueConfig.defaults()
                                               .deliveryLimit(4)
                                               .visibilityTimeout(Duration.ofSeconds(60))
                                               .timeToLive());
RedissonReactiveClient redisson = redissonClient.reactive();
RReliableQueueReactive<MyObject> rq = redisson.getReliableQueue("myqueue");

// or instance with a custom codec defined
RReliableQueueReactive<MyObject> rq = redisson.getReliableQueue("myqueue", new CustomCodec());


// overrides the previously set configuration
Mono<Void> sfr = rq.setConfig(QueueConfig.defaults()
                               .deliveryLimit(4)
                               .visibilityTimeout(Duration.ofSeconds(60))
                               .timeToLive());

// applies the configuration only if no configuration has been set previously
Mono<Boolean> rtsf = rq.setConfigIfAbsent(QueueConfig.defaults()
                                       .deliveryLimit(4)
                                       .visibilityTimeout(Duration.ofSeconds(60))
                                       .timeToLive());
RedissonRxClient redisson = redissonClient.rxJava();
RReliableQueueRx<MyObject> rq = redisson.getReliableQueue("myqueue");

// or instance with a custom codec defined
RReliableQueueRx<MyObject> rq = redisson.getReliableQueue("myqueue", new CustomCodec());


// overrides the previously set configuration
Completable sfr = rq.setConfig(QueueConfig.defaults()
                                   .deliveryLimit(4)
                                   .visibilityTimeout(Duration.ofSeconds(60))
                                   .timeToLive());

// applies the configuration only if no configuration has been set previously
Single<Boolean> tsf = rq.setConfigIfAbsent(QueueConfig.defaults()
                                       .deliveryLimit(4)
                                       .visibilityTimeout(Duration.ofSeconds(60))
                                       .timeToLive());

Message-Level Settings

Message settings applied per add() or addMany() operation. All settings are optional except for payload. If optional settings are not set, defaults will be used.

  • payload - Defines the data to include in the message. The codec defined for the queue is used for data serialization and deserialization. required setting

  • deliveryLimit - Message Delivery Limit. Allows to specify the maximum number of delivery attempts for a message, after which it can be moved to Dead Letter Queue (DLQ) if configured. The delivery attempt count increases each time a message is redelivered after the visibility timeout is reached or when the message is negatively acknowledged with a failed status.
    Minimal value is 1. If not defined, the queue's deliveryLimit setting value is used. If queue's deliveryLimit setting is also not set, the default value is 10.

  • timeToLive - Message Expiration Timeout. Allows you to set a time-to-live (TTL) for messages in the queue, after which they are automatically removed if not processed.
    0 value means expiration is not applied. If not defined, the queue's timeToLive setting value is used. If queue's timeToLive setting is also not set, the default value is 0.

  • header \ headers - Message headers. Allow to attach metadata to messages without modifying the payload, enabling richer communication patterns and more sophisticated routing. Separates metadata from core message content.

  • delay - Message Delay. Allows to schedule messages for future delivery, with precise timing control down to the millisecond.
    0 value means delay duration is not applied. If not defined, the queue's delay setting value is used. If queue's delay setting is also not set, the default value is 0.

  • priority - Message Priorities. Allows to assign importance levels to messages, ensuring critical messages are processed before less important ones. Ensures critical operations take precedence.
    Priority defined as a number between 0 and 9. 0 - the lowest priority level. 9 - the highest priority level. Default value is 0.

  • deduplicationById - Message Deduplication by ID. Enables deduplication based on a custom value used as ID for the specified interval. During the specified interval, messages with the same ID will be considered duplicates and won't be added to the queue.

  • deduplicationByHash - Message Deduplication by Hash. Enables deduplication based on the message payload hash for the specified interval. During the specified interval, messages with the same hash will be considered duplicates and won't be added to the queue.

Adding Messages

The add() and addMany() methods each return a Message object for every added message, containing the payload, headers (identical to those of the original message) and a unique ID generated by the queue.

Below are the available settings per method call:

  • timeout - Sets the maximum time to wait when adding messages to a full queue with a limited size. If the queue is full, the add operation will block until either:

    • Space becomes available and the message(s) are successfully added.
    • The specified timeout duration elapses.

    0 means to wait indefinitely until space becomes available. If the timeout elapses before the message(s) can be added, the add() method returns null, while the addMany() method returns an empty collection.

  • headersCodec - Specifies the codec to be used for message headers serialization.

Non-blocking Enqueue (Unbounded Queue)

For queues without a size limit (maxSize = 0, the default), enqueue operations are always non-blocking. The message is added immediately, and the method returns as soon as the operation is complete.

Code examples of messages adding:

RReliableQueue<MyObject> rq = redisson.getReliableQueue("myqueue");

MyObject data = new MyObject();

Message<MyObject> msg = rq.add(QueueAddArgs.messages(MessageArgs.payload(data)
                                                       .deliveryLimit(10)
                                                       .timeToLive(Duration.ofDays(24))
                                                       .header("key1", new MyDataValue())
                                                       .header("key2", new MyDataValue())
                                                       .delay(Duration.ofSeconds(45))
                                                       .priority(3)
                                                       .deduplicationById("myid", Duration.ofHours(23))));

String id = msg.getId();
MyObject data = msg.getPayload();
Map<String, MyDataValue> headers = msg.getHeaders();

// adding messages in a batch
List<Message<MyObject>> msgs =  rq.addMany(QueueAddArgs.messages(MessageArgs.payload(data1), 
                                                                 MessageArgs.payload(data2), 
                                                                 MessageArgs.payload(data3)));
RReliableQueueAsync<MyObject> rq = redisson.getReliableQueue("myqueue");

MyObject data = new MyObject();

RFuture<Message<MyObject>> msg = rq.addAsync(QueueAddArgs.messages(MessageArgs.payload(data)
                                                       .deliveryLimit(10)
                                                       .timeToLive(Duration.ofDays(24))
                                                       .header("key1", new MyDataValue())
                                                       .header("key2", new MyDataValue())
                                                       .delay(Duration.ofSeconds(45))
                                                       .priority(3)
                                                       .deduplicationById("myid", Duration.ofHours(23))));

String id = msg.getId();
MyObject data = msg.getPayload();
Map<String, MyDataValue> headers = msg.getHeaders();

// adding messages in a batch
RFuture<List<Message<MyObject>>> msgs =  rq.addManyAsync(QueueAddArgs.messages(MessageArgs.payload(data1), 
                                                                 MessageArgs.payload(data2), 
                                                                 MessageArgs.payload(data3)));
RedissonReactiveClient redisson = redissonClient.reactive();
RReliableQueueReactive<MyObject> rq = redisson.getReliableQueue("myqueue");

MyObject data = new MyObject();

Mono<Message<MyObject>> msg = rq.add(QueueAddArgs.messages(MessageArgs.payload(data)
                                                       .deliveryLimit(10)
                                                       .timeToLive(Duration.ofDays(24))
                                                       .header("key1", new MyDataValue())
                                                       .header("key2", new MyDataValue())
                                                       .delay(Duration.ofSeconds(45))
                                                       .priority(3)
                                                       .deduplicationById("myid", Duration.ofHours(23))));

String id = msg.getId();
MyObject data = msg.getPayload();
Map<String, MyDataValue> headers = msg.getHeaders();

// adding messages in a batch
Mono<List<Message<MyObject>>> msgs =  rq.addMany(QueueAddArgs.messages(MessageArgs.payload(data1), 
                                                                 MessageArgs.payload(data2), 
                                                                 MessageArgs.payload(data3)));
RedissonRxClient redisson = redissonClient.rxJava();
RReliableQueueRx<MyObject> rq = redisson.getReliableQueue("myqueue");

MyObject data = new MyObject();

Maybe<Message<MyObject>> msg = rq.add(QueueAddArgs.messages(MessageArgs.payload(data)
                                                       .deliveryLimit(10)
                                                       .timeToLive(Duration.ofDays(24))
                                                       .header("key1", new MyDataValue())
                                                       .header("key2", new MyDataValue())
                                                       .delay(Duration.ofSeconds(45))
                                                       .priority(3)
                                                       .deduplicationById("myid", Duration.ofHours(23))));

String id = msg.getId();
MyObject data = msg.getPayload();
Map<String, MyDataValue> headers = msg.getHeaders();

// adding messages in a batch
Single<List<Message<MyObject>>> msgs =  rq.addMany(QueueAddArgs.messages(MessageArgs.payload(data1), 
                                                                 MessageArgs.payload(data2), 
                                                                 MessageArgs.payload(data3)));

Blocking Enqueue (Bounded Queue)

When the queue has a maximum size (maxSize > 0), enqueue operations will be blocked if the queue is full and timeout setting is defined. timeout settings is used to control how long the operation waits for space to become available.

Code examples of messages adding to a queue with limited size:

RReliableQueue<MyObject> rq = redisson.getReliableQueue("myqueue");

MyObject data = new MyObject();

// adding message with timeout in 120 seconds
Message<MyObject> msg = rq.add(QueueAddArgs.messages(MessageArgs.payload(data)
                                                       .delay(Duration.ofSeconds(45))
                                                       .deduplicationById("myid", Duration.ofHours(23)))
                                                       .timeout(Duration.ofSeconds(120)));

// adding messages in a batch with timeout in 90 seconds
List<Message<MyObject>> msgs =  rq.addMany(QueueAddArgs.messages(MessageArgs.payload(data1), 
                                                                 MessageArgs.payload(data2), 
                                                                 MessageArgs.payload(data3))
                                                       .timeout(Duration.ofSeconds(90)));
RReliableQueueAsync<MyObject> rq = redisson.getReliableQueue("myqueue");

MyObject data = new MyObject();

// adding message with timeout in 120 seconds
RFuture<Message<MyObject>> msg = rq.addAsync(QueueAddArgs.messages(MessageArgs.payload(data)
                                                       .delay(Duration.ofSeconds(45))
                                                       .deduplicationById("myid", Duration.ofHours(23)))
                                                       .timeout(Duration.ofSeconds(120)));

// adding messages in a batch with timeout in 90 seconds
RFuture<List<Message<MyObject>>> msgs =  rq.addManyAsync(QueueAddArgs.messages(MessageArgs.payload(data1), 
                                                                 MessageArgs.payload(data2), 
                                                                 MessageArgs.payload(data3))
                                                       .timeout(Duration.ofSeconds(90)));
RedissonReactiveClient redisson = redissonClient.reactive();
RReliableQueueReactive<MyObject> rq = redisson.getReliableQueue("myqueue");

MyObject data = new MyObject();

// adding message with timeout in 120 seconds
Mono<Message<MyObject>> msg = rq.add(QueueAddArgs.messages(MessageArgs.payload(data)
                                                       .delay(Duration.ofSeconds(45))
                                                       .deduplicationById("myid", Duration.ofHours(23)))
                                                       .timeout(Duration.ofSeconds(120)));

// adding messages in a batch with timeout in 90 seconds
Mono<List<Message<MyObject>>> msgs =  rq.addMany(QueueAddArgs.messages(MessageArgs.payload(data1), 
                                                                 MessageArgs.payload(data2), 
                                                                 MessageArgs.payload(data3))
                                                       .timeout(Duration.ofSeconds(90)));
RedissonRxClient redisson = redissonClient.rxJava();
RReliableQueueRx<MyObject> rq = redisson.getReliableQueue("myqueue");

MyObject data = new MyObject();

// adding message with timeout in 120 seconds
Maybe<Message<MyObject>> msg = rq.add(QueueAddArgs.messages(MessageArgs.payload(data)
                                                       .delay(Duration.ofSeconds(45))
                                                       .deduplicationById("myid", Duration.ofHours(23)))
                                                       .timeout(Duration.ofSeconds(120)));

// adding messages in a batch with timeout in 90 seconds
Single<List<Message<MyObject>>> msgs =  rq.addMany(QueueAddArgs.messages(MessageArgs.payload(data1), 
                                                                 MessageArgs.payload(data2), 
                                                                 MessageArgs.payload(data3))
                                                       .timeout(Duration.ofSeconds(90)));

Polling Messages

The poll() and pollMany() methods each return a Message object or list of Message objects. These methods are resubscribed automatically during re-connection to a Valkey or Redis node or failover.

Below are the available settings per method call:

  • acknowledgeMode - Sets the acknowledgment mode which determines how messages are acknowledged after retrieval:

    • AUTO - messages are automatically acknowledged after delivery.
    • MANUAL - messages must be explicitly acknowledged by the consumer.

    Default value is MANUAL.

  • visibility - Temporarily hides a message from other consumers after it's been received by one consumer. This prevents multiple consumers from processing the same message simultaneously. Automatically requeues messages if processing fails or crashes.
    If not defined, the queue's visibility setting value is used. If queue's visibility setting is also not set, the default value is 30 seconds.

  • timeout - Sets the maximum time to wait for messages to become available. If the queue is empty, the poll operation will block until either:

    • At least one message becomes available.
    • The specified timeout duration elapses.

    0 means to wait indefinitely for a message. If the timeout elapses without any messages becoming available, the poll() method returns null, while the pollMany() method returns an empty collection.

  • count - Sets the maximum number of messages to retrieve in a single poll operation. This parameter enables batch retrieval of messages, which can improve throughput when processing multiple messages at once. The actual number of messages returned may be less than the requested count if fewer messages are available.

  • headersCodec - Specifies the codec to be used for message headers deserialization.

Short polling

Short polling returns immediate response with or without messages.

Code examples of short polling:

RReliableQueue<MyObject> rq = redisson.getReliableQueue("myqueue");


// polling a single message
Message<MyObject> msg = rq.poll(QueuePollArgs.defaults()
                                            .visibility(Duration.ofSeconds(10))
                                            .headersCodec(new CustomCodec())
                                            .acknowledgeMode(AcknowledgeMode.MANUAL));


// polling messages in a batch
List<Message<MyObject>> msgs = rq.pollMany(QueuePollArgs.defaults()
                                            .acknowledgeMode(AcknowledgeMode.AUTO)
                                            .count(10));
RReliableQueueAsync<MyObject> rq = redisson.getReliableQueue("myqueue");


// polling a single message
RFuture<Message<MyObject>> msg = rq.pollAsync(QueuePollArgs.defaults()
                                            .visibility(Duration.ofSeconds(10))
                                            .headersCodec(new CustomCodec())
                                            .acknowledgeMode(AcknowledgeMode.MANUAL));


// polling messages in a batch
RFuture<List<Message<MyObject>>> msgs = rq.pollManyAsync(QueuePollArgs.defaults()
                                            .acknowledgeMode(AcknowledgeMode.AUTO)
                                            .count(10));
RedissonReactiveClient redisson = redissonClient.reactive();
RReliableQueueReactive<MyObject> rq = redisson.getReliableQueue("myqueue");


// polling a single message
Mono<Message<MyObject>> msg = rq.poll(QueuePollArgs.defaults()
                                            .visibility(Duration.ofSeconds(10))
                                            .headersCodec(new CustomCodec())
                                            .acknowledgeMode(AcknowledgeMode.MANUAL));


// polling messages in a batch
Mono<List<Message<MyObject>>> msgs = rq.pollMany(QueuePollArgs.defaults()
                                            .acknowledgeMode(AcknowledgeMode.AUTO)
                                            .count(10));
RedissonRxClient redisson = redissonClient.rxJava();
RReliableQueueRx<MyObject> rq = redisson.getReliableQueue("myqueue");


// polling a single message
Maybe<Message<MyObject>> msg = rq.poll(QueuePollArgs.defaults()
                                            .visibility(Duration.ofSeconds(10))
                                            .headersCodec(new CustomCodec())
                                            .acknowledgeMode(AcknowledgeMode.MANUAL));


// polling messages in a batch
Single<List<Message<MyObject>>> msgs = rq.pollMany(QueuePollArgs.defaults()
                                            .acknowledgeMode(AcknowledgeMode.AUTO)
                                            .count(10));

Long polling

Long polling waits for messages to arrive with specified timeout setting.

Code examples of long polling:

RReliableQueue<MyObject> rq = redisson.getReliableQueue("myqueue");


// polling a single message
Message<MyObject> msg = rq.poll(QueuePollArgs.defaults()
                                            .visibility(Duration.ofSeconds(10))
                                            .headersCodec(new CustomCodec())
                                            .acknowledgeMode(AcknowledgeMode.AUTO)
                                            .timeout(Duration.ofSeconds(140)));


// polling messages in a batch
List<Message<MyObject>> msgs = rq.pollMany(QueuePollArgs.defaults()
                                            .acknowledgeMode(AcknowledgeMode.MANUAL)
                                            .timeout(Duration.ofSeconds(120)
                                            .count(10));
RReliableQueueAsync<MyObject> rq = redisson.getReliableQueue("myqueue");


// polling a single message
RFuture<Message<MyObject>> msg = rq.pollAsync(QueuePollArgs.defaults()
                                                    .visibility(Duration.ofSeconds(10))
                                                    .headersCodec(new CustomCodec())
                                                    .acknowledgeMode(AcknowledgeMode.AUTO)
                                                    .timeout(Duration.ofSeconds(140)));


// polling messages in a batch
RFuture<List<Message<MyObject>>> msgs = rq.pollManyAsync(QueuePollArgs.defaults()
                                                            .acknowledgeMode(AcknowledgeMode.MANUAL)
                                                            .timeout(Duration.ofSeconds(120)
                                                            .count(10));
RedissonReactiveClient redisson = redissonClient.reactive();    
RReliableQueueReactive<MyObject> rq = redisson.getReliableQueue("myqueue");


// polling a single message
Mono<Message<MyObject>> msg = rq.poll(QueuePollArgs.defaults()
                                            .visibility(Duration.ofSeconds(10))
                                            .headersCodec(new CustomCodec())
                                            .acknowledgeMode(AcknowledgeMode.AUTO)
                                            .timeout(Duration.ofSeconds(140)));


// polling messages in a batch
Mono<List<Message<MyObject>>> msgs = rq.pollMany(QueuePollArgs.defaults()
                                                    .acknowledgeMode(AcknowledgeMode.MANUAL)
                                                    .timeout(Duration.ofSeconds(120)
                                                    .count(10));
RedissonRxClient redisson = redissonClient.rxJava();
RReliableQueueRx<MyObject> rq = redisson.getReliableQueue("myqueue");


// polling a single message
Maybe<Message<MyObject>> msg = rq.poll(QueuePollArgs.defaults()
                                            .visibility(Duration.ofSeconds(10))
                                            .headersCodec(new CustomCodec())
                                            .acknowledgeMode(AcknowledgeMode.AUTO)
                                            .timeout(Duration.ofSeconds(140)));


// polling messages in a batch
Single<List<Message<MyObject>>> msgs = rq.pollMany(QueuePollArgs.defaults()
                                                            .acknowledgeMode(AcknowledgeMode.MANUAL)
                                                            .timeout(Duration.ofSeconds(120)
                                                            .count(10));

Acknowledging Messages

Message acknowledgment is required for messages polled with the acknowledgeMode setting set to AcknowledgeMode.MANUAL.

Acknowledgments

Acknowledgments require consumers to explicitly confirm successful processing of messages, ensuring reliable delivery and processing. Supports exactly-once processing semantics. When a message is acknowledged, it is deleted from the queue.

Code examples of message acknowledgment:

RReliableQueue<Integer> rq = redisson.getReliableQueue("myqueue");

// message id
String mid = ...
rq.acknowledge(QueueAckArgs.ids(mid));

// message ids
String[] mids = ...
// acknowledge message ids in a batch
rq.acknowledge(QueueAckArgs.ids(mids));
RReliableQueueAsync<Integer> rq = redisson.getReliableQueue("myqueue");

// message id
String mid = ...
RFuture<Void> ack = rq.acknowledgeAsync(QueueAckArgs.ids(mid));

// message ids
String[] mids = ...
// acknowledge message ids in a batch
RFuture<Void> ack = rq.acknowledgeAsync(QueueAckArgs.ids(mids));
RedissonReactiveClient redisson = redissonClient.reactive();
RReliableQueueReactive<Integer> rq = redisson.getReliableQueue("myqueue");

// message id
String mid = ...
Mono<Void> mn = rq.acknowledge(QueueAckArgs.ids(mid));

// message ids
String[] mids = ...
// acknowledge message ids in a batch
Mono<Void> mn = rq.acknowledge(QueueAckArgs.ids(mids));
RedissonRxClient redisson = redissonClient.rxJava();
RReliableQueueRx<MyObject> rq = redisson.getReliableQueue("myqueue");

// message id
String mid = ...
Completable ack = rq.acknowledge(QueueAckArgs.ids(mid));

// message ids
String[] mids = ...
// acknowledge message ids in a batch
Completable ack = rq.acknowledge(QueueAckArgs.ids(mids));

Negative Acknowledgments

Negative acknowledgments allow consumers to explicitly mark messages with one of the following statuses:

  • Failed - indicates that the client application failed to process the message. The message is redelivered. Allows to define the delay duration before the failed message is eligible for redelivery.

  • Rejected - indicates that the client application could process the message, but it was not accepted. The message is removed and moved to the Dead Letter Queue (DLQ) if configured.

Code examples of negative message acknowledgment:

RReliableQueue<Integer> rq = redisson.getReliableQueue("myqueue");

// message id
String mid = ...
// mark message as failed with delay in 17 seconds for redelivery
rq.negativeAcknowledge(QueueNegativeAckArgs
                                    .failed(mid)
                                    .delay(Duration.ofSeconds(17)));

// mark message as rejected
rq.negativeAcknowledge(QueueNegativeAckArgs
                                    .rejected(mid));


// message ids
String[] mids = ...

// mark messages as failed in a batch with delay in 9 seconds for redelivery
rq.negativeAcknowledge(QueueNegativeAckArgs
                                    .failed(mids)
                                    .delay(Duration.ofSeconds(9)));

// mark messages as rejected in a batch
rq.negativeAcknowledge(QueueNegativeAckArgs
                                    .rejected(mids));
RReliableQueueAsync<Integer> rq = redisson.getReliableQueue("myqueue");

// message id
String mid = ...
// mark message as failed with delay in 17 seconds for redelivery
RFuture<Void> rf = rq.negativeAcknowledgeAsync(QueueNegativeAckArgs
                                                    .failed(mid)
                                            .delay(Duration.ofSeconds(17)));

// mark message as rejected
RFuture<Void> rf = rq.negativeAcknowledgeAsync(QueueNegativeAckArgs
                                                   .rejected(mid));


// message ids
String[] mids = ...

// mark messages as failed in a batch with delay in 9 seconds for redelivery
RFuture<Void> rf = rq.negativeAcknowledgeAsync(QueueNegativeAckArgs
                                                    .failed(mids)
                                            .delay(Duration.ofSeconds(9)));

// mark messages as rejected in a batch
RFuture<Void> rf = rq.negativeAcknowledgeAsync(QueueNegativeAckArgs
                                                    .rejected(mids));
RReliableQueueReactive<Integer> rq = redisson.getReliableQueue("myqueue");

// message id
String mid = ...
// mark message as failed with delay in 17 seconds for redelivery
Mono<Void> rf = rq.negativeAcknowledge(QueueNegativeAckArgs
                                            .failed(mid)
                                            .delay(Duration.ofSeconds(17)));

// mark message as rejected
Mono<Void> rf = rq.negativeAcknowledge(QueueNegativeAckArgs
                                            .rejected(mid));


// message ids
String[] mids = ...

// mark messages as failed in a batch with delay in 9 seconds for redelivery
Mono<Void> rf = rq.negativeAcknowledge(QueueNegativeAckArgs
                                            .failed(mids)
                                            .delay(Duration.ofSeconds(9)));

// mark messages as rejected in a batch
Mono<Void> rf = rq.negativeAcknowledge(QueueNegativeAckArgs
                                            .rejected(mids));
RReliableQueueRx<Integer> rq = redisson.getReliableQueue("myqueue");

// message id
String mid = ...
// mark message as failed with delay in 17 seconds for redelivery
Completable rf = rq.negativeAcknowledge(QueueNegativeAckArgs
                                            .failed(mid)
                                            .delay(Duration.ofSeconds(17)));

// mark message as rejected
Completable rf = rq.negativeAcknowledge(QueueNegativeAckArgs
                                            .rejected(mid));


// message ids
String[] mids = ...

// mark messages as failed in a batch with delay in 9 seconds for redelivery
Completable rf = rq.negativeAcknowledge(QueueNegativeAckArgs
                                            .failed(mids)
                                            .delay(Duration.ofSeconds(9)));

// mark messages as rejected in a batch
Completable rf = rq.negativeAcknowledge(QueueNegativeAckArgs
                                                .rejected(mids));

Inspecting and Managing

Messages can be inspected, removed or moved to another queue if necessary.

  • listAll() method returns all messages in the queue, ready to be retrieved by the poll() command. Note that this method fetches all messages at once and can be time consuming for queues with a large number of messages.

  • move() method moves messages from the queue to the end of the destination queue.

  • remove() method removes messages from the queue.

  • size() method returns the total number of messages in the queue ready for polling, excluding delayed and unacknowledged messages.

  • countDelayedMessages() method returns the number of delayed messages in the queue.

  • countUnacknowledgedMessages() method returns the number of unacknowledged messages in the queue.

  • get() method returns message by id

  • getAll() method returns messages by ids

Queue operations can be disabled or enabled if necessary.

Disabling add() operation prevents new messages from being added while allowing processing of existing messages.
Disabling poll() operation allows adding new messages while preventing consumption, essentially creating a backlog.

Selectively enabling/disabling send or receive operations can help isolate and troubleshoot application issues with messages handling. Queue operations can be temporarily disabled during system maintenance. Disabling certain queue operations during periods of high system load can prevent system overload and ensure stability. All messages are maintained in Valkey or Redis, ensuring that no data is lost.

  • disableOperation() method disables a queue operation

  • enableOperation() method enables a queue operation

Code examples of queue inspection:

RReliableQueue<MyObject> dlq = redisson.getReliableQueue("myqueue-dlq");

// load all messages in the queue.
List<Message<MyObject>> msgs = dlq.listAll();

// get message by id
Message<MyObject>> msg = dlq.get(id);

// get messages by ids
List<Message<MyObject>>> msgs = dlq.getAll(ids);
RReliableQueueAsync<MyObject> dlq = redisson.getReliableQueue("myqueue-dlq");

// load all messages in the queue.
RFuture<List<Message<MyObject>>> msgs = dlq.listAllAsync();

// get message by id
RFuture<Message<MyObject>>> msg = dlq.getAsync(id);

// get messages by ids
RFuture<List<Message<MyObject>>>> msgs = dlq.getAllAsync(ids);  
RedissonReactiveClient redisson = redissonClient.reactive();
RReliableQueueReactive<MyObject> dlq = redisson.getReliableQueue("myqueue-dlq");

// load all messages in the queue.
Mono<List<Message<MyObject>>> msgs = dlq.listAll();

// get message by id
Mono<Message<MyObject>>> msg = dlq.get(id);

// get messages by ids
Mono<List<Message<MyObject>>>> msgs = dlq.getAll(ids);  
RedissonRxClient redisson = redissonClient.rxJava();
RReliableQueueRx<MyObject> dlq = redisson.getReliableQueue("myqueue-dlq");

// load all messages in the queue.
Single<List<Message<MyObject>>> msgs = dlq.listAll();

// get message by id
Maybe<Message<MyObject>>> msg = dlq.get(id);

// get messages by ids
Single<List<Message<MyObject>>>> msgs = dlq.getAll(ids);        

Code examples of messages transition:

RReliableQueue<MyObject> dlq = redisson.getReliableQueue("myqueue-dlq");

// move messages by id to the end of the destination queue
// and get the number of messages moved
int movedCount = dlq.move(QueueMoveArgs.move()
                                        .ids(id1, id2, id3)
                                        .destination("myqueue"));

// move the first 10 messages from beginning of the queue
// to the end of the destination queue
// and get the number of messages moved
int movedCount = dlq.move(QueueMoveArgs.move()
                                        .count(10)
                                        .destination("myqueue"));
RReliableQueueAsync<MyObject> dlq = redisson.getReliableQueue("myqueue-dlq");

// move messages by id to the end of the destination queue
// and get the number of messages moved
RFuture<Integer> movedCount = dlq.moveAsync(QueueMoveArgs.move()
                                                .ids(id1, id2, id3)
                                                .destination("myqueue"));

// move the first 10 messages from beginning of the queue
// to the end of the destination queue
// and get the number of messages moved
RFuture<Integer> movedCount = dlq.moveAsync(QueueMoveArgs.move()
                                                .count(10)
                                                .destination("myqueue"));
RedissonReactiveClient redisson = redissonClient.reactive();
RReliableQueueReactive<MyObject> dlq = redisson.getReliableQueue("myqueue-dlq");

// move messages by id to the end of the destination queue
// and get the number of messages moved
Mono<Integer> movedCount = dlq.move(QueueMoveArgs.move()
                                        .ids(id1, id2, id3)
                                        .destination("myqueue"));

// move the first 10 messages from beginning of the queue
// to the end of the destination queue
// and get the number of messages moved
Mono<Integer> movedCount = dlq.move(QueueMoveArgs.move()
                                        .count(10)
                                        .destination("myqueue"));
RedissonRxClient redisson = redissonClient.rxJava();
RReliableQueueRx<MyObject> dlq = redisson.getReliableQueue("myqueue-dlq");

// move messages by id to the end of the destination queue
// and get the number of messages moved
Single<Integer> movedCount = dlq.move(QueueMoveArgs.move()
                                        .ids(id1, id2, id3)
                                        .destination("myqueue"));

// move the first 10 messages from beginning of the queue
// to the end of the destination queue
// and get the number of messages moved
Single<Integer> movedCount = dlq.move(QueueMoveArgs.move()
                                        .count(10)
                                        .destination("myqueue"));

Code examples of messages removal:

RReliableQueue<MyObject> dlq = redisson.getReliableQueue("myqueue-dlq");

// remove a single message
// and get true if the message successfully removed
boolean removed = dlq.remove(QueueRemoveArgs.ids(id1));

// remove messages in a batch
// and get the number of messages successfully removed
int count = dlq.removeMany(QueueRemoveArgs.ids(id1));
RReliableQueueAsync<MyObject> dlq = redisson.getReliableQueue("myqueue-dlq");

// remove a single message
// and get true if the message successfully removed
RFuture<Boolean> removed = dlq.removeAsync(QueueRemoveArgs.ids(id1));

// remove messages in a batch
// and get the number of messages successfully removed
RFuture<Integer> count = dlq.removeMany(QueueRemoveArgs.ids(id1));
RedissonReactiveClient redisson = redissonClient.reactive();
RReliableQueueReactive<MyObject> dlq = redisson.getReliableQueue("myqueue-dlq");

// remove a single message
// and get true if the message successfully removed
Mono<Boolean> removed = dlq.remove(QueueRemoveArgs.ids(id1));

// remove messages in a batch
// and get the number of messages successfully removed
Mono<Integer> count = dlq.removeMany(QueueRemoveArgs.ids(id1));
RedissonRxClient redisson = redissonClient.rxJava();
RReliableQueueRx<MyObject> dlq = redisson.getReliableQueue("myqueue-dlq");

// remove a single message
// and get true if the message successfully removed
Single<Boolean> removed = dlq.remove(QueueRemoveArgs.ids(id1));

// remove messages in a batch
// and get the number of messages successfully removed
Single<Integer> count = dlq.removeMany(QueueRemoveArgs.ids(id1));

Listeners

Redisson provides reliable event listener binding for RReliableQueue objects. Listeners receive messages sent during connection interruptions and failovers. When network issues occur or during failover, the listeners will not miss any notifications and receive them after reconnection.

Listener class name Event description
org.redisson.api.queue.event.AddedEventListener Message added into queue
org.redisson.api.queue.event.PolledEventListener Message polled from queue
org.redisson.api.queue.event.RemovedEventListener Message removed from queue
org.redisson.api.queue.event.AcknowledgedEventListener Message acknowledged
org.redisson.api.queue.event.NegativelyAcknowledgedEventListener Message negatively acknowledged
org.redisson.api.queue.event.ConfigEventListener Queue config is set
org.redisson.api.queue.event.DisabledOperationEventListener Queue operation switched to disabled state
org.redisson.api.queue.event.EnabledOperationEventListener Queue operation switched to enabled state
org.redisson.api.queue.event.FullEventListener Queue is full

Code examples of listeners usage:

RReliableQueue<MyObject> queue = redisson.getReliableQueue("myqueue");

String id = queue.addListener(new AddedEventListener() {
     @Override
     public void onAdded(List<String> ids) {
        // ...
     }
});

// ...

queue.removeListener(id);
RReliableQueueAsync<MyObject> queue = redisson.getReliableQueue("myqueue");

RFuture<String> f = queue.addListenerAsync(new AddedEventListener() {
     @Override
     public void onAdded(List<String> ids) {
        // ...
     }
});

// ...

RFuture<Void> vf = queue.removeListenerAsync(id);
RedissonReactiveClient redisson = redissonClient.reactive();
RReliableQueueReactive<MyObject> queue = redisson.getReliableQueue("myqueue");

Mono<String> f = queue.addListener(new AddedEventListener() {
     @Override
     public void onAdded(List<String> ids) {
        // ...
     }
});

// ...

Mono<Void> vf = queue.removeListener(id);
RedissonRxClient redisson = redissonClient.rxJava();
RReliableQueueRx<MyObject> queue = redisson.getReliableQueue("myqueue");

Single<String> f = queue.addListener(new AddedEventListener() {
     @Override
     public void onAdded(List<String> ids) {
        // ...
     }
});

// ...

Completable vf = queue.removeListener(id);

Dead-Letter Queue (DLQ)

A Dead-Letter Queue (DLQ) is a specialized message queue used to store messages that cannot be processed or delivered due to errors or other issues. Instead of losing or endlessly retrying problematic messages, they are moved to the DLQ for later inspection and handling. This isolation mechanism significantly improves system reliability and prevents potential downtime, allowing applications to maintain continuous operations despite message processing failures.

Messages may be routed to a DLQ for a number of reasons:

  • Message delivery attempts exceeds maximum delivery limit.
  • Message was negatively acknowledged with reject status.

DLQ object has the same capabilities as Reliable Queue object. To start using it, define it in the queue configuration using QueueConfig object. This action registers the queue as a source queue for the DLQ.

Code examples of DLQ definition:

// source queue
RReliableQueue<Integer> rq = redisson.getReliableQueue("myqueue");

// defining dead-letter queue
rq.setConfig(QueueConfig.defaults()
                        .deadLetterQueueName("myqueue-dlq"));
// source queue
RReliableQueueAsync<Integer> rq = redisson.getReliableQueue("myqueue");

// defining dead-letter queue
RFuture<Void> rf = rq.setConfigAsync(QueueConfig.defaults()
                        .deadLetterQueueName("myqueue-dlq"));
// source queue
RedissonReactiveClient redisson = redissonClient.reactive();
RReliableQueueReactive<Integer> rq = redisson.getReliableQueue("myqueue");

// defining dead-letter queue
Mono<Void> rf = rq.setConfig(QueueConfig.defaults()
                        .deadLetterQueueName("myqueue-dlq"));
// source queue
RReliableQueueRx<Integer> rq = redisson.getReliableQueue("myqueue");

// defining dead-letter queue
Completable rf = rq.setConfig(QueueConfig.defaults()
                        .deadLetterQueueName("myqueue-dlq"));

DLQ source queue names can be listed using getDeadLetterQueueSources() method.

Code examples of DLQ source queue names retrieval:

// dead-letter queue
RReliableQueue<MyObject> dlq = redisson.getReliableQueue("myqueue-dlq");

// returns 'myqueue'
Set<String> sourceQueueNames = dlq.getDeadLetterQueueSources();
// dead-letter queue
RReliableQueueAsync<MyObject> dlq = redisson.getReliableQueue("myqueue-dlq");

// returns 'myqueue'
RFuture<Set<String>> sourceQueueNames = dlq.getDeadLetterQueueSourcesAsync();
RedissonReactiveClient redisson = redissonClient.reactive();
// dead-letter queue
RReliableQueueReactive<MyObject> dlq = redisson.getReliableQueue("myqueue-dlq");

// returns 'myqueue'
Mono<Set<String>> sourceQueueNames = dlq.getDeadLetterQueueSources();
RedissonRxClient redisson = redissonClient.rxJava();
// dead-letter queue
RReliableQueueRx<MyObject> dlq = redisson.getReliableQueue("myqueue-dlq");

// returns 'myqueue'
Single<Set<String>> sourceQueueNames = dlq.getDeadLetterQueueSources();

Durability and Synchronous Replication

The synchronization mechanism allows queue modification operations to be propagated to replica nodes in a controlled manner, ensuring data consistency across Valkey or Redis cluster. Additionally, Valkey and Redis persistence options, such as append-only files (AOF), and RDB snapshots, significantly increase queue reliability by preventing data loss during server restarts or failures, allowing for recovery of queued messages that would otherwise be lost when using only in-memory storage.

Valkey and Redis use asynchronous replication. Reliable Queue introduces synchronous replication modes per operation to address the limitations of asynchronous replication. This capability with proper configuration of the persistence transforms Valkey and Redis from a purely volatile memory store into a more robust message broker suitable for applications where message delivery guarantees are critical. This is particularly important for mission-critical applications where data loss is unacceptable.

Each queue modification operation can be configured with specific synchronization parameters:

  • syncMode - Sets the synchronization mode to be used for current operation. Default value is AUTO.

    Three synchronization strategies are available:

    • AUTO - Ensures data durability by blocking until write operations are confirmed as persisted to the memory and the Append-Only File (AOF) on the primary Valkey or Redis node and replicas if the AOF persistence feature is enabled. If AOF persistence is unavailable, falls back to blocking until replica nodes acknowledge that write operations have been applied to memory. If neither durability mechanism is available, proceeds without synchronization guarantees.
    • ACK - Ensures data durability by blocking until replica nodes acknowledge that write operations have been applied to memory.
    • ACK_AOF - Ensures data durability by blocking until write operations are confirmed as persisted to the memory and the Append-Only File (AOF) on the primary Valkey or Redis node and replicas. Requires Redis 7.2.0+ or any Valkey version.
  • syncFailureMode - Sets the behavior when synchronization with replica nodes fails. Default value is LOG_WARNING.

    Two failure handling modes are available:

    • THROW_EXCEPTION - Throw an exception to the caller. This mode is useful in scenarios where synchronization failures should be immediately visible and handled by the application code.
    • LOG_WARNING - Log a warning message but continue execution. This mode is suitable for non-critical synchronization operations where the application can continue functioning despite synchronization issues.
  • syncTimeout - Sets the timeout duration for synchronization of the current operation. Defines how long the caller will wait for acknowledgment from replica nodes. Default value is 1 second.

Code examples of synchronization parameters usage:

RReliableQueue<MyObject> rq = redisson.getReliableQueue("test");

rq.add(QueueAddArgs.messages(MessageArgs.payload(data))
                        .syncMode(SyncMode.ACK_AOF)
                        .syncTimeout(Duration.ofSeconds(15))
                        .syncFailureMode(SyncFailureMode.LOG_WARNING));

rq.poll(QueuePollArgs.defaults()
                        .syncMode(SyncMode.ACK)
                        .syncTimeout(Duration.ofSeconds(10))
                        .syncFailureMode(SyncFailureMode.THROW_EXCEPTION));

rq.acknowledge(QueueAckArgs.ids(msg.getId())
                            .syncMode(SyncMode.AUTO)
                            .syncTimeout(Duration.ofSeconds(20)));

rq.negativeAcknowledge(QueueNegativeAckArgs.rejected(msg.getId())
                            .syncMode(SyncMode.ACK)
                            .syncTimeout(Duration.ofSeconds(15)));
RReliableQueue<MyObject> rq = redisson.getReliableQueue("test");

RFuture<Message<MyObject>> arf = rq.addAsync(QueueAddArgs.messages(MessageArgs.payload(data))
                                                    .syncMode(SyncMode.ACK_AOF)
                                                    .syncTimeout(Duration.ofSeconds(15))
                                                    .syncFailureMode(SyncFailureMode.LOG_WARNING));

RFuture<Message<MyObject>> prf = rq.pollAsync(QueuePollArgs.defaults()
                                                    .syncMode(SyncMode.ACK)
                                                    .syncTimeout(Duration.ofSeconds(10))
                                                    .syncFailureMode(SyncFailureMode.THROW_EXCEPTION));

RFuture<Void> ack = rq.acknowledgeAsync(QueueAckArgs.ids(msg.getId())
                                            .syncMode(SyncMode.AUTO)
                                            .syncTimeout(Duration.ofSeconds(20)));

RFuture<Void> nack = rq.negativeAcknowledgeAsync(QueueNegativeAckArgs.rejected(msg.getId())
                                                    .syncMode(SyncMode.ACK)
                                                    .syncTimeout(Duration.ofSeconds(15)));
RedissonReactiveClient redisson = redissonClient.reactive();
RReliableQueueReactive<MyObject> rq = redisson.getReliableQueue("test");

Mono<Message<MyObject>> arf = rq.add(QueueAddArgs.messages(MessageArgs.payload(data))
                                            .syncMode(SyncMode.ACK_AOF)
                                            .syncTimeout(Duration.ofSeconds(15))
                                            .syncFailureMode(SyncFailureMode.LOG_WARNING));

Mono<Message<MyObject>> prf = rq.poll(QueuePollArgs.defaults()
                                        .syncMode(SyncMode.ACK)
                                        .syncTimeout(Duration.ofSeconds(10))
                                        .syncFailureMode(SyncFailureMode.THROW_EXCEPTION));

Mono<Void> ack = rq.acknowledge(QueueAckArgs.ids(msg.getId())
                                    .syncMode(SyncMode.AUTO)
                                    .syncTimeout(Duration.ofSeconds(20)));

Mono<Void> nack =     rq.negativeAcknowledge(QueueNegativeAckArgs.rejected(msg.getId())
                                                    .syncMode(SyncMode.ACK)
                                                    .syncTimeout(Duration.ofSeconds(15)));
RedissonRxClient redisson = redissonClient.rxJava();
RReliableQueue<MyObject> rq = redisson.getReliableQueue("test");

Maybe<Message<MyObject>> arf =     rq.add(QueueAddArgs.messages(MessageArgs.payload(data))
                                            .syncMode(SyncMode.ACK_AOF)
                                            .syncTimeout(Duration.ofSeconds(15))
                                            .syncFailureMode(SyncFailureMode.LOG_WARNING));

Maybe<Message<MyObject>> prf = rq.poll(QueuePollArgs.defaults()
                                        .syncMode(SyncMode.ACK)
                                        .syncTimeout(Duration.ofSeconds(10))
                                        .syncFailureMode(SyncFailureMode.THROW_EXCEPTION));

Completable ack = rq.acknowledge(QueueAckArgs.ids(msg.getId())
                                    .syncMode(SyncMode.AUTO)
                                    .syncTimeout(Duration.ofSeconds(20)));

Completable nack =     rq.negativeAcknowledge(QueueNegativeAckArgs.rejected(msg.getId())
                                                .syncMode(SyncMode.ACK)
                                                .syncTimeout(Duration.ofSeconds(15)));

Metrics

Reliable Queue has metrics for monitoring a queue operation statistics.

Base name: redisson.reliable-queue.<name>

  • polls - A Counter that counts the total number of executed poll operations.
  • polled-messages - A Counter that counts the total number of messages retrieved from the queue through poll operations.

  • additions - A Counter that counts the total number of executed add operations.

  • added-messages - A Counter that counts the total number of messages added to the queue.

  • removals - A Counter that counts the total number of executed removal operations.

  • removed-messages - A Counter that counts the total number of messages removed from the queue.

  • movements - A Counter that counts the total number of message moves to another queue.

  • moved-messages - A Counter that counts the total number of messages moved to another queue.

  • acknowledgements - A Counter that counts the total number of performed acknowledgment operations.

  • failed-acknowledgements - A Counter that counts the total number of negative acknowledgements with failed status.

  • rejected-acknowledgements - A Counter that counts the total number of negative acknowledgements with rejected status.

  • failed-synchronizations - A Counter that counts the number of failed synchronization operations.

Queue

Valkey or Redis based unbounded Queue object for Java implements java.util.Queue interface. It wraps Valkey or Redis queue commands and extends them by implementing new methods. This object is thread-safe.

This queue lacks the reliability features of the Reliable Queue, such as message acknowledgments, visibility timeouts, delivery guarantees and many more.

It has Async, Reactive and RxJava3 interfaces.

RQueue<SomeObject> queue = redisson.getQueue("anyQueue");
queue.add(new SomeObject());
SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();

Listeners

Redisson allows binding listeners per RQueue object. This requires the notify-keyspace-events setting to be enabled on Redis or Valkey side.

Listener class name Event description Valkey or Redis
notify-keyspace-events value
org.redisson.api.listener.TrackingListener Element created/removed/updated after read operation -
org.redisson.api.listener.ListAddListener Element created El
org.redisson.api.listener.ListRemoveListener Element removed El
org.redisson.api.ExpiredObjectListener RQueue object expired Ex
org.redisson.api.DeletedObjectListener RQueue object deleted Eg

Usage example:

RQueue<String> queue = redisson.getQueue("anyList");

int listenerId = queue.addListener(new DeletedObjectListener() {
     @Override
     public void onDeleted(String name) {
        // ...
     }
});

// ...

queue.removeListener(listenerId);

Deque

Redis or Valkey based distributed unbounded Deque object for Java implements java.util.Deque interface. It wraps Valkey or Redis deque commands and extends them by implementing new methods. This object is thread-safe.

It has Async, Reactive and RxJava3 interfaces.

RDeque<SomeObject> queue = redisson.getDeque("anyDeque");
queue.addFirst(new SomeObject());
queue.addLast(new SomeObject());
SomeObject obj = queue.removeFirst();
SomeObject someObj = queue.removeLast();

Listeners

Redisson allows binding listeners per RDeque object. This requires the notify-keyspace-events setting to be enabled on Redis or Valkey side.

Listener class name Event description Valkey or Redis
notify-keyspace-events value
org.redisson.api.listener.TrackingListener Element created/removed/updated after read operation -
org.redisson.api.listener.ListAddListener Element created El
org.redisson.api.listener.ListRemoveListener Element removed El
org.redisson.api.ExpiredObjectListener RDeque object expired Ex
org.redisson.api.DeletedObjectListener RDeque object deleted Eg

Usage example:

RDeque<String> deque = redisson.getDeque("anyList");

int listenerId = deque.addListener(new DeletedObjectListener() {
     @Override
     public void onDeleted(String name) {
        // ...
     }
});

// ...

deque.removeListener(listenerId);

Blocking Queue

Redis or Valkey based distributed unbounded BlockingQueue object for Java implements java.util.concurrent.BlockingQueue interface. It wraps Valkey or Redis blocking queue commands and extends them by implementing new methods. This object is thread-safe.

This queue lacks the reliability features of the Reliable Queue, such as message acknowledgments, visibility timeouts, delivery guarantees and many more.

It has Async, Reactive and RxJava3 interfaces.

RBlockingQueue<SomeObject> queue = redisson.getBlockingQueue("anyQueue");

queue.offer(new SomeObject());

SomeObject obj = queue.peek();
SomeObject obj = queue.poll();
SomeObject obj = queue.poll(10, TimeUnit.MINUTES);
poll, pollFromAny, pollLastAndOfferFirstTo and take methods are resubscribed automatically during re-connection to server or failover.

Blocking Deque

Java implementation of Redis or Valkey based BlockingDeque implements java.util.concurrent.BlockingDeque interface. This object is thread-safe.

It has Async, Reactive and RxJava3 interfaces.

RBlockingDeque<Integer> deque = redisson.getBlockingDeque("anyDeque");
deque.putFirst(1);
deque.putLast(2);
Integer firstValue = queue.takeFirst();
Integer lastValue = queue.takeLast();
Integer firstValue = queue.pollFirst(10, TimeUnit.MINUTES);
Integer lastValue = queue.pollLast(3, TimeUnit.MINUTES);
poll, pollFromAny, pollLastAndOfferFirstTo and take methods are resubscribed automatically during re-connection to server or failover.

Delayed Queue

This object is deprecated. Superseded by RReliableQueue with delay.

Bounded Blocking Queue

This object is deprecated. Superseded by RReliableQueue with queue size limit.

Priority Queue

Java implementation of Redis or Valkey based PriorityQueue implements java.util.Queue interface. Elements are ordered according to natural order of Comparable interface or defined Comparator. This object is thread-safe.

This queue lacks the reliability features of the Reliable Queue, which implements message priorities, message acknowledgments, visibility timeouts, delivery guarantees and many more.

Use trySetComparator() method to define own Comparator.

Code example:

public class Entry implements Comparable<Entry>, Serializable {

    private String key;
    private Integer value;

    public Entry(String key, Integer value) {
        this.key = key;
        this.value = value;
    }

    @Override
    public int compareTo(Entry o) {
        return key.compareTo(o.key);
    }

}

RPriorityQueue<Entry> queue = redisson.getPriorityQueue("anyQueue");
queue.add(new Entry("b", 1));
queue.add(new Entry("c", 1));
queue.add(new Entry("a", 1));

// Entry [a:1]
Entry e = queue.poll();
// Entry [b:1]
Entry e = queue.poll();
// Entry [c:1]
Entry e = queue.poll();

Priority Deque

Java implementation of Redis or Valkey based PriorityDeque implements java.util.Deque interface. Elements are ordered according to natural order of java.lang.Comparable interface or defined java.util.Comparator. This object is thread-safe.

Use trySetComparator() method to define own Comparator.

Code example:

public class Entry implements Comparable<Entry>, Serializable {

    private String key;
    private Integer value;

    public Entry(String key, Integer value) {
        this.key = key;
        this.value = value;
    }

    @Override
    public int compareTo(Entry o) {
        return key.compareTo(o.key);
    }

}

RPriorityDeque<Entry> queue = redisson.getPriorityDeque("anyQueue");
queue.add(new Entry("b", 1));
queue.add(new Entry("c", 1));
queue.add(new Entry("a", 1));

// Entry [a:1]
Entry e = queue.pollFirst();
// Entry [c:1]
Entry e = queue.pollLast();

Priority Blocking Queue

Java implementation of Redis or Valkey based PriorityBlockingQueue similar to JDK java.util.concurrent.PriorityBlockingQueue object. Elements are ordered according to natural order of java.lang.Comparable interface or defined java.util.Comparator. This object is thread-safe.

This queue lacks the reliability features of the Reliable Queue, which implements message priorities, message acknowledgments, visibility timeouts, delivery guarantees and many more.

Use trySetComparator() method to define own java.util.Comparator.

poll, pollLastAndOfferFirstTo and take methods are resubscribed automatically during re-connection to a server or failover.

Code example:

public class Entry implements Comparable<Entry>, Serializable {

    private String key;
    private Integer value;

    public Entry(String key, Integer value) {
        this.key = key;
        this.value = value;
    }

    @Override
    public int compareTo(Entry o) {
        return key.compareTo(o.key);
    }

}

RPriorityBlockingQueue<Entry> queue = redisson.getPriorityBlockingQueue("anyQueue");
queue.add(new Entry("b", 1));
queue.add(new Entry("c", 1));
queue.add(new Entry("a", 1));

// Entry [a:1]
Entry e = queue.take();

Priority Blocking Deque

Java implementation of Redis or Valkey based PriorityBlockingDeque implements java.util.concurrent.BlockingDeque interface. Elements are ordered according to natural order of java.lang.Comparable interface or defined java.util.Comparator. This object is thread-safe.

Use trySetComparator() method to define own java.util.Comparator.

poll, pollLastAndOfferFirstTo, take methods are resubscribed automatically during re-connection to Redis or Valkey server or failover.

Code example:

public class Entry implements Comparable<Entry>, Serializable {

    private String key;
    private Integer value;

    public Entry(String key, Integer value) {
        this.key = key;
        this.value = value;
    }

    @Override
    public int compareTo(Entry o) {
        return key.compareTo(o.key);
    }

}

RPriorityBlockingDeque<Entry> queue = redisson.getPriorityBlockingDeque("anyQueue");
queue.add(new Entry("b", 1));
queue.add(new Entry("c", 1));
queue.add(new Entry("a", 1));

// Entry [a:1]
Entry e = queue.takeFirst();
// Entry [c:1]
Entry e = queue.takeLast();

Stream

Java implementation of Redis or Valkey based Stream object wraps Stream feature. Basically it allows to create Consumers Group which consume data added by Producers. This object is thread-safe.

RStream<String, String> stream = redisson.getStream("test");

StreamMessageId sm = stream.add(StreamAddArgs.entry("0", "0"));

stream.createGroup("testGroup");

StreamId id1 = stream.add(StreamAddArgs.entry("1", "1"));
StreamId id2 = stream.add(StreamAddArgs.entry("2", "2"));

Map<StreamId, Map<String, String>> group = stream.readGroup("testGroup", "consumer1", StreamReadGroupArgs.neverDelivered());

// return entries in pending state after read group method execution
Map<StreamMessageId, Map<String, String>> pendingData = stream.pendingRange("testGroup", "consumer1", StreamMessageId.MIN, StreamMessageId.MAX, 100);

// transfer ownership of pending messages to a new consumer
List<StreamMessageId> transferedIds = stream.fastClaim("testGroup", "consumer2", 1, TimeUnit.MILLISECONDS, id1, id2);

// mark pending entries as correctly processed
long amount = stream.ack("testGroup", id1, id2);

Code example of Async interface usage:

RStream<String, String> stream = redisson.getStream("test");

RFuture<StreamMessageId> smFuture = stream.addAsync(StreamAddArgs.entry("0", "0"));

RFuture<Void> groupFuture = stream.createGroupAsync("testGroup");

RFuture<StreamId> id1Future = stream.addAsync(StreamAddArgs.entry("1", "1"));
RFuture<StreamId> id2Future = stream.addAsync(StreamAddArgs.entry("2", "2"));

RFuture<Map<StreamId, Map<String, String>>> groupResultFuture = stream.readGroupAsync("testGroup", "consumer1", StreamReadGroupArgs.neverDelivered());

// return entries in pending state after read group method execution
RFuture<Map<StreamMessageId, Map<String, String>>> pendingDataFuture = stream.pendingRangeAsync("testGroup", "consumer1", StreamMessageId.MIN, StreamMessageId.MAX, 100);

// transfer ownership of pending messages to a new consumer
RFuture<List<StreamMessageId>> transferedIdsFuture = stream.fastClaim("testGroup", "consumer2", 1, TimeUnit.MILLISECONDS, id1, id2);

// mark pending entries as correctly processed
RFuture<Long> amountFuture = stream.ackAsync("testGroup", id1, id2);

amountFuture.whenComplete((res, exception) -> {
    // ...
});

Code example of Reactive interface usage:

RedissonReactiveClient redisson = redissonClient.reactive();
RStreamReactive<String, String> stream = redisson.getStream("test");

Mono<StreamMessageId> smMono = stream.add(StreamAddArgs.entry("0", "0"));

Mono<Void> groupMono = stream.createGroup("testGroup");

Mono<StreamId> id1Mono = stream.add(StreamAddArgs.entry("1", "1"));
Mono<StreamId> id2Mono = stream.add(StreamAddArgs.entry("2", "2"));

Mono<Map<StreamId, Map<String, String>>> groupMono = stream.readGroup("testGroup", "consumer1", StreamReadGroupArgs.neverDelivered());

// return entries in pending state after read group method execution
Mono<Map<StreamMessageId, Map<String, String>>> pendingDataMono = stream.pendingRange("testGroup", "consumer1", StreamMessageId.MIN, StreamMessageId.MAX, 100);

// transfer ownership of pending messages to a new consumer
Mono<List<StreamMessageId>> transferedIdsMono = stream.fastClaim("testGroup", "consumer2", 1, TimeUnit.MILLISECONDS, id1, id2);

// mark pending entries as correctly processed
Mono<Long> amountMono = stream.ack("testGroup", id1, id2);

amountMono.doOnNext(res -> {
   // ...
}).subscribe();

Code example of RxJava3 interface usage:

RedissonRxClient redisson = redissonClient.rxJava();
RStreamRx<String, String> stream = redisson.getStream("test");

Single<StreamMessageId> smRx = stream.add(StreamAddArgs.entry("0", "0"));

Completable groupRx = stream.createGroup("testGroup");

Single<StreamId> id1Rx = stream.add(StreamAddArgs.entry("1", "1"));
Single<StreamId> id2Rx = stream.add(StreamAddArgs.entry("2", "2"));

Single<Map<StreamId, Map<String, String>>> groupRx = stream.readGroup("testGroup", "consumer1", StreamReadGroupArgs.neverDelivered());

// return entries in pending state after read group method execution
Single<Map<StreamMessageId, Map<String, String>>> pendingDataRx = stream.pendingRange("testGroup", "consumer1", StreamMessageId.MIN, StreamMessageId.MAX, 100);

// transfer ownership of pending messages to a new consumer
Single<List<StreamMessageId>> transferedIdsRx = stream.fastClaim("testGroup", "consumer2", 1, TimeUnit.MILLISECONDS, id1, id2);

// mark pending entries as correctly processed
Single<Long> amountRx = stream.ack("testGroup", id1, id2);

amountRx.doOnSuccess(res -> {
   // ...
}).subscribe();

Listeners

Redisson allows binding listeners per RStream object. This requires the notify-keyspace-events setting to be enabled on Redis or Valkey side.

Listener class name Event description Valkey or Redis
notify-keyspace-events value
org.redisson.api.listener.TrackingListener Element added/removed/updated after read operation -
org.redisson.api.ExpiredObjectListener RStream object expired Ex
org.redisson.api.DeletedObjectListener RStream object deleted Eg
org.redisson.api.listener.StreamAddListener Element added Et
org.redisson.api.listener.StreamRemoveListener Element removed Et
org.redisson.api.listener.StreamCreateGroupListener Group created Et
org.redisson.api.listener.StreamRemoveGroupListener Group removed Et
org.redisson.api.listener.StreamCreateConsumerListener Consumer created Et
org.redisson.api.listener.StreamRemoveConsumerListener Consumer removed Et
org.redisson.api.listener.StreamTrimListener Stream trimmed Et

Usage example:

RStream<String, String> stream = redisson.getStream("anySet");

int listenerId = stream.addListener(new DeletedObjectListener() {
     @Override
     public void onDeleted(String name) {
        // ...
     }
});

int listenerId = stream.addListener(new StreamAddListener() {
    @Override
    public void onAdd(String name) {
        // ...
    }
});


// ...

stream.removeListener(listenerId);

Ring Buffer

Java implementation of Redis or Valkey based RingBuffer implements java.util.Queue interface. This structure evicts elements from the head if queue capacity became full. This object is thread-safe.

Should be initialized with capacity size by trySetCapacity() method before usage.

Code example:

RRingBuffer<Integer> buffer = redisson.getRingBuffer("test");

// buffer capacity is 4 elements
buffer.trySetCapacity(4);

buffer.add(1);
buffer.add(2);
buffer.add(3);
buffer.add(4);

// buffer state is 1, 2, 3, 4

buffer.add(5);
buffer.add(6);

// buffer state is 3, 4, 5, 6

Code example of Async interface usage:

RRingBuffer<Integer> buffer = redisson.getRingBuffer("test");

// buffer capacity is 4 elements
RFuture<Boolean> capacityFuture = buffer.trySetCapacityAsync(4);

RFuture<Boolean> addFuture = buffer.addAsync(1);
RFuture<Boolean> addFuture = buffer.addAsync(2);
RFuture<Boolean> addFuture = buffer.addAsync(3);
RFuture<Boolean> addFuture = buffer.addAsync(4);

// buffer state is 1, 2, 3, 4

RFuture<Boolean> addFuture = buffer.addAsync(5);
RFuture<Boolean> addFuture = buffer.addAsync(6);

// buffer state is 3, 4, 5, 6

addFuture.whenComplete((res, exception) -> {
    // ...
});

Code example of Reactive interface usage:

RedissonReactiveClient redisson = redissonClient.reactive();
RRingBufferReactive<Integer> buffer = redisson.getRingBuffer("test");

// buffer capacity is 4 elements
Mono<Boolean> capacityMono = buffer.trySetCapacity(4);

Mono<Boolean> addMono = buffer.add(1);
Mono<Boolean> addMono = buffer.add(2);
Mono<Boolean> addMono = buffer.add(3);
Mono<Boolean> addMono = buffer.add(4);

// buffer state is 1, 2, 3, 4

Mono<Boolean> addMono = buffer.add(5);
Mono<Boolean> addMono = buffer.add(6);

// buffer state is 3, 4, 5, 6

addMono.doOnNext(res -> {
   // ...
}).subscribe();

Code example of RxJava3 interface usage:

RedissonRxClient redisson = redissonClient.rxJava();
RRingBufferRx<Integer> buffer = redisson.getRingBuffer("test");

// buffer capacity is 4 elements
Single<Boolean> capacityRx = buffer.trySetCapacity(4);

Single<Boolean> addRx = buffer.add(1);
Single<Boolean> addRx = buffer.add(2);
Single<Boolean> addRx = buffer.add(3);
Single<Boolean> addRx = buffer.add(4);

// buffer state is 1, 2, 3, 4

Single<Boolean> addRx = buffer.add(5);
Single<Boolean> addRx = buffer.add(6);

// buffer state is 3, 4, 5, 6

addRx.doOnSuccess(res -> {
   // ...
}).subscribe();

Listeners

Redisson allows binding listeners per RRingBuffer object. This requires the notify-keyspace-events setting to be enabled on Redis or Valkey side.

Listener class name Event description Valkey or Redis
notify-keyspace-events value
org.redisson.api.listener.TrackingListener Element created/removed/updated after read operation -
org.redisson.api.listener.ListAddListener Element created El
org.redisson.api.listener.ListRemoveListener Element removed El
org.redisson.api.ExpiredObjectListener RRingBuffer object expired Ex
org.redisson.api.DeletedObjectListener RRingBuffer object deleted Eg

Usage example:

RRingBuffer<String> queue = redisson.getRingBuffer("anyList");

int listenerId = queue.addListener(new DeletedObjectListener() {
     @Override
     public void onDeleted(String name) {
        // ...
     }
});

// ...

queue.removeListener(listenerId);

Transfer Queue

Java implementation of Redis or Valkey based TransferQueue implements java.util.concurrent.TransferQueue interface. Provides set of transfer methods which return only when value was successfully hand off to consumer. This object is thread-safe.

poll and take methods are resubscribed automatically during re-connection to a server or failover.

Code example:

RTransferQueue<String> queue = redisson.getTransferQueue("myCountDownLatch");

queue.transfer("data");
// or try transfer immediately
queue.tryTransfer("data");
// or try transfer up to 10 seconds
queue.tryTransfer("data", 10, TimeUnit.SECONDS);

// in other thread or JVM

queue.take();
// or
queue.poll();

Code example of Async interface usage:

RTransferQueue<String> queue = redisson.getTransferQueue("myCountDownLatch");

RFuture<Void> future = queue.transferAsync("data");
// or try transfer immediately
RFuture<Boolean> future = queue.tryTransferAsync("data");
// or try transfer up to 10 seconds
RFuture<Boolean> future = queue.tryTransferAsync("data", 10, TimeUnit.SECONDS);

// in other thread or JVM

RFuture<String> future = queue.takeAsync();
// or
RFuture<String> future = queue.pollAsync();

future.whenComplete((res, exception) -> {
    // ...
});

Code example of Reactive interface usage:

RedissonReactiveClient redisson = redissonClient.reactive();
RTransferQueueReactive<String> queue = redisson.getTransferQueue("myCountDownLatch");

Mono<Void> mono = queue.transfer("data");
// or try transfer immediately
Mono<Boolean> mono = queue.tryTransfer("data");
// or try transfer up to 10 seconds
Mono<Boolean> mono = queue.tryTransfer("data", 10, TimeUnit.SECONDS);

// in other thread or JVM

Mono<String> mono = queue.take();
// or
Mono<String> mono = queue.poll();

mono.doOnNext(res -> {
   // ...
}).subscribe();

Code example of RxJava3 interface usage:

RedissonRxClient redisson = redissonClient.rxJava();
RTransferQueueRx<String> queue = redisson.getTransferQueue("myCountDownLatch");

Completable res = queue.transfer("data");
// or try transfer immediately
Single<Boolean> resRx = queue.tryTransfer("data");
// or try transfer up to 10 seconds
Single<Boolean> resRx = queue.tryTransfer("data", 10, TimeUnit.SECONDS);

// in other thread or JVM

Single<String> resRx = queue.take();
// or
Maybe<String> resRx = queue.poll();

resRx.doOnSuccess(res -> {
   // ...
}).subscribe();