Skip to content

Commit eefb475

Browse files
oneby-wanglhotari
authored andcommitted
[fix][meta] Fix PulsarZooKeeperClient async addWatch callback retry behavior (apache#25913)
(cherry picked from commit be9f97a)
1 parent 2e289e6 commit eefb475

2 files changed

Lines changed: 63 additions & 5 deletions

File tree

pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1163,7 +1163,7 @@ public String toString() {
11631163
}
11641164

11651165
@Override
1166-
public void addWatch(String basePath, Watcher watcher, AddWatchMode mode, VoidCallback cb, Object ctx) {
1166+
public void addWatch(String basePath, Watcher watcher, AddWatchMode mode, VoidCallback cb, Object context) {
11671167
final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, setStats) {
11681168

11691169
final VoidCallback vCb = new VoidCallback() {
@@ -1174,7 +1174,7 @@ public void processResult(int rc, String path, Object ctx) {
11741174
if (allowRetry(worker, rc)) {
11751175
backOffAndRetry(that, worker.nextRetryWaitTime());
11761176
} else {
1177-
vCb.processResult(rc, basePath, ctx);
1177+
cb.processResult(rc, path, context);
11781178
}
11791179
}
11801180

@@ -1184,15 +1184,15 @@ public void processResult(int rc, String path, Object ctx) {
11841184
void zkRun() {
11851185
ZooKeeper zkHandle = zk.get();
11861186
if (null == zkHandle) {
1187-
PulsarZooKeeperClient.super.addWatch(basePath, watcher, mode, cb, ctx);
1187+
PulsarZooKeeperClient.super.addWatch(basePath, watcher, mode, vCb, worker);
11881188
} else {
1189-
zkHandle.addWatch(basePath, watcher, mode, cb, ctx);
1189+
zkHandle.addWatch(basePath, watcher, mode, vCb, worker);
11901190
}
11911191
}
11921192

11931193
@Override
11941194
public String toString() {
1195-
return String.format("setData (%s, mode = %s)", basePath, mode.name());
1195+
return String.format("addWatch (%s, mode = %s)", basePath, mode.name());
11961196
}
11971197
};
11981198
// execute it immediately

pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,12 @@
1919
package org.apache.pulsar.metadata;
2020

2121
import static org.assertj.core.api.Assertions.assertThat;
22+
import static org.mockito.ArgumentMatchers.any;
23+
import static org.mockito.ArgumentMatchers.eq;
24+
import static org.mockito.Mockito.doAnswer;
25+
import static org.mockito.Mockito.mock;
26+
import static org.mockito.Mockito.times;
27+
import static org.mockito.Mockito.verify;
2228
import static org.testng.Assert.assertEquals;
2329
import static org.testng.Assert.assertFalse;
2430
import static org.testng.Assert.assertNotNull;
@@ -40,6 +46,7 @@
4046
import java.util.concurrent.BlockingQueue;
4147
import java.util.concurrent.CompletableFuture;
4248
import java.util.concurrent.CompletionException;
49+
import java.util.concurrent.CountDownLatch;
4350
import java.util.concurrent.LinkedBlockingDeque;
4451
import java.util.concurrent.TimeUnit;
4552
import java.util.concurrent.atomic.AtomicInteger;
@@ -48,6 +55,7 @@
4855
import lombok.Cleanup;
4956
import lombok.SneakyThrows;
5057
import lombok.extern.slf4j.Slf4j;
58+
import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
5159
import org.apache.pulsar.common.util.FutureUtil;
5260
import org.apache.pulsar.metadata.api.GetResult;
5361
import org.apache.pulsar.metadata.api.MetadataStore;
@@ -62,6 +70,9 @@
6270
import org.apache.pulsar.metadata.impl.PulsarZooKeeperClient;
6371
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
6472
import org.apache.pulsar.metadata.impl.oxia.OxiaMetadataStore;
73+
import org.apache.zookeeper.AddWatchMode;
74+
import org.apache.zookeeper.AsyncCallback.VoidCallback;
75+
import org.apache.zookeeper.KeeperException;
6576
import org.apache.zookeeper.WatchedEvent;
6677
import org.apache.zookeeper.Watcher;
6778
import org.apache.zookeeper.ZooKeeper;
@@ -538,6 +549,53 @@ public void testZkLoadConfigFromFile() throws Exception {
538549
assertFalse(zooKeeper.getClientConfig().isSaslClientEnabled());
539550
}
540551

552+
@Test
553+
@SuppressWarnings("unchecked")
554+
public void testAsyncAddWatchRetriesWithWrapperCallback() throws Exception {
555+
String path = newKey();
556+
@Cleanup
557+
PulsarZooKeeperClient zkClient = PulsarZooKeeperClient.newBuilder()
558+
.connectString(zks.getConnectionString())
559+
.sessionTimeoutMs(3000)
560+
.operationRetryPolicy(new BoundExponentialBackoffRetryPolicy(0, 0, 3))
561+
.build();
562+
563+
ZooKeeper mockZk = mock(ZooKeeper.class);
564+
AtomicInteger attempts = new AtomicInteger();
565+
doAnswer(invocation -> {
566+
// The wrapper callback should consume this recoverable failure and retry the addWatch operation.
567+
int rc = attempts.incrementAndGet() == 1
568+
? KeeperException.Code.CONNECTIONLOSS.intValue()
569+
: KeeperException.Code.OK.intValue();
570+
String callbackPath = invocation.getArgument(0);
571+
VoidCallback callback = invocation.getArgument(3);
572+
Object callbackContext = invocation.getArgument(4);
573+
callback.processResult(rc, callbackPath, callbackContext);
574+
return null;
575+
}).when(mockZk).addWatch(eq(path), any(Watcher.class), eq(AddWatchMode.PERSISTENT_RECURSIVE),
576+
any(VoidCallback.class), any());
577+
578+
// Force the Pulsar wrapper to delegate the async addWatch call to our controlled ZooKeeper instance.
579+
var zooKeeperRef = (AtomicReference<ZooKeeper>) WhiteboxImpl.getInternalState(zkClient, "zk");
580+
zooKeeperRef.set(mockZk);
581+
582+
CountDownLatch callbackCalled = new CountDownLatch(1);
583+
AtomicInteger callbackRc = new AtomicInteger(Integer.MIN_VALUE);
584+
zkClient.addWatch(path, event -> {
585+
}, AddWatchMode.PERSISTENT_RECURSIVE, (rc, callbackPath, ctx) -> {
586+
callbackRc.set(rc);
587+
callbackCalled.countDown();
588+
}, null);
589+
590+
assertTrue(callbackCalled.await(5, TimeUnit.SECONDS));
591+
592+
// The caller should only see the final successful result after the retry, not the first CONNECTIONLOSS.
593+
assertEquals(callbackRc.get(), KeeperException.Code.OK.intValue());
594+
assertEquals(attempts.get(), 2);
595+
verify(mockZk, times(2)).addWatch(eq(path), any(Watcher.class), eq(AddWatchMode.PERSISTENT_RECURSIVE),
596+
any(VoidCallback.class), any());
597+
}
598+
541599
@Test
542600
public void testOxiaLoadConfigFromFile() throws Exception {
543601
final String metadataStoreName = UUID.randomUUID().toString().replaceAll("-", "");

0 commit comments

Comments
 (0)