pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] sijie closed pull request #2301: Issue #1452: remove reachedEndOfTopic in addConsumer
Date Sun, 05 Aug 2018 23:49:44 GMT
sijie closed pull request #2301: Issue #1452: remove reachedEndOfTopic in addConsumer
URL: https://github.com/apache/incubator-pulsar/pull/2301
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 6092a8663a..dac9f055d7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -103,11 +103,6 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce
             throw new SubscriptionFencedException("Subscription is fenced");
         }
 
-        if (topic.getManagedLedger().isTerminated() && cursor.getNumberOfEntriesInBacklog()
== 0) {
-            // Immediately notify the consumer that there are no more available messages
-            consumer.reachedEndOfTopic();
-        }
-
         if (dispatcher == null || !dispatcher.isConsumerConnected()) {
             switch (consumer.subType()) {
             case Exclusive:
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 79166a18f4..61bdad034a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -2434,7 +2434,7 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String,
String> keyMe
         Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1")
                 .addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4)
                 .cryptoKeyReader(new EncKeyReader()).create();
-        
+
         Consumer<byte[]> consumer = pulsarClient.newConsumer().topicsPattern("persistent://my-property/my-ns/myrsa-topic1")
                 .subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
                 .subscribe();
@@ -2450,7 +2450,7 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String,
String> keyMe
         consumer.close();
         log.info("-- Exiting {} test --", methodName);
     }
-    
+
     private String decryptMessage(TopicMessageImpl<byte[]> msg, String encryptionKeyName,
CryptoKeyReader reader)
             throws Exception {
         Optional<EncryptionContext> ctx = msg.getEncryptionCtx();
@@ -2624,4 +2624,42 @@ public void testFlushBatchDisabled() throws Exception {
         log.info("-- Exiting {} test --", methodName);
     }
 
+    // Issue 1452: https://github.com/apache/incubator-pulsar/issues/1452
+    // reachedEndOfTopic should be called only once if a topic has been terminated before
subscription
+    @Test
+    public void testReachedEndOfTopic() throws Exception
+    {
+        String topicName = "persistent://my-property/my-ns/testReachedEndOfTopic";
+        Producer producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false).create();
+        producer.close();
+
+        admin.topics().terminateTopicAsync(topicName).get();
+
+        CountDownLatch latch = new CountDownLatch(2);
+        Consumer consumer = pulsarClient.newConsumer()
+            .topic(topicName)
+            .subscriptionName("my-subscriber-name")
+            .messageListener(new MessageListener()
+            {
+                @Override
+                public void reachedEndOfTopic(Consumer consumer)
+                {
+                    log.info("called reachedEndOfTopic  {}", methodName);
+                    latch.countDown();
+                }
+
+                @Override
+                public void received(Consumer consumer, Message message)
+                {
+                    // do nothing
+                }
+            })
+            .subscribe();
+
+        assertFalse(latch.await(1, TimeUnit.SECONDS));
+        assertEquals(latch.getCount(), 1);
+        consumer.close();
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message