pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mme...@apache.org
Subject [pulsar] branch master updated: Fix instability in Pulsar Function window integration test (#5337)
Date Tue, 08 Oct 2019 15:48:53 GMT
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 094ebf7  Fix instability in Pulsar Function window integration test (#5337)
094ebf7 is described below

commit 094ebf7af8d0bb27bd9df3bcea2dba4e9ba204fc
Author: Boyang Jerry Peng <jerry.boyang.peng@gmail.com>
AuthorDate: Tue Oct 8 08:48:49 2019 -0700

    Fix instability in Pulsar Function window integration test (#5337)
    
    * fix instability with tumbling window test
    
    * cleaning up
    
    * cleaning up
---
 .../integration/functions/PulsarFunctionsTest.java | 47 ++++++++++++++--------
 .../tests/integration/suites/PulsarTestSuite.java  | 24 +++++++++--
 2 files changed, 50 insertions(+), 21 deletions(-)

diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 1d35c1b..e92371d 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -1015,7 +1015,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase
{
                 .build();
 
         @Cleanup
-        Reader reader = client.newReader().startMessageId(MessageId.earliest)
+        Reader<byte[]> reader = client.newReader().startMessageId(MessageId.earliest)
                 .topic(outputTopicName)
                 .create();
 
@@ -1030,12 +1030,18 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase
{
         }
 
         int i = 0;
-        while (reader.hasMessageAvailable()) {
-            if (i >= expectedResults.length) {
+        while (true) {
+            if (i > expectedResults.length) {
                 Assertions.fail("More results than expected");
             }
-            String result = new String(reader.readNext().getData()).split(":")[0];
-            log.info("i: {} result: {}", i, result);
+
+            Message<byte[]> msg = reader.readNext(30, TimeUnit.SECONDS);
+            if (msg == null) {
+                break;
+            }
+            String msgStr = new String(msg.getData());
+            log.info("i: {} RECV: {}", i, msgStr);
+            String result = msgStr.split(":")[0];
             assertThat(result).contains(expectedResults[i]);
             i++;
         }
@@ -1706,18 +1712,25 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase
{
     }
 
     private static void getFunctionInfoNotFound(String functionName) throws Exception {
-        try {
-            pulsarCluster.getAnyWorker().execCmd(
-                    PulsarCluster.ADMIN_SCRIPT,
-                    "functions",
-                    "get",
-                    "--tenant", "public",
-                    "--namespace", "default",
-                    "--name", functionName);
-            fail("Command should have exited with non-zero");
-        } catch (ContainerExecException e) {
-            assertTrue(e.getResult().getStderr().contains("Reason: Function " + functionName
+ " doesn't exist"));
-        }
+        retryStrategically(aVoid -> {
+            try {
+                pulsarCluster.getAnyWorker().execCmd(
+                        PulsarCluster.ADMIN_SCRIPT,
+                        "functions",
+                        "get",
+                        "--tenant", "public",
+                        "--namespace", "default",
+                        "--name", functionName);
+            } catch (ContainerExecException e) {
+                if (e.getResult().getStderr().contains("Reason: Function " + functionName
+ " doesn't exist")) {
+                    return true;
+                }
+
+            } catch (Exception e) {
+
+            }
+            return false;
+        }, 5, 100, true);
     }
 
     private static void checkSubscriptionsCleanup(String topic) throws Exception {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
index 7fe4e46..4b8dde7 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
@@ -44,13 +44,29 @@ public class PulsarTestSuite extends PulsarClusterTestBase implements
ITest {
         return "pulsar-test-suite";
     }
 
-    public static void retryStrategically(Predicate<Void> predicate, int retryCount,
long intSleepTimeInMillis)
+    public static void retryStrategically(Predicate<Void> predicate, int retryCount,
long intSleepTimeInMillis) throws Exception {
+        retryStrategically(predicate, retryCount, intSleepTimeInMillis, false);
+    }
+
+
+    public static void retryStrategically(Predicate<Void> predicate, int retryCount,
long intSleepTimeInMillis, boolean throwException)
             throws Exception {
+
         for (int i = 0; i < retryCount; i++) {
-            if (predicate.test(null) || i == (retryCount - 1)) {
-                break;
+            if (throwException) {
+                if (i == (retryCount - 1)) {
+                    throw new RuntimeException("Action was not successful after " + retryCount
+ " retries");
+                }
+                if (predicate.test(null)) {
+                    break;
+                }
+            } else {
+                if (predicate.test(null) || i == (retryCount - 1)) {
+                    break;
+                }
             }
-            Thread.sleep(intSleepTimeInMillis + (intSleepTimeInMillis * i));
+
+           Thread.sleep(intSleepTimeInMillis + (intSleepTimeInMillis * i));
         }
     }
 }


Mime
View raw message