Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
0ff8317
Introduce keyless command concept
uglide Jun 6, 2025
e22dd9a
Introduce request/response policies to CommandFlagsRegistry and Stati…
uglide Jan 16, 2026
2490574
Update CommandFlagsRegistryGenerator to support request/response poli…
uglide Jan 16, 2026
52715dc
Drop broadcastCommand and executeKeylessCommand from the CommandExecu…
uglide Jan 16, 2026
2358bde
Add isKeyless() to CommandArguments
uglide Jan 16, 2026
2d3f887
Add support for MULTI_SHARD commands
uglide Jan 16, 2026
473c602
Get rid of ClusterCommandArguments
uglide Jan 21, 2026
7d686cf
Fix ClusterReplyAggregator
uglide Jan 21, 2026
9930bd8
Expose addHashSlotKey method for special cases like KEYS command
uglide Jan 21, 2026
48090bd
Allow manual overrides in CommandFlagsRegistryGenerator
uglide Jan 21, 2026
3c7f46e
Fix tests
uglide Jan 21, 2026
d081eaa
Add missing exception class
uglide Jan 21, 2026
5872e66
Clean up after dropping FT.Aggregate iterator
uglide Jan 21, 2026
62becdf
Fix formatting
uglide Jan 21, 2026
120ad89
Add missing ClusterAggregationException
uglide Jan 21, 2026
0989e54
Fix unit tests
uglide Jan 21, 2026
07b33d1
Fix bugs with hash slots
uglide Jan 22, 2026
f95fa8d
Add support for arrays in AND&OR aggregations
uglide Jan 22, 2026
32345e0
Add support for DBSIZE and add more tests
uglide Jan 22, 2026
157c0cf
Optimize hash slot calculation and get rid of processKey() methods
uglide Jan 22, 2026
d39a4de
Deprecate sendCommand() and sendBlockingCommand() as unsafe in cluste…
uglide Jan 22, 2026
bc5e40f
Update tests to reflect the changes
uglide Jan 22, 2026
da632c4
Resolve merge problems
uglide Feb 27, 2026
fec9bc9
Address feedback for CommandFlagsRegistryGenerator
uglide Feb 27, 2026
37a4f23
Reformat
uglide Feb 27, 2026
b7392d1
Add missing import to CommandFlagsRegistryGenerator
uglide Feb 27, 2026
d1127f1
Make executeKeylessCommand private
uglide Feb 27, 2026
1399091
Extract sendCommand methods deprecation
uglide Feb 27, 2026
2b61f3b
Improve CommandFlagsRegistryGenerator
uglide Mar 2, 2026
f2d246c
Preserve the initial order when executing multi-shard commands
uglide Mar 2, 2026
329893a
Add Connection resolvers
uglide Mar 3, 2026
26d0a22
Refactor broadcastCommand and executeMultiShardCommand to use common …
uglide Mar 3, 2026
6c2da5e
Reformat
uglide Mar 3, 2026
0d8d8d4
Update tests
uglide Mar 3, 2026
6284796
Reformat
uglide Mar 3, 2026
bf3a503
Fix bug in CommandArguments
uglide Mar 3, 2026
953ee17
Fix bug with SingleConnectionResolver
uglide Mar 3, 2026
7a63f96
Fix bug with roundRobinResolver
uglide Mar 3, 2026
a147c37
Take into account keyPreProcessor in groupArgumentsByKeyValueHashSlot…
uglide Mar 3, 2026
6bed397
Replace outdated test in ClusterBinaryValuesCommandsTest
uglide Mar 4, 2026
5f783bf
Merge branch 'master' into im/request-response-policy-support
uglide Mar 4, 2026
b15c170
Throw redirection exceptions immediately when followRedirections is f…
uglide Mar 5, 2026
3c918a2
Add support for JedisByteHashMap and JedisByteMap in default aggregation
uglide Mar 5, 2026
cdd4409
Reformat
uglide Mar 5, 2026
8fca55d
Mutate exising collection in aggregateDefault()
uglide Mar 5, 2026
519739c
Address review suggestions and clean up code
uglide Mar 5, 2026
80f3ca3
Make ClusterReplyAggregator pp
uglide Mar 5, 2026
c82daa5
Fix tests
uglide Mar 5, 2026
cc80f3b
Merge branch 'master' into im/request-response-policy-support
uglide Mar 5, 2026
e5b6bfb
Fix SinceRedisVersion for clusterSlotStatsAggregation test
uglide Mar 5, 2026
339d81f
Merge remote-tracking branch 'origin/im/request-response-policy-suppo…
uglide Mar 5, 2026
594fc3c
Fix aggregation for WAITAOF command and improve exception message
uglide Mar 5, 2026
a17d4f2
Merge branch 'master' into im/request-response-policy-support
ggivo Mar 16, 2026
2b9a22b
fix ClientAuthRedisClusterClientIT.java:[43,13] cannot find symbol
ggivo Mar 16, 2026
27c37ad
getKeys now returns unmodifiable list
ggivo Mar 16, 2026
a05f1bc
CommnadArguments.addHashSlotKey - package private & typesafe
ggivo Mar 16, 2026
23f7def
[req/resp] Reject commands with ALL_NODES,ALL_SHARDS,MULTI_SHARD requ…
ggivo Mar 18, 2026
3d1da01
[req/resp] Fix aggregating unmodifiable collection failiure (#4465)
ggivo Mar 19, 2026
af1899f
Merge branch 'master' into im/request-response-policy-support
ggivo Mar 19, 2026
57816a4
override INFO & FUNCITON_STATS request policy for cluster to DEFAULT
ggivo Mar 20, 2026
3d286f8
formating
ggivo Mar 20, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ on:
branches:
- master
- '[0-9].*'
- 'topic/**'
pull_request:
branches:
- master
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/test-on-docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ on:
- master
- '[0-9].*'
- 'feature/**'
- 'topic/**'
pull_request:
branches:
- master
Expand Down
2 changes: 2 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,8 @@
<include>**/resps/LibraryInfoTest.java</include>
<include>**/*Matchers.java</include>
<include>**/*TestUtil.java</include>
<include>**/executors/aggregators/*.java</include>
<include>**/*MapMatcher.java</include>
</includes>
</configuration>
<executions>
Expand Down
40 changes: 0 additions & 40 deletions src/main/java/redis/clients/jedis/ClusterCommandArguments.java

This file was deleted.

588 changes: 545 additions & 43 deletions src/main/java/redis/clients/jedis/ClusterCommandObjects.java

Large diffs are not rendered by default.

67 changes: 66 additions & 1 deletion src/main/java/redis/clients/jedis/ClusterPipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,58 @@
import java.time.Duration;
import java.util.Set;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.exceptions.JedisClusterOperationException;
import redis.clients.jedis.providers.ClusterConnectionProvider;
import redis.clients.jedis.util.IOUtils;

/**
* Pipeline implementation for Redis Cluster mode.
* <p>
* ClusterPipeline allows batching multiple commands for efficient execution in a Redis Cluster
* environment. Commands are automatically routed to the appropriate cluster nodes based on
* key hash slots.
* </p>
* <p>
* <strong>Important Limitations:</strong>
* </p>
* <ul>
* <li><strong>Single-node commands only:</strong> Only commands that can be routed to a single
* node are supported. Commands requiring execution on multiple nodes (ALL_SHARDS, MULTI_SHARD,
* ALL_NODES, or SPECIAL request policies) will throw {@link UnsupportedOperationException}.</li>
* <li><strong>Examples of unsupported commands:</strong>
* <ul>
* <li>{@code KEYS} - requires execution on all master shards</li>
* <li>{@code MGET} with keys in different slots - requires execution on multiple shards</li>
* <li>{@code SCRIPT LOAD} - requires execution on all nodes</li>
* </ul>
* </li>
* <li>For multi-node commands, use the non-pipelined mode
* of {@link RedisClusterClient} instead.</li>
* </ul>
* <p>
* <strong> Usage Pattern:</strong>
* </p>
* <pre>{@code
* try (RedisCluster cluster = new RedisCluster(nodes, config)) {
* // For single-node commands, use pipelined mode
* try (ClusterPipeline pipeline = cluster.pipelined()) {
* Response<String> r1 = pipeline.set("key1", "value1");
* Response<String> r2 = pipeline.get("key1");
* pipeline.sync();
*
* System.out.println(r1.get()); // "OK"
* System.out.println(r2.get()); // "value1"
* }
*
* // For multi-node commands, use non-pipelined mode
* Set<String> allKeys = cluster.keys("*"); // Executes on all master shards
* List<String> values = cluster.mget("key1", "key2", "key3"); // Cross-slot keys
* }
* }</pre>
*
* @see MultiNodePipelineBase
* @see redis.clients.jedis.RedisClusterClient
*/
public class ClusterPipeline extends MultiNodePipelineBase {

private final ClusterConnectionProvider provider;
Expand Down Expand Up @@ -40,6 +89,12 @@ public ClusterPipeline(ClusterConnectionProvider provider, ClusterCommandObjects
this.provider = provider;
}

ClusterPipeline(ClusterConnectionProvider provider, ClusterCommandObjects commandObjects,
CommandFlagsRegistry commandFlagsRegistry) {
super(commandObjects, commandFlagsRegistry);
this.provider = provider;
}

private static ClusterCommandObjects createClusterCommandObjects(RedisProtocol protocol) {
ClusterCommandObjects cco = new ClusterCommandObjects();
if (protocol == RedisProtocol.RESP3) cco.setProtocol(protocol);
Expand All @@ -57,7 +112,17 @@ public void close() {

@Override
protected HostAndPort getNodeKey(CommandArguments args) {
return provider.getNode(((ClusterCommandArguments) args).getCommandHashSlot());
Set<Integer> slots = args.getKeyHashSlots();

if (slots.size() > 1) {
throw new JedisClusterOperationException("Cannot get NodeKey for command with multiple hash slots");
}

if (slots.isEmpty()) {
return null; // Let getConnection(null) handle it by using a random node
}

return provider.getNode(slots.iterator().next());
}

@Override
Expand Down
127 changes: 89 additions & 38 deletions src/main/java/redis/clients/jedis/CommandArguments.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
package redis.clients.jedis;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.*;

import redis.clients.jedis.annots.Experimental;
import redis.clients.jedis.annots.Internal;
Expand All @@ -14,13 +9,30 @@
import redis.clients.jedis.commands.ProtocolCommand;
import redis.clients.jedis.params.IParams;
import redis.clients.jedis.search.RediSearchUtil;
import redis.clients.jedis.util.JedisClusterCRC16;

public class CommandArguments implements Iterable<Rawable> {

/**
* Default initial capacity for the keys list. Most Redis commands have 1-3 keys,
* so a small initial capacity avoids reallocations for common cases.
*/
private static final int DEFAULT_KEYS_CAPACITY = 4;

private CommandKeyArgumentPreProcessor keyPreProc = null;
private final ArrayList<Rawable> args;

private List<Object> keys;
/**
* Pre-allocated list for storing keys. Using ArrayList directly avoids the
* memory reallocation overhead of transitioning from emptyList -> singletonList -> ArrayList.
*/
private final ArrayList<Object> keys;

/**
* Cached hash slots computed from keys. Null indicates the cache is invalid
* and needs to be recomputed. The cache is invalidated when keys are added.
*/
private Set<Integer> cachedHashSlots;

private boolean blocking;

Expand All @@ -32,7 +44,8 @@ public CommandArguments(ProtocolCommand command) {
args = new ArrayList<>();
args.add(command);

keys = Collections.emptyList();
keys = new ArrayList<>(DEFAULT_KEYS_CAPACITY);
cachedHashSlots = null;
}

public ProtocolCommand getCommand() {
Expand Down Expand Up @@ -120,36 +133,36 @@ public CommandArguments key(Object key) {

if (key instanceof Rawable) {
Rawable raw = (Rawable) key;
processKey(raw.getRaw());
args.add(raw);
// Extract raw bytes for hash slot computation to avoid ClassCastException in getKeyHashSlots()
addHashSlotKey(raw.getRaw());
Comment thread
uglide marked this conversation as resolved.
Comment thread
ggivo marked this conversation as resolved.
} else if (key instanceof byte[]) {
byte[] raw = (byte[]) key;
processKey(raw);
args.add(RawableFactory.from(raw));
addHashSlotKey(raw);
} else if (key instanceof String) {
String raw = (String) key;
processKey(raw);
args.add(RawableFactory.from(raw));
addHashSlotKey(raw);
} else {
throw new IllegalArgumentException("\"" + key.toString() + "\" is not a valid argument.");
}

addKeyInKeys(key);
return this;
}

final CommandArguments addHashSlotKey(String key) {
keys.add(key);
// Invalidate cached hash slots since keys have changed
cachedHashSlots = null;
return this;
}

private void addKeyInKeys(Object key) {
if (keys.isEmpty()) {
keys = Collections.singletonList(key);
} else if (keys.size() == 1) {
List oldKeys = keys;
keys = new ArrayList();
keys.addAll(oldKeys);
keys.add(key);
} else {
keys.add(key);
}
final CommandArguments addHashSlotKey(byte[] key) {
keys.add(key);
// Invalidate cached hash slots since keys have changed
cachedHashSlots = null;
return this;
}

public final CommandArguments keys(Object... keys) {
Expand All @@ -167,26 +180,16 @@ public final CommandArguments addParams(IParams params) {
return this;
}

protected CommandArguments processKey(byte[] key) {
// do nothing
return this;
}

protected final CommandArguments processKeys(byte[]... keys) {
protected final CommandArguments addHashSlotKeys(byte[]... keys) {
for (byte[] key : keys) {
processKey(key);
addHashSlotKey(key);
}
return this;
}

protected CommandArguments processKey(String key) {
// do nothing
return this;
}

protected final CommandArguments processKeys(String... keys) {
protected final CommandArguments addHashSlotKeys(String... keys) {
for (String key : keys) {
processKey(key);
addHashSlotKey(key);
}
return this;
}
Expand All @@ -210,9 +213,57 @@ public Iterator<Rawable> iterator() {
return args.iterator();
}

/**
* Returns the keys used in this command.
* <p>
* <b>Internal API:</b> This method is internal and should not be used by external code.
* It is exposed for internal use by caching ({@link redis.clients.jedis.csc.CacheKey#getRedisKeys()})
* and cluster operations.
* <p>
* <b>Supported types:</b> Keys are stored as either {@link String} or {@code byte[]} depending on
* how they were added via {@link #key(Object)} or {@link #addHashSlotKey(String)}/{@link #addHashSlotKey(byte[])}.
* Only {@link String} and {@code byte[]} are guaranteed to be supported by downstream consumers.
* <p>
* <b>Type safety:</b> Consumers must handle both {@link String} and {@code byte[]} types.
* Passing other types may cause {@link IllegalArgumentException} when used with caching
* (see {@link redis.clients.jedis.csc.AbstractCache#makeKeyForRedisKeysToCacheKeys(Object)})
* or cluster operations.
* <p>
* The returned list is unmodifiable to prevent external modification of the internal key tracking.
*
* @return unmodifiable list of keys ({@link String} or {@code byte[]})
*/
@Internal
public List<Object> getKeys() {
return keys;
return Collections.unmodifiableList(keys);
}

@Internal
public Set<Integer> getKeyHashSlots() {
// Return cached slots if available (cache is invalidated when keys are added)
if (cachedHashSlots != null) {
return cachedHashSlots;
}

// Compute hash slots and cache the result
Set<Integer> slots = new HashSet<>();
for (Object key : keys) {
if (key instanceof byte[]) {
slots.add(JedisClusterCRC16.getSlot((byte[]) key));
} else {
slots.add(JedisClusterCRC16.getSlot((String) key));
}
}
Comment thread
cursor[bot] marked this conversation as resolved.
// Cache as unmodifiable set to prevent external modification
cachedHashSlots = Collections.unmodifiableSet(slots);
return cachedHashSlots;
}

/**
* @return true if this command has no keys, false otherwise
*/
public boolean isKeyless() {
return keys.isEmpty();
}

public boolean isBlocking() {
Expand Down
Loading
Loading