Skip to content

fix: enhance zero queue consumer reconnection handling and message permit management#1443

Merged
crossoverJie merged 20 commits into
apache:masterfrom
crossoverJie:fix-zero-queue-once-permit
Dec 5, 2025
Merged

fix: enhance zero queue consumer reconnection handling and message permit management#1443
crossoverJie merged 20 commits into
apache:masterfrom
crossoverJie:fix-zero-queue-once-permit

Conversation

@crossoverJie

Copy link
Copy Markdown
Member

Motivation

#1434 (comment)

When the broker restarts or a topic is unloaded, the zero consumer sends an extra permit, causing the number of UnAckedMessages to increase.

Modifications

https://github.com/apache/pulsar/blob/cc5e479d63103f81e3af833e8b06227d1a6563e1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java#L107-L152

Following the Java client implementation, add a waitingOnReceive variable to determine whether a permit needs to be sent.

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API: (yes / no)
  • The schema: (yes / no / don't know)
  • The default values of configurations: (yes / no)
  • The wire protocol: (yes / no)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / GoDocs / not documented)
  • If a feature is not applicable for documentation, explain why?
  • If a feature is not documented yet in this PR, please create a followup issue for adding the documentation

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This pull request enhances the zero queue consumer reconnection handling and message permit management to prevent extra permits from being sent during broker restarts or topic unloads. The implementation follows the Java client pattern by tracking whether the consumer is waiting on a receive operation and filtering messages from old connections.

Key Changes:

  • Added waitingOnReceive atomic flag to track receive state
  • Implemented connection-based message filtering to discard messages from old connections after reconnection
  • Modified reconnection policy to conditionally increment permits based on receive state

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 5 comments.

File Description
pulsar/impl_message.go Added conn field to message struct and getConn() method to track which connection a message came from
pulsar/consumer_zero_queue.go Implemented waitingOnReceive flag and connection ID comparison logic to filter old messages and manage permits during reconnection
pulsar/consumer_partition.go Updated message creation to capture connection reference and modified reconnection callback to use new policy function
pulsar/consumer_zero_queue_test.go Added comprehensive test validating permit management after topic unload and reconnection

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread pulsar/consumer_zero_queue.go
Comment thread pulsar/consumer_zero_queue_test.go
Comment thread pulsar/consumer_zero_queue_test.go Outdated
Comment thread pulsar/consumer_zero_queue.go
Comment thread pulsar/consumer_zero_queue_test.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Comment thread pulsar/consumer_zero_queue.go Outdated
@RobertIndie RobertIndie added this to the v0.18.0 milestone Dec 1, 2025
Comment thread pulsar/consumer_zero_queue_test.go Outdated
Comment thread pulsar/consumer_zero_queue_test.go Outdated
Comment thread pulsar/consumer_zero_queue_test.go Outdated
@crossoverJie crossoverJie requested a review from nodece December 3, 2025 03:29

@RobertIndie RobertIndie left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seesm the test doesn't cover the issue. I couldn't reproduce the original issue by reverting other changes from this PR.

Comment thread pulsar/consumer_zero_queue_test.go
Comment thread pulsar/consumer_zero_queue_test.go
Comment thread pulsar/consumer_zero_queue_test.go Outdated
Comment thread pulsar/consumer_zero_queue_test.go Outdated
Comment thread pulsar/consumer_zero_queue_test.go Outdated
Comment thread pulsar/consumer_zero_queue_test.go
Comment thread pulsar/consumer_zero_queue_test.go
@geniusjoe geniusjoe mentioned this pull request Dec 4, 2025
@crossoverJie crossoverJie merged commit 41bc6f4 into apache:master Dec 5, 2025
7 checks passed
@crossoverJie crossoverJie deleted the fix-zero-queue-once-permit branch December 5, 2025 07:05
RobertIndie pushed a commit that referenced this pull request Dec 8, 2025
…rmit management (#1443)

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Zixuan Liu <nodeces@gmail.com>
(cherry picked from commit 41bc6f4)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants