Skip to content

Distributed queues

Reliable Queue

This feature is available only in Redisson PRO edition.

The Reliable Queue is a specialized distributed FIFO queue implementation built on top of Valkey or Redis that provides robust message processing and advanced queue management features. This object is fully thread-safe.

Unlike standard Valkey and Redis queues, the Reliable Queue ensures message delivery even in failure scenarios, provides acknowledgment mechanisms, and offers fine-grained control over message handling. Moreover, Valkey and Redis persistence with synchronized replication significantly increases queue reliability by maintaining multiple consistent copies of data across nodes, ensuring that messages remain available even during node failures or network disruptions.

Features

  • Queue Size Limit: Ability to set maximum capacity to prevent unbounded growth and resource exhaustion.

  • Message Visibility Timeout: Configurable time period during which a message is invisible to other consumers after being retrieved, preventing duplicate processing while allowing recovery if processing fails.

  • Message Delivery Limit: Maximum number of delivery attempts for a message before it's considered undeliverable and deleted or moved to Dead Letter Queue (DLQ) if configured.

  • Message Size Limit: Ability to restrict the size of messages to maintain performance.

  • Message Headers: Each message can have attached key-value metadata.

  • Message Expiration Timeout: Automatic removal of messages from the queue after a specified time period.

  • Message Priorities: Ability to prioritize certain messages for faster processing.

  • Message Delay: Schedule messages for future delivery with configurable delay periods.

  • Message Deduplication: Prevent duplicate message processing by object value used as ID or message payload hash within a configurable time interval.

  • Acknowledgments: Ability to acknowledge messages after successful processing, ensuring messages aren't lost during processing.

  • Negative Acknowledgments: Ability to negatively acknowledge messages that cannot be processed as failed or rejected, allowing for custom error handling strategies. Failed messages are redelivered and rejected are deleted or moved to Dead Letter Queue (DLQ) if configured.

  • Bulk Operations: Supports processing multiple messages in a single operation to improve throughput, including operations for adding, polling, removing, acknowledging, and negatively acknowledging messages.

  • Short Polling and Long Polling: Support both immediate return and wait-for-message polling.

  • Reliability and Synchronous Replication: Data redundancy across Valkey or Redis nodes with forced synchronous replication per operation to prevent message loss during node failures.

  • Dead-Letter Queue (DLQ): If a message reached delivery limit or rejected it's transferred to a Dead Letter Queue (DLQ). It's a separate instance of Reliable Queue which stores messages for later analysis or reprocessing.

  • No Periodic Tasks: The queue operates without relying on periodic background tasks, reducing system overhead and improving reliability.

Configuration

Queue-Level Settings

Queue settings can be changed at runtime. All settings are optional. If not set, default values are used.

  • deliveryLimit - Defines the maximum number of delivery attempts for a message. Once this limit is reached, the message may be moved to a dead letter queue if it's configured, otherwise it will be deleted. Can be overridden when adding a message. Default value is 10.

  • visibility - Sets the duration for which a message becomes invisible to other consumers after being polled. This prevents multiple consumers from processing the same message simultaneously. Can be overridden when pooling a message. Default value is 30 seconds.

  • timeToLive - Sets the time-to-live duration for messages in the queue. Messages will be automatically removed from the queue after this duration expires. 0 value means expiration is not applied. Can be overridden when adding a message. Default value is 0.

  • deadLetterQueueName - Sets the name of the Dead Letter Queue (DLQ) to which messages that have reached the delivery limit or have been rejected are sent. Dead letter queue can be removed by setting null value. Default value is null.

  • maxMessageSize - Sets the maximum allowed size (in bytes) for a single message in the queue. Messages exceeding this size will be rejected. 0 value means size limit is not applied. Default value is 0.

  • delay - Sets the delay duration before a message becomes available for consumption after being added to the queue.0 value means delay duration is not applied. Can be overridden when adding a message. Default value is 0.

  • maxSize - Sets the maximum number of messages that can be stored in the queue. When the queue reaches this size, add messages operation may be blocked and/or return empty result. 0 value means queue size limit is not applied. Default value is 0.

Code example of the reliable queue config definition:

RReliableQueue<MyObject> rq = redisson.getReliableQueue("myqueue");

// or instance with a custom codec defined
RReliableQueue<MyObject> rq = redisson.getReliableQueue("myqueue", new CustomCodec());


// overrides the previously set configuration
rq.setConfig(QueueConfig.defaults()
                   .deliveryLimit(4)
                   .visibilityTimeout(Duration.ofSeconds(60))
                   .timeToLive());

// applies the configuration only if no configuration has been set previously
rq.trySetConfig(QueueConfig.defaults()
                   .deliveryLimit(4)
                   .visibilityTimeout(Duration.ofSeconds(60))
                   .timeToLive());

Message-Level Settings

Message settings applied per add() or addMany() operation. All settings are optional except for payload. If optional settings are not set, defaults will be used.

  • payload - Defines the data to include in the message. The codec defined for the queue is used for data serialization and deserialization. required setting

  • deliveryLimit - Message Delivery Limit. Allows to specify the maximum number of delivery attempts for a message, after which it can be moved to Dead Letter Queue (DLQ) if configured. The delivery attempt count increases each time a message is redelivered after the visibility timeout is reached or when the message is negatively acknowledged with a failed status.
    Minimal value is 1. If not defined, the queue's deliveryLimit setting value is used. If queue's deliveryLimit setting is also not set, the default value is 10.

  • timeToLive - Message Expiration Timeout. Allows you to set a time-to-live (TTL) for messages in the queue, after which they are automatically removed if not processed.
    0 value means expiration is not applied. If not defined, the queue's timeToLive setting value is used. If queue's timeToLive setting is also not set, the default value is 0.

  • header \ headers - Message headers. Allow to attach metadata to messages without modifying the payload, enabling richer communication patterns and more sophisticated routing. Separates metadata from core message content.

  • headersCodec - Specifies the codec to be used for message headers serialization.

  • delay - Message Delay. Allows to schedule messages for future delivery, with precise timing control down to the millisecond.
    0 value means delay duration is not applied. If not defined, the queue's delay setting value is used. If queue's delay setting is also not set, the default value is 0.

  • priority - Message Priorities. Allows to assign importance levels to messages, ensuring critical messages are processed before less important ones. Ensures critical operations take precedence.
    Priority defined as a number between 0 and 9. 0 - the lowest priority level. 9 - the highest priority level. Default value is 0.

  • deduplicationById - Message Deduplication by ID. Enables deduplication based on a custom value used as ID for the specified interval. During the specified interval, messages with the same ID will be considered duplicates and won't be added to the queue.

  • deduplicationByHash - Message Deduplication by Hash. Enables deduplication based on the message payload hash for the specified interval. During the specified interval, messages with the same hash will be considered duplicates and won't be added to the queue.

  • timeout - Sets the maximum time to wait when adding messages to a full queue with a limited size. If the queue is full, the add operation will block until either:

    • Space becomes available and the message(s) are successfully added.
    • The specified timeout duration elapses.

    0 means to wait indefinitely until space becomes available. If the timeout elapses before the message(s) can be added, the add() method returns null, while the addMany() method returns an empty collection.

Operations

Adding Messages

The add() and addMany() methods each return a Message object for every added message, containing the payload, headers (identical to those of the original message) and a unique ID generated by the queue.

Code example of messages adding:

RReliableQueue<MyObject> rq = redisson.getReliableQueue("myqueue");

MyObject data = new MyObject();

Message<MyObject> msg = rq.add(QueueAddArgs.messages(MessageArgs.payload(data)
                                                       .deliveryLimit(10)
                                                       .timeToLive(Duration.ofDays(24))
                                                       .header("key1", new MyDataValue())
                                                       .header("key2", new MyDataValue())
                                                       .delay(Duration.ofSeconds(45))
                                                       .priority(3)
                                                       .deduplicationById("myid", Duration.ofHours(23))));

String id = msg.getId();
MyObject data = msg.getPayload();
Map<String, MyDataValue> headers = msg.getHeaders();

// adding messages in a batch
List<Message<MyObject>> msgs =  rq.addMany(QueueAddArgs.messages(MessageArgs.payload(data1), 
                                                                 MessageArgs.payload(data2), 
                                                                 MessageArgs.payload(data3)));

Code example of messages adding to a queue with limited size:

RReliableQueue<MyObject> rq = redisson.getReliableQueue("myqueue");

MyObject data = new MyObject();

// adding message with timeout in 120 seconds
Message<MyObject> msg = rq.add(QueueAddArgs.messages(MessageArgs.payload(data)
                                                       .delay(Duration.ofSeconds(45))
                                                       .deduplicationById("myid", Duration.ofHours(23)))
                                           .timeout(Duration.ofSeconds(120)));

// adding messages in a batch with timeout in 90 seconds
List<Message<MyObject>> msgs =  rq.addMany(QueueAddArgs.messages(MessageArgs.payload(data1), 
                                                                 MessageArgs.payload(data2), 
                                                                 MessageArgs.payload(data3))
                                                       .timeout(Duration.ofSeconds(90)));

Polling Messages

The poll() and pollMany() methods each return a Message object or list of Message objects. These methods are resubscribed automatically during re-connection to a Valkey or Redis node or failover.

Below are the available settings per method call:

  • acknowledgeMode - Sets the acknowledgment mode which determines how messages are acknowledged after retrieval:

    • AUTO - messages are automatically acknowledged after delivery.
    • MANUAL - messages must be explicitly acknowledged by the consumer.

    Default value is MANUAL.

  • visibility - Temporarily hides a message from other consumers after it's been received by one consumer. This prevents multiple consumers from processing the same message simultaneously. Automatically requeues messages if processing fails or crashes.
    If not defined, the queue's visibility setting value is used. If queue's visibility setting is also not set, the default value is 30 seconds.

  • timeout - Sets the maximum time to wait for messages to become available. If the queue is empty, the poll operation will block until either:

    • At least one message becomes available.
    • The specified timeout duration elapses.

    0 means to wait indefinitely for a message. If the timeout elapses without any messages becoming available, the poll() method returns null, while the pollMany() method returns an empty collection.

  • count - Sets the maximum number of messages to retrieve in a single poll operation. This parameter enables batch retrieval of messages, which can improve throughput when processing multiple messages at once. The actual number of messages returned may be less than the requested count if fewer messages are available.

  • headersCodec - Specifies the codec to be used for message headers deserialization.

Short polling

Short polling returns immediate response with or without messages.

Code example of short polling:

RReliableQueue<MyObject> rq = redisson.getReliableQueue("myqueue");


// polling a single message
Message<MyObject> msg = rq.poll(QueuePollArgs.defaults()
                                            .visibility(Duration.ofSeconds(10))
                                            .headersCodec(new CustomCodec())
                                            .acknowledgeMode(AcknowledgeMode.MANUAL));


// polling messages in a batch
List<Message<MyObject>> msgs = rq.pollMany(QueuePollArgs.defaults()
                                            .acknowledgeMode(AcknowledgeMode.AUTO)
                                            .count(10));

Long polling

Long polling waits for messages to arrive with specified timeout setting.

Code example of long polling:

RReliableQueue<MyObject> rq = redisson.getReliableQueue("myqueue");


// polling a single message
Message<MyObject> msg = rq.poll(QueuePollArgs.defaults()
                                            .visibility(Duration.ofSeconds(10))
                                            .headersCodec(new CustomCodec())
                                            .acknowledgeMode(AcknowledgeMode.AUTO)
                                            .timeout(Duration.ofSeconds(140)));


// polling messages in a batch
List<Message<MyObject>> msgs = rq.pollMany(QueuePollArgs.defaults()
                                            .acknowledgeMode(AcknowledgeMode.MANUAL)
                                            .timeout(Duration.ofSeconds(120)
                                            .count(10));

Acknowledging Messages

Message acknowledgment is required for messages polled with the acknowledgeMode setting set to AcknowledgeMode.MANUAL.

Acknowledgments

Acknowledgments require consumers to explicitly confirm successful processing of messages, ensuring reliable delivery and processing. Supports exactly-once processing semantics. When a message is acknowledged, it is deleted from the queue.

Code example of message acknowledgment:

RReliableQueue<Integer> rq = redisson.getReliableQueue("test");

// message id
String mid = ...
rq.acknowledge(QueueAckArgs.ids(mid));

// message ids
String[] mids = ...
// acknowledge message ids in a batch
rq.acknowledge(QueueAckArgs.ids(mids));

Negative Acknowledgments

Negative acknowledgments allow consumers to explicitly mark messages with one of the following statuses:

  • Failed - indicates that the client application failed to process the message. The message is redelivered. Allows to define the delay duration before the failed message is eligible for redelivery.

  • Rejected - indicates that the client application could process the message, but it was not accepted. The message is removed and moved to the Dead Letter Queue (DLQ) if configured.

Code example of negative message acknowledgment:

RReliableQueue<Integer> rq = redisson.getReliableQueue("myqueue");

// message id
String mid = ...
// mark message as failed with delay in 17 seconds for redelivery
rq.negativeAcknowledge(QueueNegativeAckArgs
                                    .failed(mid)
                                    .delay(Duration.ofSeconds(17)));

// mark message as rejected
rq.negativeAcknowledge(QueueNegativeAckArgs
                                    .rejected(mid));


// message ids
String[] mids = ...

// mark messages as failed in a batch with delay in 9 seconds for redelivery
rq.negativeAcknowledge(QueueNegativeAckArgs
                                    .failed(mids)
                                    .delay(Duration.ofSeconds(9)));

// mark messages as rejected in a batch
rq.negativeAcknowledge(QueueNegativeAckArgs
                                    .rejected(mids));

Inspecting and Managing Messages

Messages can be inspected, removed or moved to another queue if necessary.

  • listAll() method returns all messages in the queue, ready to be retrieved by the poll() command. Note that this method fetches all messages at once and can be time consuming for queues with a large number of messages.

  • move() method moves messages from the queue to the end of the destination queue.

  • remove() method removes messages from the queue.

Code example of queue inspection:

RReliableQueue<MyObject> dlq = redisson.getReliableQueue("myqueue-dlq");

// load all messages in the queue.
List<Message<MyObject>> msgs = dlq.listAll();

Code example of messages transition:

RReliableQueue<MyObject> dlq = redisson.getReliableQueue("myqueue-dlq");

// move messages by id to the end of the destination queue
// and get the number of messages moved
int movedCount = dlq.move(QueueMoveArgs.move()
                                        .ids(id1, id2, id3)
                                        .destination("myqueue"));

// move the first 10 messages from beginning of the queue
// to the end of the destination queue
// and get the number of messages moved
int movedCount = dlq.move(QueueMoveArgs.move()
                                        .count(10)
                                        .destination("myqueue"));

Code example of messages removal:

RReliableQueue<MyObject> dlq = redisson.getReliableQueue("myqueue-dlq");

// remove a single message
// and get true if the message successfully removed
boolean removed = dlq.remove(QueueRemoveArgs.ids(id1));

// remove messages in a batch
// and get the number of messages successfully removed
int count = dlq.removeMany(QueueRemoveArgs.ids(id1));

Dead-Letter Queue (DLQ)

A Dead-Letter Queue (DLQ) is a specialized message queue used to store messages that cannot be processed or delivered due to errors or other issues. Instead of losing or endlessly retrying problematic messages, they are moved to the DLQ for later inspection and handling. This isolation mechanism significantly improves system reliability and prevents potential downtime, allowing applications to maintain continuous operations despite message processing failures.

Messages may be routed to a DLQ for a number of reasons:

  • Message delivery attempts exceeds maximum delivery limit.
  • Message was negatively acknowledged with reject status.

DLQ object has the same capabilities as Reliable Queue object. To start using it, define it in the queue configuration using QueueConfig object. This action registers the queue as a source queue for the DLQ.

Code example of DLQ definition:

// source queue
RReliableQueue<Integer> rq = redisson.getReliableQueue("myqueue");

// defining dead-letter queue
rq.setConfig(QueueConfig.defaults()
                        .deadLetterQueueName("myqueue-dlq"));

DLQ source queue names can be listed using getDeadLetterQueueSources() method.

Code example of DLQ source queue names retrieval:

// dead-letter queue
RReliableQueue<MyObject> dlq = redisson.getReliableQueue("myqueue-dlq");

// returns 'myqueue'
Set<String> sourceQueueNames = dlq.getDeadLetterQueueSources();

Reliability and Synchronous Replication

The synchronization mechanism allows queue modification operations to be propagated to replica nodes in a controlled manner, ensuring data consistency across Valkey or Redis cluster. Additionally, Valkey and Redis persistence options, such as append-only files (AOF), and RDB snapshots, significantly increase queue reliability by preventing data loss during server restarts or failures, allowing for recovery of queued messages that would otherwise be lost when using only in-memory storage. This persistence capability transforms Valkey and Redis from a purely volatile memory store into a more robust message broker suitable for applications where message delivery guarantees are critical. This is particularly important for mission-critical applications where data loss is unacceptable.

Each queue modification operation can be configured with specific synchronization parameters:

  • syncMode - Sets the synchronization mode to be used for current operation. Default value is AUTO.

    Three synchronization strategies are available:

    • AUTO - Ensures data durability by blocking until write operations are confirmed as persisted to the memory and the Append-Only File (AOF) on the primary Valkey or Redis node and replicas if the AOF persistence feature is enabled. If AOF persistence is unavailable, falls back to blocking until replica nodes acknowledge that write operations have been applied to memory. If neither durability mechanism is available, proceeds without synchronization guarantees.
    • ACK - Ensures data durability by blocking until replica nodes acknowledge that write operations have been applied to memory.
    • ACK_AOF - Ensures data durability by blocking until write operations are confirmed as persisted to the memory and the Append-Only File (AOF) on the primary Valkey or Redis node and replicas. Requires Redis 7.2.0+ or any Valkey version.
  • syncFailureMode - Sets the behavior when synchronization with replica nodes fails. Default value is LOG_WARNING.

    Two failure handling modes are available:

    • THROW_EXCEPTION - Throw an exception to the caller. This mode is useful in scenarios where synchronization failures should be immediately visible and handled by the application code.
    • LOG_WARNING - Log a warning message but continue execution. This mode is suitable for non-critical synchronization operations where the application can continue functioning despite synchronization issues.
  • syncTimeout - Sets the timeout duration for synchronization of the current operation. Defines how long the caller will wait for acknowledgment from replica nodes. Default value is 1 second.

Code example of synchronization parameters usage:

RReliableQueue<MyObject> rq = redisson.getReliableQueue("test");

rq.add(QueueAddArgs.messages(MessageArgs.payload(data))
                        .syncMode(SyncMode.ACK_AOF)
                        .syncTimeout(Duration.ofSeconds(15))
                        .syncFailureMode(SyncFailureMode.LOG_WARNING));

rq.poll(QueuePollArgs.defaults()
                        .syncMode(SyncMode.ACK)
                        .syncTimeout(Duration.ofSeconds(10))
                        .syncFailureMode(SyncFailureMode.THROW_EXCEPTION));

rq.acknowledge(QueueAckArgs.ids(msg.getId())
                            .syncMode(SyncMode.AUTO)
                            .syncTimeout(Duration.ofSeconds(20)));

rq.negativeAcknowledge(QueueNegativeAckArgs.rejected(msg.getId())
                            .syncMode(SyncMode.ACK)
                            .syncTimeout(Duration.ofSeconds(15)));

Queue

Valkey or Redis based unbounded Queue object for Java implements java.util.Queue interface. It wraps Valkey or Redis queue commands and extends them by implementing new methods. This object is thread-safe.

This queue lacks the reliability features of the Reliable Queue, such as message acknowledgments, visibility timeouts, delivery guarantees and many more.

It has Async, Reactive and RxJava3 interfaces.

RQueue<SomeObject> queue = redisson.getQueue("anyQueue");
queue.add(new SomeObject());
SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();

Listeners

Redisson allows binding listeners per RQueue object. This requires the notify-keyspace-events setting to be enabled on Redis or Valkey side.

Listener class name Event description Valkey or Redis
notify-keyspace-events value
org.redisson.api.listener.TrackingListener Element created/removed/updated after read operation -
org.redisson.api.listener.ListAddListener Element created El
org.redisson.api.listener.ListRemoveListener Element removed El
org.redisson.api.ExpiredObjectListener RQueue object expired Ex
org.redisson.api.DeletedObjectListener RQueue object deleted Eg

Usage example:

RQueue<String> queue = redisson.getQueue("anyList");

int listenerId = queue.addListener(new DeletedObjectListener() {
     @Override
     public void onDeleted(String name) {
        // ...
     }
});

// ...

queue.removeListener(listenerId);

Deque

Redis or Valkey based distributed unbounded Deque object for Java implements java.util.Deque interface. It wraps Valkey or Redis deque commands and extends them by implementing new methods. This object is thread-safe.

It has Async, Reactive and RxJava3 interfaces.

RDeque<SomeObject> queue = redisson.getDeque("anyDeque");
queue.addFirst(new SomeObject());
queue.addLast(new SomeObject());
SomeObject obj = queue.removeFirst();
SomeObject someObj = queue.removeLast();

Listeners

Redisson allows binding listeners per RDeque object. This requires the notify-keyspace-events setting to be enabled on Redis or Valkey side.

Listener class name Event description Valkey or Redis
notify-keyspace-events value
org.redisson.api.listener.TrackingListener Element created/removed/updated after read operation -
org.redisson.api.listener.ListAddListener Element created El
org.redisson.api.listener.ListRemoveListener Element removed El
org.redisson.api.ExpiredObjectListener RDeque object expired Ex
org.redisson.api.DeletedObjectListener RDeque object deleted Eg

Usage example:

RDeque<String> deque = redisson.getDeque("anyList");

int listenerId = deque.addListener(new DeletedObjectListener() {
     @Override
     public void onDeleted(String name) {
        // ...
     }
});

// ...

deque.removeListener(listenerId);

Blocking Queue

Redis or Valkey based distributed unbounded BlockingQueue object for Java implements java.util.concurrent.BlockingQueue interface. It wraps Valkey or Redis blocking queue commands and extends them by implementing new methods. This object is thread-safe.

This queue lacks the reliability features of the Reliable Queue, such as message acknowledgments, visibility timeouts, delivery guarantees and many more.

It has Async, Reactive and RxJava3 interfaces.

RBlockingQueue<SomeObject> queue = redisson.getBlockingQueue("anyQueue");

queue.offer(new SomeObject());

SomeObject obj = queue.peek();
SomeObject obj = queue.poll();
SomeObject obj = queue.poll(10, TimeUnit.MINUTES);
poll, pollFromAny, pollLastAndOfferFirstTo and take methods are resubscribed automatically during re-connection to server or failover.

Blocking Deque

Java implementation of Redis or Valkey based BlockingDeque implements java.util.concurrent.BlockingDeque interface. This object is thread-safe.

It has Async, Reactive and RxJava3 interfaces.

RBlockingDeque<Integer> deque = redisson.getBlockingDeque("anyDeque");
deque.putFirst(1);
deque.putLast(2);
Integer firstValue = queue.takeFirst();
Integer lastValue = queue.takeLast();
Integer firstValue = queue.pollFirst(10, TimeUnit.MINUTES);
Integer lastValue = queue.pollLast(3, TimeUnit.MINUTES);
poll, pollFromAny, pollLastAndOfferFirstTo and take methods are resubscribed automatically during re-connection to server or failover.

Delayed Queue

This object is deprecated. Use RReliableQueue with delay.

Bounded Blocking Queue

This object is deprecated. Use RReliableQueue with queue size limit.

Priority Queue

Java implementation of Redis or Valkey based PriorityQueue implements java.util.Queue interface. Elements are ordered according to natural order of Comparable interface or defined Comparator. This object is thread-safe.

This queue lacks the reliability features of the Reliable Queue, which implements message priorities, message acknowledgments, visibility timeouts, delivery guarantees and many more.

Use trySetComparator() method to define own Comparator.

Code example:

public class Entry implements Comparable<Entry>, Serializable {

    private String key;
    private Integer value;

    public Entry(String key, Integer value) {
        this.key = key;
        this.value = value;
    }

    @Override
    public int compareTo(Entry o) {
        return key.compareTo(o.key);
    }

}

RPriorityQueue<Entry> queue = redisson.getPriorityQueue("anyQueue");
queue.add(new Entry("b", 1));
queue.add(new Entry("c", 1));
queue.add(new Entry("a", 1));

// Entry [a:1]
Entry e = queue.poll();
// Entry [b:1]
Entry e = queue.poll();
// Entry [c:1]
Entry e = queue.poll();

Priority Deque

Java implementation of Redis or Valkey based PriorityDeque implements java.util.Deque interface. Elements are ordered according to natural order of java.lang.Comparable interface or defined java.util.Comparator. This object is thread-safe.

Use trySetComparator() method to define own Comparator.

Code example:

public class Entry implements Comparable<Entry>, Serializable {

    private String key;
    private Integer value;

    public Entry(String key, Integer value) {
        this.key = key;
        this.value = value;
    }

    @Override
    public int compareTo(Entry o) {
        return key.compareTo(o.key);
    }

}

RPriorityDeque<Entry> queue = redisson.getPriorityDeque("anyQueue");
queue.add(new Entry("b", 1));
queue.add(new Entry("c", 1));
queue.add(new Entry("a", 1));

// Entry [a:1]
Entry e = queue.pollFirst();
// Entry [c:1]
Entry e = queue.pollLast();

Priority Blocking Queue

Java implementation of Redis or Valkey based PriorityBlockingQueue similar to JDK java.util.concurrent.PriorityBlockingQueue object. Elements are ordered according to natural order of java.lang.Comparable interface or defined java.util.Comparator. This object is thread-safe.

This queue lacks the reliability features of the Reliable Queue, which implements message priorities, message acknowledgments, visibility timeouts, delivery guarantees and many more.

Use trySetComparator() method to define own java.util.Comparator.

poll, pollLastAndOfferFirstTo and take methods are resubscribed automatically during re-connection to a server or failover.

Code example:

public class Entry implements Comparable<Entry>, Serializable {

    private String key;
    private Integer value;

    public Entry(String key, Integer value) {
        this.key = key;
        this.value = value;
    }

    @Override
    public int compareTo(Entry o) {
        return key.compareTo(o.key);
    }

}

RPriorityBlockingQueue<Entry> queue = redisson.getPriorityBlockingQueue("anyQueue");
queue.add(new Entry("b", 1));
queue.add(new Entry("c", 1));
queue.add(new Entry("a", 1));

// Entry [a:1]
Entry e = queue.take();

Priority Blocking Deque

Java implementation of Redis or Valkey based PriorityBlockingDeque implements java.util.concurrent.BlockingDeque interface. Elements are ordered according to natural order of java.lang.Comparable interface or defined java.util.Comparator. This object is thread-safe.

Use trySetComparator() method to define own java.util.Comparator.

poll, pollLastAndOfferFirstTo, take methods are resubscribed automatically during re-connection to Redis or Valkey server or failover.

Code example:

public class Entry implements Comparable<Entry>, Serializable {

    private String key;
    private Integer value;

    public Entry(String key, Integer value) {
        this.key = key;
        this.value = value;
    }

    @Override
    public int compareTo(Entry o) {
        return key.compareTo(o.key);
    }

}

RPriorityBlockingDeque<Entry> queue = redisson.getPriorityBlockingDeque("anyQueue");
queue.add(new Entry("b", 1));
queue.add(new Entry("c", 1));
queue.add(new Entry("a", 1));

// Entry [a:1]
Entry e = queue.takeFirst();
// Entry [c:1]
Entry e = queue.takeLast();

Stream

Java implementation of Redis or Valkey based Stream object wraps Stream feature. Basically it allows to create Consumers Group which consume data added by Producers. This object is thread-safe.

RStream<String, String> stream = redisson.getStream("test");

StreamMessageId sm = stream.add(StreamAddArgs.entry("0", "0"));

stream.createGroup("testGroup");

StreamId id1 = stream.add(StreamAddArgs.entry("1", "1"));
StreamId id2 = stream.add(StreamAddArgs.entry("2", "2"));

Map<StreamId, Map<String, String>> group = stream.readGroup("testGroup", "consumer1", StreamReadGroupArgs.neverDelivered());

// return entries in pending state after read group method execution
Map<StreamMessageId, Map<String, String>> pendingData = stream.pendingRange("testGroup", "consumer1", StreamMessageId.MIN, StreamMessageId.MAX, 100);

// transfer ownership of pending messages to a new consumer
List<StreamMessageId> transferedIds = stream.fastClaim("testGroup", "consumer2", 1, TimeUnit.MILLISECONDS, id1, id2);

// mark pending entries as correctly processed
long amount = stream.ack("testGroup", id1, id2);

Code example of Async interface usage:

RStream<String, String> stream = redisson.getStream("test");

RFuture<StreamMessageId> smFuture = stream.addAsync(StreamAddArgs.entry("0", "0"));

RFuture<Void> groupFuture = stream.createGroupAsync("testGroup");

RFuture<StreamId> id1Future = stream.addAsync(StreamAddArgs.entry("1", "1"));
RFuture<StreamId> id2Future = stream.addAsync(StreamAddArgs.entry("2", "2"));

RFuture<Map<StreamId, Map<String, String>>> groupResultFuture = stream.readGroupAsync("testGroup", "consumer1", StreamReadGroupArgs.neverDelivered());

// return entries in pending state after read group method execution
RFuture<Map<StreamMessageId, Map<String, String>>> pendingDataFuture = stream.pendingRangeAsync("testGroup", "consumer1", StreamMessageId.MIN, StreamMessageId.MAX, 100);

// transfer ownership of pending messages to a new consumer
RFuture<List<StreamMessageId>> transferedIdsFuture = stream.fastClaim("testGroup", "consumer2", 1, TimeUnit.MILLISECONDS, id1, id2);

// mark pending entries as correctly processed
RFuture<Long> amountFuture = stream.ackAsync("testGroup", id1, id2);

amountFuture.whenComplete((res, exception) -> {
    // ...
});

Code example of Reactive interface usage:

RedissonReactiveClient redisson = redissonClient.reactive();
RStreamReactive<String, String> stream = redisson.getStream("test");

Mono<StreamMessageId> smMono = stream.add(StreamAddArgs.entry("0", "0"));

Mono<Void> groupMono = stream.createGroup("testGroup");

Mono<StreamId> id1Mono = stream.add(StreamAddArgs.entry("1", "1"));
Mono<StreamId> id2Mono = stream.add(StreamAddArgs.entry("2", "2"));

Mono<Map<StreamId, Map<String, String>>> groupMono = stream.readGroup("testGroup", "consumer1", StreamReadGroupArgs.neverDelivered());

// return entries in pending state after read group method execution
Mono<Map<StreamMessageId, Map<String, String>>> pendingDataMono = stream.pendingRange("testGroup", "consumer1", StreamMessageId.MIN, StreamMessageId.MAX, 100);

// transfer ownership of pending messages to a new consumer
Mono<List<StreamMessageId>> transferedIdsMono = stream.fastClaim("testGroup", "consumer2", 1, TimeUnit.MILLISECONDS, id1, id2);

// mark pending entries as correctly processed
Mono<Long> amountMono = stream.ack("testGroup", id1, id2);

amountMono.doOnNext(res -> {
   // ...
}).subscribe();

Code example of RxJava3 interface usage:

RedissonRxClient redisson = redissonClient.rxJava();
RStreamRx<String, String> stream = redisson.getStream("test");

Single<StreamMessageId> smRx = stream.add(StreamAddArgs.entry("0", "0"));

Completable groupRx = stream.createGroup("testGroup");

Single<StreamId> id1Rx = stream.add(StreamAddArgs.entry("1", "1"));
Single<StreamId> id2Rx = stream.add(StreamAddArgs.entry("2", "2"));

Single<Map<StreamId, Map<String, String>>> groupRx = stream.readGroup("testGroup", "consumer1", StreamReadGroupArgs.neverDelivered());

// return entries in pending state after read group method execution
Single<Map<StreamMessageId, Map<String, String>>> pendingDataRx = stream.pendingRange("testGroup", "consumer1", StreamMessageId.MIN, StreamMessageId.MAX, 100);

// transfer ownership of pending messages to a new consumer
Single<List<StreamMessageId>> transferedIdsRx = stream.fastClaim("testGroup", "consumer2", 1, TimeUnit.MILLISECONDS, id1, id2);

// mark pending entries as correctly processed
Single<Long> amountRx = stream.ack("testGroup", id1, id2);

amountRx.doOnSuccess(res -> {
   // ...
}).subscribe();

Listeners

Redisson allows binding listeners per RStream object. This requires the notify-keyspace-events setting to be enabled on Redis or Valkey side.

Listener class name Event description Valkey or Redis
notify-keyspace-events value
org.redisson.api.listener.TrackingListener Element added/removed/updated after read operation -
org.redisson.api.ExpiredObjectListener RStream object expired Ex
org.redisson.api.DeletedObjectListener RStream object deleted Eg
org.redisson.api.listener.StreamAddListener Element added Et
org.redisson.api.listener.StreamRemoveListener Element removed Et
org.redisson.api.listener.StreamCreateGroupListener Group created Et
org.redisson.api.listener.StreamRemoveGroupListener Group removed Et
org.redisson.api.listener.StreamCreateConsumerListener Consumer created Et
org.redisson.api.listener.StreamRemoveConsumerListener Consumer removed Et
org.redisson.api.listener.StreamTrimListener Stream trimmed Et

Usage example:

RStream<String, String> stream = redisson.getStream("anySet");

int listenerId = stream.addListener(new DeletedObjectListener() {
     @Override
     public void onDeleted(String name) {
        // ...
     }
});

int listenerId = stream.addListener(new StreamAddListener() {
    @Override
    public void onAdd(String name) {
        // ...
    }
});


// ...

stream.removeListener(listenerId);

Ring Buffer

Java implementation of Redis or Valkey based RingBuffer implements java.util.Queue interface. This structure evicts elements from the head if queue capacity became full. This object is thread-safe.

Should be initialized with capacity size by trySetCapacity() method before usage.

Code example:

RRingBuffer<Integer> buffer = redisson.getRingBuffer("test");

// buffer capacity is 4 elements
buffer.trySetCapacity(4);

buffer.add(1);
buffer.add(2);
buffer.add(3);
buffer.add(4);

// buffer state is 1, 2, 3, 4

buffer.add(5);
buffer.add(6);

// buffer state is 3, 4, 5, 6

Code example of Async interface usage:

RRingBuffer<Integer> buffer = redisson.getRingBuffer("test");

// buffer capacity is 4 elements
RFuture<Boolean> capacityFuture = buffer.trySetCapacityAsync(4);

RFuture<Boolean> addFuture = buffer.addAsync(1);
RFuture<Boolean> addFuture = buffer.addAsync(2);
RFuture<Boolean> addFuture = buffer.addAsync(3);
RFuture<Boolean> addFuture = buffer.addAsync(4);

// buffer state is 1, 2, 3, 4

RFuture<Boolean> addFuture = buffer.addAsync(5);
RFuture<Boolean> addFuture = buffer.addAsync(6);

// buffer state is 3, 4, 5, 6

addFuture.whenComplete((res, exception) -> {
    // ...
});

Code example of Reactive interface usage:

RedissonReactiveClient redisson = redissonClient.reactive();
RRingBufferReactive<Integer> buffer = redisson.getRingBuffer("test");

// buffer capacity is 4 elements
Mono<Boolean> capacityMono = buffer.trySetCapacity(4);

Mono<Boolean> addMono = buffer.add(1);
Mono<Boolean> addMono = buffer.add(2);
Mono<Boolean> addMono = buffer.add(3);
Mono<Boolean> addMono = buffer.add(4);

// buffer state is 1, 2, 3, 4

Mono<Boolean> addMono = buffer.add(5);
Mono<Boolean> addMono = buffer.add(6);

// buffer state is 3, 4, 5, 6

addMono.doOnNext(res -> {
   // ...
}).subscribe();

Code example of RxJava3 interface usage:

RedissonRxClient redisson = redissonClient.rxJava();
RRingBufferRx<Integer> buffer = redisson.getRingBuffer("test");

// buffer capacity is 4 elements
Single<Boolean> capacityRx = buffer.trySetCapacity(4);

Single<Boolean> addRx = buffer.add(1);
Single<Boolean> addRx = buffer.add(2);
Single<Boolean> addRx = buffer.add(3);
Single<Boolean> addRx = buffer.add(4);

// buffer state is 1, 2, 3, 4

Single<Boolean> addRx = buffer.add(5);
Single<Boolean> addRx = buffer.add(6);

// buffer state is 3, 4, 5, 6

addRx.doOnSuccess(res -> {
   // ...
}).subscribe();

Listeners

Redisson allows binding listeners per RRingBuffer object. This requires the notify-keyspace-events setting to be enabled on Redis or Valkey side.

Listener class name Event description Valkey or Redis
notify-keyspace-events value
org.redisson.api.listener.TrackingListener Element created/removed/updated after read operation -
org.redisson.api.listener.ListAddListener Element created El
org.redisson.api.listener.ListRemoveListener Element removed El
org.redisson.api.ExpiredObjectListener RRingBuffer object expired Ex
org.redisson.api.DeletedObjectListener RRingBuffer object deleted Eg

Usage example:

RRingBuffer<String> queue = redisson.getRingBuffer("anyList");

int listenerId = queue.addListener(new DeletedObjectListener() {
     @Override
     public void onDeleted(String name) {
        // ...
     }
});

// ...

queue.removeListener(listenerId);

Transfer Queue

Java implementation of Redis or Valkey based TransferQueue implements java.util.concurrent.TransferQueue interface. Provides set of transfer methods which return only when value was successfully hand off to consumer. This object is thread-safe.

poll and take methods are resubscribed automatically during re-connection to a server or failover.

Code example:

RTransferQueue<String> queue = redisson.getTransferQueue("myCountDownLatch");

queue.transfer("data");
// or try transfer immediately
queue.tryTransfer("data");
// or try transfer up to 10 seconds
queue.tryTransfer("data", 10, TimeUnit.SECONDS);

// in other thread or JVM

queue.take();
// or
queue.poll();

Code example of Async interface usage:

RTransferQueue<String> queue = redisson.getTransferQueue("myCountDownLatch");

RFuture<Void> future = queue.transferAsync("data");
// or try transfer immediately
RFuture<Boolean> future = queue.tryTransferAsync("data");
// or try transfer up to 10 seconds
RFuture<Boolean> future = queue.tryTransferAsync("data", 10, TimeUnit.SECONDS);

// in other thread or JVM

RFuture<String> future = queue.takeAsync();
// or
RFuture<String> future = queue.pollAsync();

future.whenComplete((res, exception) -> {
    // ...
});

Code example of Reactive interface usage:

RedissonReactiveClient redisson = redissonClient.reactive();
RTransferQueueReactive<String> queue = redisson.getTransferQueue("myCountDownLatch");

Mono<Void> mono = queue.transfer("data");
// or try transfer immediately
Mono<Boolean> mono = queue.tryTransfer("data");
// or try transfer up to 10 seconds
Mono<Boolean> mono = queue.tryTransfer("data", 10, TimeUnit.SECONDS);

// in other thread or JVM

Mono<String> mono = queue.take();
// or
Mono<String> mono = queue.poll();

mono.doOnNext(res -> {
   // ...
}).subscribe();

Code example of RxJava3 interface usage:

RedissonRxClient redisson = redissonClient.rxJava();
RTransferQueueRx<String> queue = redisson.getTransferQueue("myCountDownLatch");

Completable res = queue.transfer("data");
// or try transfer immediately
Single<Boolean> resRx = queue.tryTransfer("data");
// or try transfer up to 10 seconds
Single<Boolean> resRx = queue.tryTransfer("data", 10, TimeUnit.SECONDS);

// in other thread or JVM

Single<String> resRx = queue.take();
// or
Maybe<String> resRx = queue.poll();

resRx.doOnSuccess(res -> {
   // ...
}).subscribe();