pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [incubator-pulsar] branch master updated: Fix flaky test PersistentTopicE2ETest.testMessageRedelivery (#2320)
Date Wed, 08 Aug 2018 08:32:23 GMT
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d81bf6d  Fix flaky test PersistentTopicE2ETest.testMessageRedelivery (#2320)
d81bf6d is described below

commit d81bf6d9e61edc96e419f078760ae9371e62f566
Author: Matteo Merli <mmerli@apache.org>
AuthorDate: Wed Aug 8 01:32:21 2018 -0700

    Fix flaky test PersistentTopicE2ETest.testMessageRedelivery (#2320)
    
    ### Motivation
    
    Test failures are caused by delayed acks, which make the broker redeliver a few more
messages than actually remained unacked. Disable delayed acks to get the behavior expected
by the test.
    
    Example of failures:
    
    ```
    java.lang.AssertionError: expected [my-message-0] but found [my-message-2]
    	at org.testng.Assert.fail(Assert.java:96)
    	at org.testng.Assert.failNotEquals(Assert.java:776)
    	at org.testng.Assert.assertEqualsImpl(Assert.java:137)
    	at org.testng.Assert.assertEquals(Assert.java:118)
    	at org.testng.Assert.assertEquals(Assert.java:453)
    	at org.testng.Assert.assertEquals(Assert.java:463)
    	at org.apache.pulsar.broker.service.PersistentTopicE2ETest.testMessageRedelivery(PersistentTopicE2ETest.java:1252)
    ```
    
    https://builds.apache.org/job/pulsar_precommit_java8/2976/testReport/junit/org.apache.pulsar.broker.service/PersistentTopicE2ETest/testMessageRedelivery_2/
---
 .../broker/service/PersistentTopicE2ETest.java     | 29 ++++++++++++----------
 1 file changed, 16 insertions(+), 13 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index a5beefa..ee9fb74 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -1211,30 +1211,33 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
      * 1. produce messages 2. consume messages and ack all except 1 msg 3. Verification:
should replay only 1 unacked
      * message
      */
-    @Test()
+    @Test
     public void testMessageRedelivery() throws Exception {
         final String topicName = "persistent://prop/ns-abc/topic2";
         final String subName = "sub2";
 
-        Message<byte[]> msg;
+        Message<String> msg;
         int totalMessages = 10;
 
-        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
-                .subscriptionType(SubscriptionType.Shared).subscribe();
-        Producer<byte[]> producer = pulsarClient.newProducer()
-            .topic(topicName)
-            .enableBatching(false)
-            .messageRoutingMode(MessageRoutingMode.SinglePartition)
-            .create();
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Shared)
+                .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
+                .subscribe();
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
 
         // (1) Produce messages
         for (int i = 0; i < totalMessages; i++) {
-            String message = "my-message-" + i;
-            producer.send(message.getBytes());
+            producer.send("my-message-" + i);
         }
 
         // (2) Consume and ack messages except first message
-        Message<byte[]> unAckedMsg = null;
+        Message<String> unAckedMsg = null;
         for (int i = 0; i < totalMessages; i++) {
             msg = consumer.receive();
             if (i == 0) {
@@ -1249,7 +1252,7 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
         // Verify: msg [L:0] must be redelivered
         try {
             msg = consumer.receive(1, TimeUnit.SECONDS);
-            assertEquals(new String(msg.getData()), new String(unAckedMsg.getData()));
+            assertEquals(msg.getValue(), unAckedMsg.getValue());
         } catch (Exception e) {
             fail("msg should be redelivered ", e);
         }


Mime
View raw message