Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
054f0df
fix: enhance zero queue consumer reconnection handling and message pe…
crossoverJie Dec 1, 2025
17f9561
Update pulsar/consumer_zero_queue_test.go
crossoverJie Dec 1, 2025
675a35f
fix ci lint
crossoverJie Dec 1, 2025
f3b583e
Merge remote-tracking branch 'origin/fix-zero-queue-once-permit' into…
crossoverJie Dec 1, 2025
f45c1b9
fix: ensure proper handling of connection state during zero queue con…
crossoverJie Dec 2, 2025
ccf86ab
fix: initialize connection in consumer partition tests
crossoverJie Dec 2, 2025
7ad6b21
fix: increase sleep duration in zero queue consumer tests
crossoverJie Dec 2, 2025
9df780f
fix: increase sleep duration in zero queue consumer tests
crossoverJie Dec 2, 2025
e6e9c04
fix: simplify zero queue reconnected policy by removing unnecessary p…
crossoverJie Dec 2, 2025
a8b1676
fix: improve zero queue consumer test by using containerized broker a…
crossoverJie Dec 2, 2025
4075a1f
fix: remove unnecessary sleep after unloading topic in zero queue con…
crossoverJie Dec 3, 2025
66e29a1
Update pulsar/consumer_zero_queue_test.go
crossoverJie Dec 3, 2025
a09fb7f
Update pulsar/consumer_zero_queue_test.go
crossoverJie Dec 3, 2025
70a0319
fix: remove trailing whitespace in zero queue consumer test
crossoverJie Dec 3, 2025
33e585d
fix: add structured logging to zero queue consumer test
crossoverJie Dec 3, 2025
bb8449d
fix: update zero queue consumer test to use context-aware stats retri…
crossoverJie Dec 4, 2025
0509e8d
fix: track reconnect count in zero queue consumer
crossoverJie Dec 4, 2025
465a038
fix: simplify TestReconnectedBrokerSendPermits by removing unused con…
crossoverJie Dec 4, 2025
955bc5b
fix: increase timeout duration in zero queue consumer tests
crossoverJie Dec 4, 2025
ac7fab2
fix: update TestReconnectedBrokerSendPermits to use dynamic container…
crossoverJie Dec 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,11 @@ type partitionConsumerOpts struct {
expireTimeOfIncompleteChunk time.Duration
autoAckIncompleteChunk bool
// in failover mode, this callback will be called when consumer change
consumerEventListener ConsumerEventListener
enableBatchIndexAck bool
ackGroupingOptions *AckGroupingOptions
enableZeroQueueConsumer bool
consumerEventListener ConsumerEventListener
enableBatchIndexAck bool
ackGroupingOptions *AckGroupingOptions
enableZeroQueueConsumer bool
zeroQueueReconnectedPolicy func(*partitionConsumer)
}

type ConsumerEventListener interface {
Expand Down Expand Up @@ -170,6 +171,7 @@ type partitionConsumer struct {
currentQueueSize uAtomic.Int32
scaleReceiverQueueHint uAtomic.Bool
incomingMessages uAtomic.Int32
reconnectCount uAtomic.Int32

eventsCh chan interface{}
connectedCh chan struct{}
Expand Down Expand Up @@ -1393,6 +1395,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
orderingKey: string(smm.OrderingKey),
index: messageIndex,
brokerPublishTime: brokerPublishTime,
conn: pc._getConn(),
}
} else {
msg = &message{
Expand All @@ -1413,6 +1416,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
orderingKey: string(msgMeta.GetOrderingKey()),
index: messageIndex,
brokerPublishTime: brokerPublishTime,
conn: pc._getConn(),
}
}

Expand Down Expand Up @@ -1541,6 +1545,7 @@ func createEncryptionContext(msgMeta *pb.MessageMetadata) *EncryptionContext {
func (pc *partitionConsumer) ConnectionClosed(closeConsumer *pb.CommandCloseConsumer) {
// Trigger reconnection in the consumer goroutine
pc.log.Debug("connection closed and send to connectClosedCh")
pc.reconnectCount.Inc()
var assignedBrokerURL string
if closeConsumer != nil {
assignedBrokerURL = pc.client.selectServiceURL(
Expand Down Expand Up @@ -1925,9 +1930,8 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose
// Successfully reconnected
pc.log.Info("Reconnected consumer to broker")
bo.Reset()
if pc.options.enableZeroQueueConsumer {
pc.log.Info("zeroQueueConsumer reconnect, reset availablePermits")
pc.availablePermits.inc()
if pc.options.enableZeroQueueConsumer && pc.options.zeroQueueReconnectedPolicy != nil {
pc.options.zeroQueueReconnectedPolicy(pc)
}
return struct{}{}, nil
}
Expand Down
3 changes: 3 additions & 0 deletions pulsar/consumer_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) {
metrics: newTestMetrics(),
decryptor: crypto.NewNoopDecryptor(),
}
pc._setConn(dummyConnection{})
pc.availablePermits = &availablePermits{pc: &pc}
pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
func(id MessageID) { pc.sendIndividualAck(id) }, nil, nil)
Expand Down Expand Up @@ -76,6 +77,7 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
metrics: newTestMetrics(),
decryptor: crypto.NewNoopDecryptor(),
}
pc._setConn(dummyConnection{})
pc.availablePermits = &availablePermits{pc: &pc}
pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
func(id MessageID) { pc.sendIndividualAck(id) }, nil, nil)
Expand Down Expand Up @@ -112,6 +114,7 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
metrics: newTestMetrics(),
decryptor: crypto.NewNoopDecryptor(),
}
pc._setConn(dummyConnection{})
pc.availablePermits = &availablePermits{pc: &pc}
pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
func(id MessageID) { pc.sendIndividualAck(id) }, nil, nil)
Expand Down
24 changes: 21 additions & 3 deletions pulsar/consumer_zero_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"sync"
"time"

uAtomic "go.uber.org/atomic"

"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/pulsar/log"
"github.com/pkg/errors"
Expand All @@ -36,6 +38,7 @@ type zeroQueueConsumer struct {
pc *partitionConsumer
consumerName string
disableForceTopicCreation bool
waitingOnReceive uAtomic.Bool

messageCh chan ConsumerMessage

Expand Down Expand Up @@ -71,11 +74,17 @@ func newZeroConsumer(client *client, options ConsumerOptions, topic string,
return nil, err
}
opts := newPartitionConsumerOpts(zc.topic, zc.consumerName, tn.Partition, zc.options)
conn, err := newPartitionConsumer(zc, zc.client, opts, zc.messageCh, zc.dlq, zc.metrics)
opts.zeroQueueReconnectedPolicy = func(pc *partitionConsumer) {
if zc.waitingOnReceive.Load() {
pc.log.Info("zeroQueueConsumer reconnect, reset availablePermits")
pc.availablePermits.inc()
}
}
pc, err := newPartitionConsumer(zc, zc.client, opts, zc.messageCh, zc.dlq, zc.metrics)
if err != nil {
return nil, err
}
zc.pc = conn
zc.pc = pc

return zc, nil
}
Expand Down Expand Up @@ -119,17 +128,26 @@ func (z *zeroQueueConsumer) Receive(ctx context.Context) (Message, error) {
}
z.Lock()
defer z.Unlock()
z.waitingOnReceive.Store(true)
z.pc.availablePermits.inc()
for {
select {
case <-z.closeCh:
z.waitingOnReceive.Store(false)
return nil, newError(ConsumerClosed, "consumer closed")
case cm, ok := <-z.messageCh:
if !ok {
return nil, newError(ConsumerClosed, "consumer closed")
}
return cm.Message, nil
message, ok := cm.Message.(*message)
if ok && message.getConn().ID() == z.pc._getConn().ID() {
z.waitingOnReceive.Store(false)
return cm.Message, nil
} else {
z.log.WithField("messageID", cm.Message.ID()).Warn("message from old connection discarded after reconnection")
}
Comment thread
crossoverJie marked this conversation as resolved.
case <-ctx.Done():
z.waitingOnReceive.Store(false)
return nil, ctx.Err()
}
Comment thread
crossoverJie marked this conversation as resolved.
}
Expand Down
142 changes: 142 additions & 0 deletions pulsar/consumer_zero_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,148 @@ func TestReconnectConsumer(t *testing.T) {
defer c.Terminate(ctx)
}

func TestReconnectedBrokerSendPermits(t *testing.T) {
req := testcontainers.ContainerRequest{
Name: "pulsar-test",
Image: getPulsarTestImage(),
ExposedPorts: []string{"6650/tcp", "8080/tcp"},
WaitingFor: wait.ForExposedPort(),
HostConfigModifier: func(config *container.HostConfig) {
config.PortBindings = map[nat.Port][]nat.PortBinding{
"6650/tcp": {{HostIP: "0.0.0.0", HostPort: "6659"}},
"8080/tcp": {{HostIP: "0.0.0.0", HostPort: "8089"}},
}
},
Cmd: []string{"bin/pulsar", "standalone", "-nfw"},
}
c, err := testcontainers.GenericContainer(context.Background(), testcontainers.GenericContainerRequest{
ContainerRequest: req,
Started: true,
Reuse: true,
})
require.NoError(t, err, "Failed to start the pulsar container")
endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar")
require.NoError(t, err, "Failed to get the pulsar endpoint")

sLogger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))
client, err := NewClient(ClientOptions{
URL: endpoint,
Logger: plog.NewLoggerWithSlog(sLogger),
})
assert.Nil(t, err)
adminEndpoint, err := c.PortEndpoint(context.Background(), "8080", "http")
assert.Nil(t, err)
admin, err := pulsaradmin.NewClient(&config.Config{
WebServiceURL: adminEndpoint,
})
assert.Nil(t, err)

topic := newTopicName()
var consumer Consumer
require.Eventually(t, func() bool {
consumer, err = client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
EnableZeroQueueConsumer: true,
Type: Shared, // using Shared subscription type to support unack subscription stats
})
return err == nil
}, 30*time.Second, 1*time.Second)
ctx := context.Background()

// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: false,
})
assert.Nil(t, err)
Comment thread
crossoverJie marked this conversation as resolved.

// send 10 messages
for i := 0; i < 10; i++ {
msg, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
Key: "pulsar",
Properties: map[string]string{
"key-1": "pulsar-1",
},
})
assert.Nil(t, err)
log.Printf("send message: %s", msg.String())
}

log.Println("unloading topic")
topicName, err := utils.GetTopicName(topic)
assert.Nil(t, err)
err = admin.Topics().Unload(*topicName)
assert.Nil(t, err)
log.Println("unloaded topic")
zc, ok := consumer.(*zeroQueueConsumer)
assert.True(t, ok)
// wait for reconnect
require.EventuallyWithT(t, func(c *assert.CollectT) {
reconnectCount := zc.pc.reconnectCount.Load()
require.Equal(c, reconnectCount, int32(1))
}, 30*time.Second, 1*time.Second)

// receive 10 messages
for i := 0; i < 10; i++ {
msg, err := consumer.Receive(context.Background())
if err != nil {
assert.Nil(t, err)
}

expectMsg := fmt.Sprintf("hello-%d", i)
expectProperties := map[string]string{
"key-1": "pulsar-1",
}
assert.Equal(t, []byte(expectMsg), msg.Payload())
assert.Equal(t, "pulsar", msg.Key())
assert.Equal(t, expectProperties, msg.Properties())
// ack message
err = consumer.Ack(msg)
assert.Nil(t, err)
log.Printf("receive message: %s", msg.ID().String())
}
// send one more message and we do not manually receive it
_, err = producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", 10)),
Key: "pulsar",
Properties: map[string]string{
"key-1": "pulsar-1",
},
})
Comment thread
crossoverJie marked this conversation as resolved.
assert.Nil(t, err)
// wait for broker send messages to consumer and topic stats update finish
Comment thread
RobertIndie marked this conversation as resolved.
option := utils.GetStatsOptions{
GetPreciseBacklog: true,
}
require.EventuallyWithT(t, func(c *assert.CollectT) {
topicStats, err := admin.Topics().GetStatsWithOptionWithContext(ctx, *topicName, option)
require.Nil(c, err)
for _, subscriptionStats := range topicStats.Subscriptions {
require.Equal(c, subscriptionStats.MsgBacklog, int64(1))
require.Equal(c, subscriptionStats.Consumers[0].UnAckedMessages, 0)
}
}, 30*time.Second, 1*time.Second)

// ack
msg, err := consumer.Receive(context.Background())
assert.Nil(t, err)
err = consumer.Ack(msg)
assert.Nil(t, err)

// check topic stats
require.EventuallyWithT(t, func(c *assert.CollectT) {
topicStats, err := admin.Topics().GetStatsWithOptionWithContext(ctx, *topicName, option)
require.Nil(c, err)
for _, subscriptionStats := range topicStats.Subscriptions {
require.Equal(c, subscriptionStats.MsgBacklog, int64(0))
require.Equal(c, subscriptionStats.Consumers[0].UnAckedMessages, 0)
}
}, 30*time.Second, 1*time.Second)

}

func TestUnloadTopicBeforeConsume(t *testing.T) {

sLogger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))
Expand Down
7 changes: 7 additions & 0 deletions pulsar/impl_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"sync/atomic"
"time"

"github.com/apache/pulsar-client-go/pulsar/internal"

"google.golang.org/protobuf/proto"

pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
Expand Down Expand Up @@ -313,6 +315,7 @@ type message struct {
encryptionContext *EncryptionContext
index *uint64
brokerPublishTime *time.Time
conn internal.Connection
}

func (msg *message) Topic() string {
Expand Down Expand Up @@ -394,6 +397,10 @@ func (msg *message) size() int {
return len(msg.payLoad)
}

func (msg *message) getConn() internal.Connection {
return msg.conn
}

func newAckTracker(size uint) *ackTracker {
batchIDs := bitset.New(size)
for i := uint(0); i < size; i++ {
Expand Down
Loading