pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [incubator-pulsar] branch branch-2.1 updated: Fixed initialization order of acknowledgmentsGroupingTracker in ConsumerImpl (#2399)
Date Mon, 27 Aug 2018 18:23:54 GMT
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 93b6d20  Fixed initialization order of acknowledgmentsGroupingTracker in ConsumerImpl
(#2399)
93b6d20 is described below

commit 93b6d209e8c1bfdc3c84eb066dc46da0bed165c5
Author: Matteo Merli <mmerli@apache.org>
AuthorDate: Mon Aug 20 10:39:54 2018 -0700

    Fixed initialization order of acknowledgmentsGroupingTracker in ConsumerImpl (#2399)
    
    ### Motivation
    
    With delayed acks enabled (the default), there is a potential race condition that lead
to a NPE:
    
    ```
    java.lang.NullPointerException
        at org.apache.pulsar.client.impl.ConsumerImpl.getClientCnx(ConsumerImpl.java:1446)
        at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.flush(PersistentAcknowledgmentsGroupingTracker.java:154)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        ...
    ```
    
    The reason is that the delayed ack commit task gets scheduled (eg: in 100ms) and might
be executed before the the main thread has finished initializing the `ConsumerImpl` instance.
    
    ### Modifications
    
    Reordered the initialization in `ConsumerImpl` constructor to make sure `connectionHandler`
is already set when we create the `PersistentAcknowledgmentsGroupingTracker` instance.
---
 .../org/apache/pulsar/client/impl/ConsumerImpl.java    | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 1c69c75..1a7b67b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -160,15 +160,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements
ConnectionHandle
         this.readCompacted = conf.isReadCompacted();
         this.subscriptionInitialPosition = conf.getSubscriptionInitialPosition();
 
-        TopicName topicName = TopicName.get(topic);
-        if (topicName.isPersistent()) {
-            this.acknowledgmentsGroupingTracker =
-                new PersistentAcknowledgmentsGroupingTracker(this, conf, client.eventLoopGroup());
-        } else {
-            this.acknowledgmentsGroupingTracker =
-                NonPersistentAcknowledgmentGroupingTracker.of();
-        }
-
         if (client.getConfiguration().getStatsIntervalSeconds() > 0) {
             stats = new ConsumerStatsRecorderImpl(client, conf, this);
         } else {
@@ -203,6 +194,15 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements
ConnectionHandle
             new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS, 0, TimeUnit.MILLISECONDS),
             this);
 
+        TopicName topicName = TopicName.get(topic);
+        if (topicName.isPersistent()) {
+            this.acknowledgmentsGroupingTracker =
+                new PersistentAcknowledgmentsGroupingTracker(this, conf, client.eventLoopGroup());
+        } else {
+            this.acknowledgmentsGroupingTracker =
+                NonPersistentAcknowledgmentGroupingTracker.of();
+        }
+
         grabCnx();
     }
 


Mime
View raw message