Support acknowledging a list of message IDs #1301
|
This feature looks good to me. I have some questions:
|
The 1st question

What's confusing is the tracking message ID itself. The existing

func TestMyAck(t *testing.T) {
	client, err := NewClient(ClientOptions{
		URL: lookupURL,
	})
	assert.Nil(t, err)
	defer client.Close()
	topic := fmt.Sprintf("test-my-ack-%v", time.Now().Nanosecond())
	createConsumer := func() Consumer {
		consumer, err := client.Subscribe(ConsumerOptions{
			Topic:                       topic,
			SubscriptionName:            "my-sub",
			SubscriptionInitialPosition: SubscriptionPositionEarliest,
			Type:                        Shared,
			AckWithResponse:             true,
		})
		assert.Nil(t, err)
		return consumer
	}
	consumer := createConsumer()
	sendMessages(t, client, topic, 0, 2, true) // send 0 and 1 in the same batch
	msgs := receiveMessages(t, consumer, 2)
	for i := 0; i < 2; i++ {
		fmt.Println("Received message: ", string(msgs[i].Payload()), msgs[i].ID())
	}
	// Acknowledge only the first message of the batch.
	if err := consumer.AckID(msgs[0].ID()); err != nil {
		fmt.Println("Ack message 0 failed: ", err.Error())
	} else {
		fmt.Println("Ack message 0 success")
	}
	// Restart the consumer to see which message is delivered first.
	consumer.Close()
	consumer = createConsumer()
	msgs = receiveMessages(t, consumer, 1)
	fmt.Println("Received message: ", string(msgs[0].Payload()), msgs[0].ID())
}

Outputs:

From the user's perspective:
Actually this API implements

for msgID, err := range consumer.AckIDList([]MessageID{msgs[0].ID()}) {
	fmt.Println("Failed to acknowledge ", msgID, err.Error())
}

The outputs will be:

P.S. we should fix the

Users should add the failed message ID to the next message ID list passed to

IMO, we should make batch index ACK enabled by default for both client side and server side. The current default behavior is really confusing.

The 2nd question
|
|
@nodece I added |
Your explanation is clear, I agree with you:
Batch index ACK is disabled by default. When it is disabled, users must ack all messages in a batch by
You only acked the first message, so the client didn't send the ack request to the broker, and you still receive the first message after restarting the consumer. I remember the Java client has the same behavior. If I am wrong, please let me know.
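The behavior described here, where acknowledging only part of a batch sends nothing to the broker while batch index ACK is disabled, can be pictured with a small counter. This is a minimal sketch under that assumption; `batchAckTracker`, `newBatchAckTracker`, and `ack` are hypothetical names for illustration, not the client's actual tracker.

```go
package main

// batchAckTracker is a hypothetical, simplified stand-in for per-batch
// bookkeeping when batch index ACK is disabled. It is not the
// pulsar-client-go implementation.
type batchAckTracker struct {
	acked []bool // acked[i] is true once batch index i has been acknowledged
	left  int    // number of batch indexes still unacknowledged
}

// newBatchAckTracker creates a tracker for a batch of batchSize messages.
func newBatchAckTracker(batchSize int) *batchAckTracker {
	return &batchAckTracker{acked: make([]bool, batchSize), left: batchSize}
}

// ack records batch index i and reports whether the whole batch is now
// acknowledged, which is the only point at which this sketch would send an
// ACK request to the broker.
func (t *batchAckTracker) ack(i int) bool {
	if i < 0 || i >= len(t.acked) {
		return false // out-of-range indexes are ignored in this sketch
	}
	if !t.acked[i] {
		t.acked[i] = true
		t.left--
	}
	return t.left == 0
}
```

With a batch of two messages, `ack(0)` alone returns false, so nothing would be sent and message 0 is redelivered after a restart; only the later `ack(1)` completes the batch.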
You can refactor the
You can also discuss this issue on the dev mailing list. |
There is no way to let the client know whether the ack request was sent. That's the point. In addition, users should never care if the ack was sent. They only care if the ack succeeded.
Yeah, but this behavior is wrong. I don't want to blame the author but the error handling is really not taken care of. Happy path is always easy to write.
Oh I see. Let me think of a better way to reuse the code.
I plan to write a PIP to explain these things, including why batch index ACK should be enabled by default and the messy semantics of the acknowledge APIs. |
|
@nodece I reused the tracking message ID's tracker and used the semantics that ACK on an incomplete message ID in the batch does not return an error. Please check the latest tests. As for reusing the |
panic: runtime error: invalid memory address or nil pointer dereference [recovered]
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x2 addr=0x38 pc=0x1033af634]
goroutine 28 [running]:
testing.tRunner.func1.2({0x103773580, 0x103e9bde0})
/usr/local/go/src/testing/testing.go:1631 +0x1c4
testing.tRunner.func1()
/usr/local/go/src/testing/testing.go:1634 +0x33c
panic({0x103773580?, 0x103e9bde0?})
/usr/local/go/src/runtime/panic.go:770 +0x124
github.com/apache/pulsar-client-go/pulsar.(*consumer).checkMsgIDPartition(0x140001126c8, {0x0?, 0x0?})
/Users/xuyunze/github.com/bewaremypower/pulsar-client-go/pulsar/consumer_impl.go:757 +0x24
|
@RobertIndie @nodece @crossoverJie @shibd Thanks for your reviews. The

For the simple consumer case where only 1 topic is subscribed, since only 1 ACK request will be sent, these message IDs either all fail or all succeed. The only exceptional case is that invalid message IDs are passed. However, in this case, there is no way to handle these message IDs because acknowledging them will always fail. Hence I just return a trivial

However, if the consumer subscribes to multiple topics, there might be multiple

See the API documents and the tests for details. PTAL again. |
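To illustrate why the multi-topic case can fail partially while the single-topic case cannot, here is a minimal sketch that groups message IDs by topic and sends one request per group. Everything in it is an assumption made for illustration: `msgID`, `sendAck`, `ackByTopic`, and `errTopicUnavailable` are hypothetical names, and the real client's types and request path are richer than this.

```go
package main

import "errors"

// errTopicUnavailable is a placeholder error that a hypothetical sendAck
// callback might return; it does not exist in the real client.
var errTopicUnavailable = errors.New("topic unavailable")

// msgID is a hypothetical message ID that carries only its topic and a
// sequence number.
type msgID struct {
	topic string
	seq   int
}

// sendAck is a hypothetical callback that sends one ACK request for all IDs
// of a single topic and returns the error from the response, if any.
type sendAck func(topic string, ids []msgID) error

// ackByTopic groups ids by topic, issues one request per topic, and returns
// every ID whose request failed together with that request's error.
func ackByTopic(ids []msgID, send sendAck) map[msgID]error {
	groups := make(map[string][]msgID)
	for _, id := range ids {
		groups[id.topic] = append(groups[id.topic], id)
	}
	failed := make(map[msgID]error)
	for topic, group := range groups {
		if err := send(topic, group); err != nil {
			// All IDs in one request share the same outcome.
			for _, id := range group {
				failed[id] = err
			}
		}
	}
	return failed
}
```

With a single topic there is exactly one group, so the returned map is either empty or contains every ID; with several topics, some groups can succeed while others fail.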
Motivation
For Shared and Key_Shared subscriptions, if some messages failed to be acknowledged, these messages would never be dispatched to the consumer again until it restarted. If the number of unacknowledged messages reached the threshold, the broker would stop dispatching messages entirely. However, the consumer does not have a chance to check which messages failed to be acknowledged.

Even if this case was not hit, if the consumer restarted after consuming many messages, the old unacknowledged messages would be delivered again, which is very confusing and might affect the business logic.

Therefore, we can only enable AckWithResponse to know which messages failed to be acknowledged. Unfortunately, the Go SDK currently only supports acknowledging single messages, which harms the performance significantly.

To solve this problem, this PR adds an API to acknowledge a list of messages.
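One way a caller might use a list acknowledgment API is to remember the IDs that failed and resubmit them with the next call. The sketch below assumes a hypothetical `ackListFunc` that returns the failed IDs with their errors; `pendingAcks`, `flush`, and `errAckFailed` are also illustrative names, and message IDs are plain strings here rather than the client's MessageID type.

```go
package main

import "errors"

// errAckFailed is a placeholder error that a hypothetical ackListFunc might
// report for an ID; it does not exist in the real client.
var errAckFailed = errors.New("ack failed")

// ackListFunc is a hypothetical stand-in for a list acknowledgment call. It
// takes message IDs and returns the IDs that failed with their errors.
type ackListFunc func(ids []string) map[string]error

// pendingAcks keeps the IDs whose acknowledgment failed so that they can be
// retried with the next list.
type pendingAcks struct {
	failed []string
}

// flush acknowledges the previously failed IDs together with newIDs and
// remembers whatever fails this time. It returns the number of failures.
func (p *pendingAcks) flush(newIDs []string, ack ackListFunc) int {
	// Copy into a fresh slice so that resetting p.failed does not alias it.
	batch := append(append([]string{}, p.failed...), newIDs...)
	result := ack(batch)
	p.failed = nil
	for _, id := range batch {
		if _, bad := result[id]; bad {
			p.failed = append(p.failed, id)
		}
	}
	return len(p.failed)
}
```

Iterating over `batch` instead of the returned map keeps the retry order stable, which is a design choice of this sketch rather than a requirement.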
Users can save the failed message IDs and add them again in the next AckIDList call.

Modifications

- Add the AckIDList API and reuse the logic of ack_grouping_tracker.go to convert user-provided message IDs to the message IDs in the ACK requests
- Add internalAckList and wait for the response error via an error channel
- Add TestAckIDList to verify the case where a message ID list has message IDs of non-batched messages, whole batched messages, or partial batched messages, because the behaviors are different if batch index ACK is enabled
- Add TestMultiTopicAckIDList to verify the multi-topics case, including regex subscription.

Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes

Documentation