fix: enhance zero queue consumer reconnection handling and message permit management#1443
Merged
crossoverJie merged 20 commits intoDec 5, 2025
Merged
Conversation
Contributor
There was a problem hiding this comment.
Pull request overview
This pull request enhances the zero queue consumer reconnection handling and message permit management to prevent extra permits from being sent during broker restarts or topic unloads. The implementation follows the Java client pattern by tracking whether the consumer is waiting on a receive operation and filtering messages from old connections.
Key Changes:
- Added
waitingOnReceiveatomic flag to track receive state - Implemented connection-based message filtering to discard messages from old connections after reconnection
- Modified reconnection policy to conditionally increment permits based on receive state
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| pulsar/impl_message.go | Added conn field to message struct and getConn() method to track which connection a message came from |
| pulsar/consumer_zero_queue.go | Implemented waitingOnReceive flag and connection ID comparison logic to filter old messages and manage permits during reconnection |
| pulsar/consumer_partition.go | Updated message creation to capture connection reference and modified reconnection callback to use new policy function |
| pulsar/consumer_zero_queue_test.go | Added comprehensive test validating permit management after topic unload and reconnection |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
RobertIndie
reviewed
Dec 1, 2025
… fix-zero-queue-once-permit
…nd adjusting sleep durations
nodece
reviewed
Dec 3, 2025
nodece
reviewed
Dec 3, 2025
nodece
reviewed
Dec 3, 2025
Co-authored-by: Zixuan Liu <nodeces@gmail.com>
Co-authored-by: Zixuan Liu <nodeces@gmail.com>
nodece
approved these changes
Dec 3, 2025
RobertIndie
requested changes
Dec 3, 2025
RobertIndie
left a comment
Member
There was a problem hiding this comment.
It seesm the test doesn't cover the issue. I couldn't reproduce the original issue by reverting other changes from this PR.
geniusjoe
reviewed
Dec 3, 2025
… endpoints and reduce timeout duration
RobertIndie
approved these changes
Dec 4, 2025
1 task
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Motivation
#1434 (comment)
When the broker restarts or a topic is unloaded, the zero consumer sends an extra permit, causing the number of UnAckedMessages to increase.
Modifications
https://github.com/apache/pulsar/blob/cc5e479d63103f81e3af833e8b06227d1a6563e1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java#L107-L152
Following the Java client implementation, add a
waitingOnReceivevariable to determine whether a permit needs to be sent.Verifying this change
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
Does this pull request potentially affect one of the following parts:
If
yeswas chosen, please highlight the changesDocumentation