Skip to content

[fix][meta] Fix ZooKeeper session reconnect race condition in PulsarZooKeeperClient.clientCreator#25910

Merged
lhotari merged 6 commits into
apache:masterfrom
oneby-wang:testReacquireLeadershipAfterSessionLost_flaky_test
Jun 2, 2026
Merged

[fix][meta] Fix ZooKeeper session reconnect race condition in PulsarZooKeeperClient.clientCreator#25910
lhotari merged 6 commits into
apache:masterfrom
oneby-wang:testReacquireLeadershipAfterSessionLost_flaky_test

Conversation

@oneby-wang

Copy link
Copy Markdown
Contributor

Motivation

ZKSessionTest.testReacquireLeadershipAfterSessionLost can observe unstable metadata session events after a ZooKeeper session expires and PulsarZooKeeperClient creates a replacement client.

Failure test case1:

java.lang.AssertionError: expected [SessionReestablished] but found [ConnectionLost]
	at org.testng.Assert.fail(Assert.java:111)
	at org.testng.Assert.failNotEquals(Assert.java:1590)
	at org.testng.Assert.assertEqualsImpl(Assert.java:150)
	at org.testng.Assert.assertEquals(Assert.java:132)
	at org.testng.Assert.assertEquals(Assert.java:644)
	at org.apache.pulsar.metadata.ZKSessionTest.testReacquireLeadershipAfterSessionLost(ZKSessionTest.java:230)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:141)
	at org.testng.internal.invokers.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:47)
	at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:76)
	at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:11)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)

Failure test case2:

java.util.concurrent.CompletionException: org.apache.pulsar.metadata.api.MetadataStoreException: org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired
	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332)
	at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:347)
	at java.base/java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:781)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
	at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2194)
	at org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$batchOperation$7(ZKMetadataStore.java:253)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
	at org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$batchOperation$9(ZKMetadataStore.java:253)
	at org.apache.pulsar.metadata.impl.PulsarZooKeeperClient$3$1.processResult(PulsarZooKeeperClient.java:524)
	at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:702)
	at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:541)
Caused by: org.apache.pulsar.metadata.api.MetadataStoreException: org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired
	at org.apache.pulsar.metadata.impl.ZKMetadataStore.getException(ZKMetadataStore.java:528)
	at org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$batchOperation$9(ZKMetadataStore.java:252)
	... 3 more
Caused by: org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired
	at org.apache.zookeeper.KeeperException.create(KeeperException.java:133)
	at org.apache.zookeeper.KeeperException.create(KeeperException.java:53)
	at org.apache.pulsar.metadata.impl.ZKMetadataStore.getException(ZKMetadataStore.java:518)
	... 4 more
Cause 1: org.apache.pulsar.metadata.api.MetadataStoreException: org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired
	at app//org.apache.pulsar.metadata.impl.ZKMetadataStore.getException(ZKMetadataStore.java:528)
	at app//org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$batchOperation$9(ZKMetadataStore.java:252)
	at app//org.apache.pulsar.metadata.impl.PulsarZooKeeperClient$3$1.processResult(PulsarZooKeeperClient.java:524)
	at app//org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:702)
	at app//org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:541)
Caused by: org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired
	at app//org.apache.zookeeper.KeeperException.create(KeeperException.java:133)
	at app//org.apache.zookeeper.KeeperException.create(KeeperException.java:53)
	at app//org.apache.pulsar.metadata.impl.ZKMetadataStore.getException(ZKMetadataStore.java:518)
	... 4 more

The race happens during the handoff from the expired ZooKeeper instance to the new one:

  • ZooKeeper can deliver SyncConnected while the new client is still being constructed.
  • ZooKeeperWatcherBase forwards that session event to child watchers.
  • Those child watchers can run before PulsarZooKeeperClient publishes the new ZooKeeper handle.
  • During that window, follow-up operations can still be routed to the old expired handle, and an old async session probe can later overwrite the state of the newly established session.

This can produce extra or incomplete session transitions around ConnectionLost, SessionLost, Reconnected, and SessionReestablished.

Modifications

This change keeps the reconnect flow local to PulsarZooKeeperClient and ZKSessionWatcher.

  • PulsarZooKeeperClient now creates replacement ZooKeeper clients with a forwarding watcher instead of passing watcherManager directly.
  • The forwarding watcher waits until the new ZooKeeper handle has been published before forwarding events to watcherManager.
  • The new handle is published before releasing the forwarding watcher, and waitForConnection() runs after that release because it depends on the forwarded SyncConnected event.
  • ZKSessionWatcher records the session id used for its async exists("/") probe and only applies the probe result if the current session id still matches.
  • The session-id check and state transition are guarded by the same synchronized section so a stale probe cannot race with a new-session event and overwrite the new session state.

Verifying this change

  • Make sure that the change passes the CI checks.

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

@oneby-wang oneby-wang changed the title Test reacquire leadership after session lost flaky test [fix][meta] Fix ZooKeeper session reconnect race condition in PulsarZooKeeperClient.clientCreator() Jun 1, 2026
@oneby-wang oneby-wang changed the title [fix][meta] Fix ZooKeeper session reconnect race condition in PulsarZooKeeperClient.clientCreator() [fix][meta] Fix ZooKeeper session reconnect race condition in PulsarZooKeeperClient.clientCreator Jun 1, 2026
@oneby-wang

oneby-wang commented Jun 1, 2026

Copy link
Copy Markdown
Contributor Author

@lhotari
I noticed that after receiving a ZooKeeper Expired event, PulsarZooKeeperClient creates a new ZooKeeper instance while continuing to use the same watcherManager.

With the current direct forwarding approach, events from the new ZooKeeper instance are delayed until the new handle is published. However, if the old expired client still has any queued or late events delivered to the same watcherManager(I'm not sure if this will happen.), those stale events could still affect the session state and potentially make the state transitions inconsistent.

Do you think we should add a lightweight generation fencing mechanism here, so that only events from the currently active ZooKeeper instance are forwarded/processed, and stale events from previous instances are ignored?

@lhotari lhotari left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, great work @oneby-wang

@lhotari lhotari merged commit 5627c01 into apache:master Jun 2, 2026
43 checks passed
@lhotari lhotari added this to the 5.0.0-M1 milestone Jun 2, 2026
lhotari pushed a commit that referenced this pull request Jun 3, 2026
…ooKeeperClient.clientCreator (#25910)

(cherry picked from commit 5627c01)
lhotari pushed a commit that referenced this pull request Jun 3, 2026
…ooKeeperClient.clientCreator (#25910)

(cherry picked from commit 5627c01)
priyanshu-ctds pushed a commit to datastax/pulsar that referenced this pull request Jun 9, 2026
…ooKeeperClient.clientCreator (apache#25910)

(cherry picked from commit 5627c01)
(cherry picked from commit 9ebfc3b)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants