This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 93b6d20 Fixed initialization order of acknowledgmentsGroupingTracker in ConsumerImpl (#2399)
93b6d20 is described below
commit 93b6d209e8c1bfdc3c84eb066dc46da0bed165c5
Author: Matteo Merli <mmerli@apache.org>
AuthorDate: Mon Aug 20 10:39:54 2018 -0700
Fixed initialization order of acknowledgmentsGroupingTracker in ConsumerImpl (#2399)
### Motivation
With delayed acks enabled (the default), there is a potential race condition that can lead
to an NPE:
```
java.lang.NullPointerException
at org.apache.pulsar.client.impl.ConsumerImpl.getClientCnx(ConsumerImpl.java:1446)
at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.flush(PersistentAcknowledgmentsGroupingTracker.java:154)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
...
```
The reason is that the delayed ack commit task gets scheduled (e.g., in 100ms) and might
be executed before the main thread has finished initializing the `ConsumerImpl` instance.
### Modifications
Reordered the initialization in the `ConsumerImpl` constructor to make sure `connectionHandler`
is already set when we create the `PersistentAcknowledgmentsGroupingTracker` instance.
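
The ordering hazard described above can be reproduced in isolation. The following is a simplified sketch, not the Pulsar code: `Owner`, `Handler`, and `Tracker` are hypothetical stand-ins for `ConsumerImpl`, `connectionHandler`, and `PersistentAcknowledgmentsGroupingTracker`. A latch makes the constructor wait for the scheduled task, which stands in for a constructor that is slower than the scheduling delay and keeps the outcome deterministic.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class InitOrderSketch {

    /** Hypothetical stand-in for the connection handler. */
    static class Handler {
        String cnx() { return "cnx"; }
    }

    /** Hypothetical stand-in for the tracker: it schedules a flush from its constructor. */
    static class Tracker {
        final AtomicReference<String> outcome = new AtomicReference<>();
        final CountDownLatch flushed = new CountDownLatch(1);

        Tracker(Owner owner, ScheduledExecutorService executor) {
            executor.schedule(() -> {
                try {
                    // Dereferences a field of the owner, which may still be null.
                    outcome.set(owner.handler.cnx());
                } catch (NullPointerException e) {
                    outcome.set("NPE");
                } finally {
                    flushed.countDown();
                }
            }, 1, TimeUnit.MILLISECONDS);
        }
    }

    /** Hypothetical stand-in for the consumer whose constructor leaks `this`. */
    static class Owner {
        volatile Handler handler;
        final Tracker tracker;

        Owner(ScheduledExecutorService executor, boolean trackerFirst) {
            if (trackerFirst) {
                // Old order: the tracker is created before the handler is set.
                tracker = new Tracker(this, executor);
                awaitQuietly(tracker.flushed); // simulates a slow constructor
                handler = new Handler();
            } else {
                // New order: the handler is set before the tracker can observe it.
                handler = new Handler();
                tracker = new Tracker(this, executor);
                awaitQuietly(tracker.flushed);
            }
        }
    }

    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** Returns what the first scheduled flush observed for the given ordering. */
    static String firstFlushOutcome(boolean trackerFirst) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            return new Owner(executor, trackerFirst).tracker.outcome.get();
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("tracker first: " + firstFlushOutcome(true));
        System.out.println("handler first: " + firstFlushOutcome(false));
    }
}
```

In this sketch the tracker-first ordering reports `NPE` and the handler-first ordering reports `cnx`. In real code the failure is intermittent, because it only occurs when the scheduled task happens to run before the constructor reaches the assignment.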
---
.../org/apache/pulsar/client/impl/ConsumerImpl.java | 18 +++++++++---------
1 file changed, 9 insertions(+), 9 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 1c69c75..1a7b67b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -160,15 +160,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
this.readCompacted = conf.isReadCompacted();
this.subscriptionInitialPosition = conf.getSubscriptionInitialPosition();
- TopicName topicName = TopicName.get(topic);
- if (topicName.isPersistent()) {
- this.acknowledgmentsGroupingTracker =
- new PersistentAcknowledgmentsGroupingTracker(this, conf, client.eventLoopGroup());
- } else {
- this.acknowledgmentsGroupingTracker =
- NonPersistentAcknowledgmentGroupingTracker.of();
- }
-
if (client.getConfiguration().getStatsIntervalSeconds() > 0) {
stats = new ConsumerStatsRecorderImpl(client, conf, this);
} else {
@@ -203,6 +194,15 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS, 0, TimeUnit.MILLISECONDS),
this);
+ TopicName topicName = TopicName.get(topic);
+ if (topicName.isPersistent()) {
+ this.acknowledgmentsGroupingTracker =
+ new PersistentAcknowledgmentsGroupingTracker(this, conf, client.eventLoopGroup());
+ } else {
+ this.acknowledgmentsGroupingTracker =
+ NonPersistentAcknowledgmentGroupingTracker.of();
+ }
+
grabCnx();
}