pulsar-commits mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] ivankelly commented on a change in pull request #2103: Issue 1433: Expose batch flushAsync() and flush() methods in Producer
Date Mon, 09 Jul 2018 10:50:37 GMT
ivankelly commented on a change in pull request #2103: Issue 1433: Expose batch flushAsync() and flush() methods in Producer
URL: https://github.com/apache/incubator-pulsar/pull/2103#discussion_r200952616
 
 

 ##########
 File path: pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 ##########
 @@ -2552,4 +2553,74 @@ public void testConsumerSubscriptionInitialize() throws Exception {
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test
+    public void testFlushBatchEnabled() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/test-flush-enabled")
+                .subscriptionName("my-subscriber-name").subscribe();
+
+        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
+                .topic("persistent://my-property/my-ns/test-flush-enabled")
+                .enableBatching(true)
+                .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+                .batchingMaxMessages(10000);
+
+        @Cleanup
+        Producer<byte[]> producer = producerBuilder.create();
+        for (int i = 0; i < 10; i++) {
+            String message = "my-message-" + i;
+            producer.sendAsync(message.getBytes());
+        }
+        producer.flush();
+
+        Message<byte[]> msg = null;
+        Set<String> messageSet = Sets.newHashSet();
+        for (int i = 0; i < 10; i++) {
+            msg = consumer.receive(5, TimeUnit.SECONDS);
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message: [{}]", receivedMessage);
+            String expectedMessage = "my-message-" + i;
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
+        }
+        // Acknowledge the consumption of all messages at once
+        consumer.acknowledgeCumulative(msg);
+        consumer.close();
+        log.info("-- Exiting {} test --", methodName);
+    }
+
+    @Test
+    public void testFlushBatchDisabled() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/test-flush-disabled")
 
 Review comment:
   This line is longer than 120 characters. I know there's no checkstyle enforced, but we should still try
to keep things readable on GitHub.
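
Since the comment notes that no checkstyle rule enforces the limit, a check along the following lines could flag overlong lines before review. This is only a minimal sketch: the class `LineLengthCheck`, its method `overlongLines`, and the constant `MAX_COLUMNS` are hypothetical names for illustration and are not part of the Pulsar build or of this pull request.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of Pulsar): reports lines that exceed a column limit.
public class LineLengthCheck {
    // The limit mentioned in the review comment; assumed here, not enforced by the project.
    public static final int MAX_COLUMNS = 120;

    // Returns the 1-based numbers of all lines longer than maxColumns characters.
    public static List<Integer> overlongLines(List<String> lines, int maxColumns) {
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < lines.size(); i++) {
            if (lines.get(i).length() > maxColumns) {
                result.add(i + 1);
            }
        }
        return result;
    }

    // Usage: java LineLengthCheck <file> [<file> ...]
    public static void main(String[] args) throws IOException {
        for (String path : args) {
            List<String> lines = Files.readAllLines(Paths.get(path));
            for (int lineNumber : overlongLines(lines, MAX_COLUMNS)) {
                System.out.println(path + ":" + lineNumber + " exceeds " + MAX_COLUMNS + " columns");
            }
        }
    }
}
```

Run against the test file, a check like this would be expected to report the `newConsumer().topic(...)` line quoted above, which could then be wrapped after `newConsumer()` in the same way the `ProducerBuilder` chain in the diff is wrapped.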
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services
