pulsar-commits mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [pulsar] jai1 commented on a change in pull request #4031: Feature - support seek() on Reader
Date Fri, 12 Apr 2019 21:58:42 GMT
jai1 commented on a change in pull request #4031: Feature - support seek() on Reader
URL: https://github.com/apache/pulsar/pull/4031#discussion_r275073206
 
 

 ##########
 File path: pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
 ##########
 @@ -510,4 +513,114 @@ public void testReaderNonDurableIsAbleToSeekRelativeTime() throws Exception {
         reader.close();
         producer.close();
     }
+
+    @Test
+    public void testReaderIsAbleToSeekWithTimeOnBeginningOfTopic() throws Exception {
+        final String topicName = "persistent://my-property/my-ns/ReaderSeekWithTimeOnBeginningOfTopic";
+        final int numOfMessage = 10;
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName).create();
+
+        for (int i = 0; i < numOfMessage; i++) {
+            producer.send(String.format("msg num %d", i).getBytes());
+        }
+
+        Reader<byte[]> reader = pulsarClient.newReader().topic(topicName)
+                .startMessageId(MessageId.earliest).create();
+
+        assertTrue(reader.hasMessageAvailable());
+
+        // Read all messages the first time
+        for (int i = 0; i < numOfMessage; i++) {
+            Message<byte[]> message = reader.readNext();
+            Assert.assertEquals(message.getData(), String.format("msg num %d", i).getBytes());
+        }
+
+        assertFalse(reader.hasMessageAvailable());
+
+        // Perform cursor reset by time
+        reader.seek(RelativeTimeUtil.parseRelativeTimeInSeconds("-1m"));
+
+        // FIXME: This sleep is necessary because future completed by seekAsync() might complete with a state different
+        //        than Ready, this is from the fact that consumer reset makes a disconnect from broker in order to be
+        //        able to do the reset().
+        Thread.sleep(1000);
 
 Review comment:
   If you are waiting for a reconnect to complete, then I would suggest waiting for up to 10 seconds with 20 retries (each of 500 ms):
   
   ```
   for (int i = 0; i < 20 && !reader.isConnected(); i++) {
       Thread.sleep(500);
   }
   ```
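
The same bounded wait could be factored into a small reusable helper. This is only a sketch: `RetryWait` and `waitUntil` are hypothetical names that are not part of Pulsar or of this pull request, and the helper adds one final check after the last sleep, which the loop above does not do.

```java
import java.util.function.BooleanSupplier;

// Hypothetical test helper (not part of Pulsar): bounded polling instead of a fixed sleep.
public final class RetryWait {
    private RetryWait() {}

    /**
     * Checks the condition up to maxRetries times, sleeping sleepMillis after each
     * failed check. Returns true as soon as the condition holds, false otherwise.
     */
    public static boolean waitUntil(BooleanSupplier condition, int maxRetries, long sleepMillis)
            throws InterruptedException {
        for (int i = 0; i < maxRetries; i++) {
            if (condition.getAsBoolean()) {
                return true;
            }
            Thread.sleep(sleepMillis);
        }
        // One last check, so a condition that became true during the final sleep still counts.
        return condition.getAsBoolean();
    }
}
```

In the test this might be used as `assertTrue(RetryWait.waitUntil(reader::isConnected, 20, 500));`, which returns as soon as the reader has reconnected and fails the test if it has not done so after roughly 10 seconds.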

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services
