This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
commit 068db598adb529d1ae441084c280f0e776d2cbbc
Author: Rajan Dhabalia <rdhabalia@apache.org>
AuthorDate: Mon Aug 13 14:19:46 2018 -0700
Fix: function with multi-topic not acking on effectively-once (#2347)
### Motivation
`MultiTopicsConsumerImpl` doesn't support `acknowledgeCumulativeAsync`; therefore, a
function with multi-topic input and `EFFECTIVELY_ONCE` processing does not ack messages,
which breaks the `EFFECTIVELY_ONCE` behavior.
### Modifications
The function should ack the message on the specific topic's consumer if `inputTopicConsumer`
is a multi-topic consumer.
### Result
Functions should be able to ack messages for a multi-topic consumer when the processing
guarantee is `EFFECTIVELY_ONCE`.
---
.../api/PartitionedProducerConsumerTest.java | 9 ++------
.../org/apache/pulsar/io/PulsarSinkE2ETest.java | 10 +++++----
.../client/impl/MultiTopicsConsumerImpl.java | 17 +++++++++------
.../pulsar/functions/source/PulsarSource.java | 24 ++++++++++++----------
4 files changed, 32 insertions(+), 28 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
index a599532..ae7757d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
@@ -290,19 +290,14 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase
{
}
try {
- producer = pulsarClient.newProducer().topic(topicName.toString())
- .enableBatching(false)
- .messageRoutingMode(MessageRoutingMode.SinglePartition)
- .create();
+ producer = pulsarClient.newProducer().topic(topicName.toString()).enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition).create();
consumer = pulsarClient.newConsumer().topic(topicName.toString()).subscriptionName("my-sub").subscribe();
producer.send("message1".getBytes());
producer.send("message2".getBytes());
/* Message<byte[]> msg1 = */ consumer.receive();
Message<byte[]> msg2 = consumer.receive();
consumer.acknowledgeCumulative(msg2);
- Assert.fail("should fail since ack cumulative is not supported for partitioned
topic");
- } catch (PulsarClientException e) {
- Assert.assertTrue(e instanceof PulsarClientException.NotSupportedException);
} finally {
producer.close();
consumer.unsubscribe();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
index 5398bc9..2001981 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
@@ -62,6 +62,7 @@ import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.instance.JavaInstanceRunnable;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.proto.Function.ProcessingGuarantees;
import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.proto.Function.SourceSpec;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
@@ -356,11 +357,11 @@ public class PulsarSinkE2ETest {
retryStrategically((test) -> {
try {
SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
- return subStats.unackedMessages == 0;
+ return subStats.unackedMessages == 0 && subStats.msgThroughputOut
== totalMsgs;
} catch (PulsarAdminException e) {
return false;
}
- }, 5, 500);
+ }, 5, 200);
FunctionRuntimeManager functionRuntimeManager = functionsWorkerService.getFunctionRuntimeManager();
functionRuntimeManager.updateRates();
@@ -400,11 +401,12 @@ public class PulsarSinkE2ETest {
functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA);
functionDetailsBuilder.setParallelism(1);
functionDetailsBuilder.setClassName(IdentityFunction.class.getName());
+ functionDetailsBuilder.setProcessingGuarantees(ProcessingGuarantees.EFFECTIVELY_ONCE);
// set source spec
// source spec classname should be empty so that the default pulsar source will be
used
SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
- sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.SHARED);
+ sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.FAILOVER);
sourceSpecBuilder.setTypeClassName(typeArg.getName());
sourceSpecBuilder.setTopicsPattern(sourceTopicPattern);
sourceSpecBuilder.setSubscriptionName(subscriptionName);
@@ -485,7 +487,7 @@ public class PulsarSinkE2ETest {
// set source spec
// source spec classname should be empty so that the default pulsar source will be
used
SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
- sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.SHARED);
+ sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.FAILOVER);
sourceSpecBuilder.putTopicsToSerDeClassName(sinkTopic, DefaultSerDe.class.getName());
functionDetailsBuilder.setAutoAck(true);
functionDetailsBuilder.setSource(sourceSpecBuilder);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index fc91eed..4a0c449 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -362,22 +362,27 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T>
{
protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ackType,
Map<String,Long> properties) {
checkArgument(messageId instanceof TopicMessageIdImpl);
- TopicMessageIdImpl messageId1 = (TopicMessageIdImpl) messageId;
+ TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) messageId;
if (getState() != State.Ready) {
return FutureUtil.failedFuture(new PulsarClientException("Consumer already closed"));
}
if (ackType == AckType.Cumulative) {
- return FutureUtil.failedFuture(new PulsarClientException.NotSupportedException(
- "Cumulative acknowledge not supported for topics consumer"));
+ Consumer individualConsumer = consumers.get(topicMessageId.getTopicName());
+ if (individualConsumer != null) {
+ MessageId innerId = topicMessageId.getInnerMessageId();
+ return individualConsumer.acknowledgeCumulativeAsync(innerId);
+ } else {
+ return FutureUtil.failedFuture(new PulsarClientException.NotConnectedException());
+ }
} else {
- ConsumerImpl<T> consumer = consumers.get(messageId1.getTopicName());
+ ConsumerImpl<T> consumer = consumers.get(topicMessageId.getTopicName());
- MessageId innerId = messageId1.getInnerMessageId();
+ MessageId innerId = topicMessageId.getInnerMessageId();
return consumer.doAcknowledge(innerId, ackType, properties)
.thenRun(() ->
- unAckedMessageTracker.remove(messageId1));
+ unAckedMessageTracker.remove(topicMessageId));
}
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index f70100d..dd2f05d 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -18,12 +18,13 @@
*/
package org.apache.pulsar.functions.source;
-import com.google.common.annotations.VisibleForTesting;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import net.jodah.typetools.TypeResolver;
-
import static org.apache.commons.lang3.StringUtils.isNotBlank;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.PulsarClient;
@@ -36,14 +37,14 @@ import org.apache.pulsar.functions.api.utils.DefaultSerDe;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.utils.FunctionConfig;
import org.apache.pulsar.functions.utils.Reflections;
-import org.apache.pulsar.functions.utils.Utils;
import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.io.core.SourceContext;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
+import com.google.common.annotations.VisibleForTesting;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import net.jodah.typetools.TypeResolver;
@Slf4j
public class PulsarSource<T> implements Source<T> {
@@ -135,7 +136,8 @@ public class PulsarSource<T> implements Source<T> {
.message(message)
.topicName(topicName)
.ackFunction(() -> {
- if (pulsarSourceConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE)
{
+ if (pulsarSourceConfig
+ .getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE)
{
inputConsumer.acknowledgeCumulativeAsync(message);
} else {
inputConsumer.acknowledgeAsync(message);
|