Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pulsar/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ type ConsumerOptions struct {
// Default value is `1000` messages and should be good for most use cases.
ReceiverQueueSize int

// EnableZeroQueueConsumer, if enabled, the ReceiverQueueSize will be 0.
// Notice: only non-partitioned topic is supported.
// Default is false.
EnableZeroQueueConsumer bool

// EnableAutoScaledReceiverQueueSize, if enabled, the consumer receive queue will be auto-scaled
// by the consumer actual throughput. The ReceiverQueueSize will be the maximum size which consumer
// receive queue can be scaled.
Expand Down
114 changes: 69 additions & 45 deletions pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@ import (
"fmt"
"math/rand"
"strconv"
"strings"
"sync"
"time"

"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"

"github.com/apache/pulsar-client-go/pulsar/crypto"
"github.com/apache/pulsar-client-go/pulsar/internal"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
Expand Down Expand Up @@ -81,6 +84,10 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
options.ReceiverQueueSize = defaultReceiverQueueSize
}

if options.EnableZeroQueueConsumer {
options.ReceiverQueueSize = 0
}

if options.Interceptors == nil {
options.Interceptors = defaultConsumerInterceptors
}
Expand Down Expand Up @@ -236,7 +243,24 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
}

func newInternalConsumer(client *client, options ConsumerOptions, topic string,
messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter, disableForceTopicCreation bool) (*consumer, error) {
messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter, disableForceTopicCreation bool) (Consumer, error) {
partitions, err := client.TopicPartitions(topic)
if err != nil {
return nil, err
}

if len(partitions) > 1 && options.EnableZeroQueueConsumer {
return nil, pkgerrors.New("ZeroQueueConsumer is not supported for partitioned topics")
}

if len(partitions) == 1 && options.EnableZeroQueueConsumer &&
strings.Contains(partitions[0], utils.PARTITIONEDTOPICSUFFIX) {
return nil, pkgerrors.New("ZeroQueueConsumer is not supported for partitioned topics")
}

if len(partitions) == 1 && options.EnableZeroQueueConsumer {
Comment thread
crossoverJie marked this conversation as resolved.
return newZeroConsumer(client, options, topic, messageCh, dlq, rlq, disableForceTopicCreation)
}

consumer := &consumer{
topic: topic,
Expand All @@ -253,7 +277,7 @@ func newInternalConsumer(client *client, options ConsumerOptions, topic string,
metrics: client.metrics.GetLeveledMetrics(topic),
}

err := consumer.internalTopicSubscribeToPartitions()
err = consumer.internalTopicSubscribeToPartitions()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -343,10 +367,6 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
consumer *partitionConsumer
}

receiverQueueSize := c.options.ReceiverQueueSize
metadata := c.options.Properties
subProperties := c.options.SubscriptionProperties

startPartition := oldNumPartitions
partitionsToAdd := newNumPartitions - oldNumPartitions

Expand All @@ -364,45 +384,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {

go func(idx int, pt string) {
defer wg.Done()

var nackRedeliveryDelay time.Duration
if c.options.NackRedeliveryDelay == 0 {
nackRedeliveryDelay = defaultNackRedeliveryDelay
} else {
nackRedeliveryDelay = c.options.NackRedeliveryDelay
}
opts := &partitionConsumerOpts{
topic: pt,
consumerName: c.consumerName,
subscription: c.options.SubscriptionName,
subscriptionType: c.options.Type,
subscriptionInitPos: c.options.SubscriptionInitialPosition,
partitionIdx: idx,
receiverQueueSize: receiverQueueSize,
nackRedeliveryDelay: nackRedeliveryDelay,
nackBackoffPolicy: c.options.NackBackoffPolicy,
metadata: metadata,
subProperties: subProperties,
replicateSubscriptionState: c.options.ReplicateSubscriptionState,
startMessageID: c.options.startMessageID,
startMessageIDInclusive: c.options.StartMessageIDInclusive,
subscriptionMode: c.options.SubscriptionMode,
readCompacted: c.options.ReadCompacted,
interceptors: c.options.Interceptors,
maxReconnectToBroker: c.options.MaxReconnectToBroker,
backoffPolicy: c.options.BackoffPolicy,
keySharedPolicy: c.options.KeySharedPolicy,
schema: c.options.Schema,
decryption: c.options.Decryption,
ackWithResponse: c.options.AckWithResponse,
maxPendingChunkedMessage: c.options.MaxPendingChunkedMessage,
expireTimeOfIncompleteChunk: c.options.ExpireTimeOfIncompleteChunk,
autoAckIncompleteChunk: c.options.AutoAckIncompleteChunk,
consumerEventListener: c.options.EventListener,
enableBatchIndexAck: c.options.EnableBatchIndexAcknowledgment,
ackGroupingOptions: c.options.AckGroupingOptions,
autoReceiverQueueSize: c.options.EnableAutoScaledReceiverQueueSize,
}
opts := newPartitionConsumerOpts(pt, c.consumerName, idx, c.options)
cons, err := newPartitionConsumer(c, c.client, opts, c.messageCh, c.dlq, c.metrics)
ch <- ConsumerError{
err: err,
Expand Down Expand Up @@ -444,6 +426,48 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
return nil
}

func newPartitionConsumerOpts(topic, consumerName string, idx int, options ConsumerOptions) *partitionConsumerOpts {

var nackRedeliveryDelay time.Duration
if options.NackRedeliveryDelay == 0 {
nackRedeliveryDelay = defaultNackRedeliveryDelay
} else {
nackRedeliveryDelay = options.NackRedeliveryDelay
}
return &partitionConsumerOpts{
topic: topic,
consumerName: consumerName,
subscription: options.SubscriptionName,
subscriptionType: options.Type,
subscriptionInitPos: options.SubscriptionInitialPosition,
partitionIdx: idx,
receiverQueueSize: options.ReceiverQueueSize,
nackRedeliveryDelay: nackRedeliveryDelay,
nackBackoffPolicy: options.NackBackoffPolicy,
metadata: options.Properties,
subProperties: options.SubscriptionProperties,
replicateSubscriptionState: options.ReplicateSubscriptionState,
startMessageID: options.startMessageID,
startMessageIDInclusive: options.StartMessageIDInclusive,
subscriptionMode: options.SubscriptionMode,
readCompacted: options.ReadCompacted,
interceptors: options.Interceptors,
maxReconnectToBroker: options.MaxReconnectToBroker,
backoffPolicy: options.BackoffPolicy,
keySharedPolicy: options.KeySharedPolicy,
schema: options.Schema,
decryption: options.Decryption,
ackWithResponse: options.AckWithResponse,
maxPendingChunkedMessage: options.MaxPendingChunkedMessage,
expireTimeOfIncompleteChunk: options.ExpireTimeOfIncompleteChunk,
autoAckIncompleteChunk: options.AutoAckIncompleteChunk,
consumerEventListener: options.EventListener,
enableBatchIndexAck: options.EnableBatchIndexAcknowledgment,
ackGroupingOptions: options.AckGroupingOptions,
autoReceiverQueueSize: options.EnableAutoScaledReceiverQueueSize,
}
}

func (c *consumer) Subscription() string {
return c.options.SubscriptionName
}
Expand Down
Loading