pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [incubator-pulsar] branch master updated: [integration tests] Return exit code as part of execution result of running a command in testcontainers (#2233)
Date Wed, 25 Jul 2018 23:06:45 GMT
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 596d681   [integration tests] Return exit code as part of execution result of running
a command in testcontainers (#2233)
596d681 is described below

commit 596d68170172937f92bcc0876eb36a42a99c0793
Author: Sijie Guo <guosijie@gmail.com>
AuthorDate: Wed Jul 25 16:06:43 2018 -0700

     [integration tests] Return exit code as part of execution result of running a command
in testcontainers (#2233)
    
     ### Motivation
    
    The default `ExecResult` in testcontainers doesn't include the exit code of running a
command in a test container.
    
     ### Changes
    
    - Add a class `ContainerExecResult` to represent the result of executing a command in
a container, including exitCode, stdout and stderr
    - Improve the existing util method in `DockerUtils` to return `ContainerExecResult`
    - Improve all the integration tests on validating exit codes before validating outputs
---
 .../pulsar/tests/integration/cli/CLITest.java      | 39 ++++++++++------
 .../integration/compaction/TestCompaction.java     | 51 +++++++++++----------
 .../integration/containers/ChaosContainer.java     | 19 +++-----
 .../integration/docker/ContainerExecResult.java    | 33 ++++++++++++++
 .../integration/functions/PulsarFunctionsTest.java | 10 +++--
 .../runtime/PulsarFunctionsRuntimeTest.java        | 17 ++++---
 .../tests/integration/io/PulsarIOSinkTest.java     | 25 +++++++----
 .../tests/integration/io/PulsarIOSourceTest.java   | 24 ++++++----
 .../pulsar/tests/integration/smoke/SmokeTest.java  | 17 ++++---
 .../integration/topologies/PulsarCluster.java      | 12 ++---
 .../tests/integration/utils/DockerUtils.java       | 52 ++++++++++++++++------
 11 files changed, 196 insertions(+), 103 deletions(-)

diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
index 7b4d0d0..e69b0dd 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
@@ -18,14 +18,15 @@
  */
 package org.apache.pulsar.tests.integration.cli;
 
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertTrue;
 
 import org.apache.pulsar.tests.integration.containers.BrokerContainer;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
-import org.testcontainers.containers.Container.ExecResult;
-import org.testng.Assert;
 import org.testng.annotations.Test;
 
 /**
@@ -37,7 +38,8 @@ public class CLITest extends PulsarClusterTestBase {
     public void testDeprecatedCommands() throws Exception {
         String tenantName = "test-deprecated-commands";
 
-        ExecResult result = pulsarCluster.runAdminCommandOnAnyBroker("--help");
+        ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker("--help");
+        assertEquals(0, result.getExitCode());
         assertFalse(result.getStdout().isEmpty());
         assertFalse(result.getStdout().contains("Usage: properties "));
         result = pulsarCluster.runAdminCommandOnAnyBroker(
@@ -63,7 +65,7 @@ public class CLITest extends PulsarClusterTestBase {
 
         int i = 0;
         for (BrokerContainer container : pulsarCluster.getBrokers()) {
-            ExecResult result = container.execCmd(
+            ContainerExecResult result = container.execCmd(
                 PulsarCluster.ADMIN_SCRIPT,
                 "persistent",
                 "create-subscription",
@@ -71,6 +73,7 @@ public class CLITest extends PulsarClusterTestBase {
                 "--subscription",
                 "" + subscriptionPrefix + i
             );
+            assertEquals(0, result.getExitCode());
             assertTrue(result.getStdout().isEmpty());
             assertTrue(result.getStderr().isEmpty());
             i++;
@@ -81,7 +84,7 @@ public class CLITest extends PulsarClusterTestBase {
     public void testTopicTerminationOnTopicsWithoutConnectedConsumers() throws Exception
{
         String topicName = "persistent://public/default/test-topic-termination";
         BrokerContainer container = pulsarCluster.getAnyBroker();
-        ExecResult result = container.execCmd(
+        ContainerExecResult result = container.execCmd(
             PulsarCluster.CLIENT_SCRIPT,
             "produce",
             "-m",
@@ -90,7 +93,8 @@ public class CLITest extends PulsarClusterTestBase {
             "1",
             topicName);
 
-        Assert.assertTrue(result.getStdout().contains("1 messages successfully produced"));
+        assertEquals(0, result.getExitCode());
+        assertTrue(result.getStdout().contains("1 messages successfully produced"));
 
         // terminate the topic
         result = container.execCmd(
@@ -98,7 +102,8 @@ public class CLITest extends PulsarClusterTestBase {
             "persistent",
             "terminate",
             topicName);
-        Assert.assertTrue(result.getStdout().contains("Topic succesfully terminated at"));
+        assertEquals(0, result.getExitCode());
+        assertTrue(result.getStdout().contains("Topic succesfully terminated at"));
 
         // try to produce should fail
         result = pulsarCluster.getAnyBroker().execCmd(
@@ -109,6 +114,7 @@ public class CLITest extends PulsarClusterTestBase {
             "-n",
             "1",
             topicName);
+        assertNotEquals(0, result.getExitCode());
         assertTrue(result.getStdout().contains("Topic was already terminated"));
     }
 
@@ -117,7 +123,7 @@ public class CLITest extends PulsarClusterTestBase {
         BrokerContainer container = pulsarCluster.getAnyBroker();
         String topicName = "persistent://public/default/test-schema-cli";
 
-        ExecResult result = container.execCmd(
+        ContainerExecResult result = container.execCmd(
             PulsarCluster.CLIENT_SCRIPT,
             "produce",
             "-m",
@@ -125,7 +131,8 @@ public class CLITest extends PulsarClusterTestBase {
             "-n",
             "1",
             topicName);
-        Assert.assertTrue(result.getStdout().contains("1 messages successfully produced"));
+        assertEquals(0, result.getExitCode());
+        assertTrue(result.getStdout().contains("1 messages successfully produced"));
 
         result = container.execCmd(
             PulsarCluster.ADMIN_SCRIPT,
@@ -135,8 +142,9 @@ public class CLITest extends PulsarClusterTestBase {
             "-f",
             "/pulsar/conf/schema_example.conf"
         );
-        Assert.assertTrue(result.getStdout().isEmpty());
-        Assert.assertTrue(result.getStderr().isEmpty());
+        assertEquals(0, result.getExitCode());
+        assertTrue(result.getStdout().isEmpty());
+        assertTrue(result.getStderr().isEmpty());
 
         // get schema
         result = container.execCmd(
@@ -144,7 +152,8 @@ public class CLITest extends PulsarClusterTestBase {
             "schemas",
             "get",
             topicName);
-        Assert.assertTrue(result.getStdout().contains("\"type\" : \"STRING\""));
+        assertEquals(0, result.getExitCode());
+        assertTrue(result.getStdout().contains("\"type\" : \"STRING\""));
 
         // delete the schema
         result = container.execCmd(
@@ -152,8 +161,9 @@ public class CLITest extends PulsarClusterTestBase {
             "schemas",
             "delete",
             topicName);
-        Assert.assertTrue(result.getStdout().isEmpty());
-        Assert.assertTrue(result.getStderr().isEmpty());
+        assertEquals(0, result.getExitCode());
+        assertTrue(result.getStdout().isEmpty());
+        assertTrue(result.getStderr().isEmpty());
 
         // get schema again
         result = container.execCmd(
@@ -162,6 +172,7 @@ public class CLITest extends PulsarClusterTestBase {
             "get",
             "persistent://public/default/test-schema-cli"
         );
+        assertNotEquals(0, result.getExitCode());
         assertTrue(result.getStderr().contains("Reason: HTTP 404 Not Found"));
     }
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compaction/TestCompaction.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compaction/TestCompaction.java
index 6825e73..05d6235 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compaction/TestCompaction.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compaction/TestCompaction.java
@@ -23,12 +23,11 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageBuilder;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
-import org.testcontainers.containers.Container;
-import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import static java.util.stream.Collectors.joining;
+import static org.testng.Assert.assertEquals;
 
 public class TestCompaction extends PulsarClusterTestBase {
 
@@ -54,12 +53,12 @@ public class TestCompaction extends PulsarClusterTestBase {
             try (Consumer<byte[]> consumer = client.newConsumer().topic(topic)
                     .readCompacted(true).subscriptionName("sub1").subscribe()) {
                 Message<byte[]> m = consumer.receive();
-                Assert.assertEquals(m.getKey(), "key0");
-                Assert.assertEquals(m.getData(), "content0".getBytes());
+                assertEquals(m.getKey(), "key0");
+                assertEquals(m.getData(), "content0".getBytes());
 
                 m = consumer.receive();
-                Assert.assertEquals(m.getKey(), "key0");
-                Assert.assertEquals(m.getData(), "content1".getBytes());
+                assertEquals(m.getKey(), "key0");
+                assertEquals(m.getData(), "content1".getBytes());
             }
 
             pulsarCluster.runPulsarBaseCommandOnAnyBroker("compact-topic", "-t", topic);
@@ -67,8 +66,8 @@ public class TestCompaction extends PulsarClusterTestBase {
             try (Consumer<byte[]> consumer = client.newConsumer().topic(topic)
                     .readCompacted(true).subscriptionName("sub1").subscribe()) {
                 Message<byte[]> m = consumer.receive();
-                Assert.assertEquals(m.getKey(), "key0");
-                Assert.assertEquals(m.getData(), "content1".getBytes());
+                assertEquals(m.getKey(), "key0");
+                assertEquals(m.getData(), "content1".getBytes());
             }
         }
     }
@@ -98,12 +97,12 @@ public class TestCompaction extends PulsarClusterTestBase {
             try (Consumer<byte[]> consumer = client.newConsumer().topic(topic)
                     .readCompacted(true).subscriptionName("sub1").subscribe()) {
                 Message<byte[]> m = consumer.receive();
-                Assert.assertEquals(m.getKey(), "key0");
-                Assert.assertEquals(m.getData(), "content0".getBytes());
+                assertEquals(m.getKey(), "key0");
+                assertEquals(m.getData(), "content0".getBytes());
 
                 m = consumer.receive();
-                Assert.assertEquals(m.getKey(), "key0");
-                Assert.assertEquals(m.getData(), "content1".getBytes());
+                assertEquals(m.getKey(), "key0");
+                assertEquals(m.getData(), "content1".getBytes());
             }
             pulsarCluster.runAdminCommandOnAnyBroker("persistent",
                     "compact", topic);
@@ -114,8 +113,8 @@ public class TestCompaction extends PulsarClusterTestBase {
             try (Consumer<byte[]> consumer = client.newConsumer().topic(topic)
                     .readCompacted(true).subscriptionName("sub1").subscribe()) {
                 Message<byte[]> m = consumer.receive();
-                Assert.assertEquals(m.getKey(), "key0");
-                Assert.assertEquals(m.getData(), "content1".getBytes());
+                assertEquals(m.getKey(), "key0");
+                assertEquals(m.getData(), "content1".getBytes());
             }
         }
     }
@@ -126,7 +125,7 @@ public class TestCompaction extends PulsarClusterTestBase {
             try (Consumer<byte[]> consumer = client.newConsumer().topic(topic)
                  .readCompacted(true).subscriptionName(sub).subscribe()) {
                 Message<byte[]> m = consumer.receive();
-                Assert.assertEquals(m.getKey(), expectedKey);
+                assertEquals(m.getKey(), expectedKey);
                 if (new String(m.getData()).equals(expectedValue)) {
                     break;
                 }
@@ -136,8 +135,8 @@ public class TestCompaction extends PulsarClusterTestBase {
         try (Consumer<byte[]> consumer = client.newConsumer().topic(topic)
                 .readCompacted(true).subscriptionName(sub).subscribe()) {
             Message<byte[]> m = consumer.receive();
-            Assert.assertEquals(m.getKey(), expectedKey);
-            Assert.assertEquals(new String(m.getData()), expectedValue);
+            assertEquals(m.getKey(), expectedKey);
+            assertEquals(new String(m.getData()), expectedValue);
         }
     }
 
@@ -172,20 +171,24 @@ public class TestCompaction extends PulsarClusterTestBase {
         }
     }
 
-    private Container.ExecResult createTenantName(final String tenantName,
-                                                  final String allowedClusterName,
-                                                  final String adminRoleName) throws Exception
{
-        return pulsarCluster.runAdminCommandOnAnyBroker(
+    private ContainerExecResult createTenantName(final String tenantName,
+                                                 final String allowedClusterName,
+                                                 final String adminRoleName) throws Exception
{
+        ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker(
             "tenants", "create", "--allowed-clusters", allowedClusterName,
             "--admin-roles", adminRoleName, tenantName);
+        assertEquals(0, result.getExitCode());
+        return result;
     }
 
-    private Container.ExecResult createNamespace(final String Ns) throws Exception {
-        return pulsarCluster.runAdminCommandOnAnyBroker(
+    private ContainerExecResult createNamespace(final String Ns) throws Exception {
+        ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker(
                 "namespaces",
                 "create",
                 "--clusters",
                 pulsarCluster.getClusterName(), Ns);
+        assertEquals(0, result.getExitCode());
+        return result;
     }
 
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
index 32e9583..239954d 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.tests.integration.containers;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
+import com.github.dockerjava.api.DockerClient;
 import com.github.dockerjava.api.command.LogContainerCmd;
 import com.github.dockerjava.api.model.Frame;
 import com.github.dockerjava.core.command.LogContainerResultCallback;
@@ -28,6 +29,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.apache.pulsar.tests.integration.utils.DockerUtils;
 import org.testcontainers.containers.GenericContainer;
 
@@ -101,18 +103,11 @@ public class ChaosContainer<SelfT extends ChaosContainer<SelfT>>
extends Generic
         return sb.toString();
     }
 
-    public ExecResult execCmd(String... cmd) throws Exception {
-        String cmdString = StringUtils.join(cmd, " ");
-
-        log.info("DOCKER.exec({}:{}): Executing ...", containerName.substring(1), cmdString);
-
-        ExecResult result = execInContainer(cmd);
-
-        log.info("Docker.exec({}:{}): Done", containerName.substring(1), cmdString);
-        log.info("Docker.exec({}:{}): Stdout -\n{}", containerName.substring(1), cmdString,
result.getStdout());
-        log.info("Docker.exec({}:{}): Stderr -\n{}", containerName.substring(1), cmdString,
result.getStderr());
-
-        return result;
+    public ContainerExecResult execCmd(String... commands) throws Exception {
+        DockerClient client = this.getDockerClient();
+        String dockerId = this.getContainerId();
+        return DockerUtils.runCommand(
+            client, dockerId, true, commands);
     }
 
     @Override
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/docker/ContainerExecResult.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/docker/ContainerExecResult.java
new file mode 100644
index 0000000..fe040ad
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/docker/ContainerExecResult.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.docker;
+
+import lombok.Data;
+
+/**
+ * Represents the result of executing a command.
+ */
+@Data(staticConstructor = "of")
+public class ContainerExecResult {
+
+    private final int exitCode;
+    private final String stdout;
+    private final String stderr;
+
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index d83661c..a541c09 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -18,13 +18,14 @@
  */
 package org.apache.pulsar.tests.integration.functions;
 
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.tests.integration.containers.WorkerContainer;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.apache.pulsar.tests.integration.functions.utils.UploadDownloadCommandGenerator;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
-import org.testcontainers.containers.Container.ExecResult;
 import org.testng.annotations.Test;
 
 @Slf4j
@@ -50,7 +51,8 @@ public class PulsarFunctionsTest extends PulsarFunctionsTestBase {
         String[] commands = {
             "sh", "-c", actualCommand
         };
-        ExecResult output = pulsarCluster.getAnyWorker().execCmd(commands);
+        ContainerExecResult output = pulsarCluster.getAnyWorker().execCmd(commands);
+        assertEquals(0, output.getExitCode());
         assertTrue(output.getStdout().contains("\"Uploaded successfully\""));
         return bkPkgPath;
     }
@@ -71,7 +73,8 @@ public class PulsarFunctionsTest extends PulsarFunctionsTestBase {
             "sh", "-c", actualCommand
         };
         WorkerContainer container = pulsarCluster.getAnyWorker();
-        ExecResult output = container.execCmd(commands);
+        ContainerExecResult output = container.execCmd(commands);
+        assertEquals(0, output.getExitCode());
         assertTrue(output.getStdout().contains("\"Downloaded successfully\""));
         String[] diffCommand = {
             "diff",
@@ -79,6 +82,7 @@ public class PulsarFunctionsTest extends PulsarFunctionsTestBase {
             localPkgFile
         };
         output = container.execCmd(diffCommand);
+        assertEquals(0, output.getExitCode());
         assertTrue(output.getStdout().isEmpty());
         assertTrue(output.getStderr().isEmpty());
     }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsRuntimeTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsRuntimeTest.java
index 6477323..8b7bafc 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsRuntimeTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsRuntimeTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.tests.integration.functions.runtime;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertTrue;
 
 import lombok.Cleanup;
@@ -28,6 +29,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.apache.pulsar.tests.integration.functions.PulsarFunctionsTestBase;
 import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
 import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
@@ -103,13 +105,14 @@ public class PulsarFunctionsRuntimeTest extends PulsarFunctionsTestBase
{
         String[] commands = {
             "sh", "-c", command
         };
-        ExecResult result = pulsarCluster.getAnyWorker().execCmd(
+        ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
             commands);
+        assertEquals(0, result.getExitCode());
         assertTrue(result.getStdout().contains("\"Created successfully\""));
     }
 
     private static void getFunctionInfoSuccess(String functionName) throws Exception {
-        ExecResult result = pulsarCluster.getAnyWorker().execCmd(
+        ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
             PulsarCluster.ADMIN_SCRIPT,
             "functions",
             "get",
@@ -117,11 +120,12 @@ public class PulsarFunctionsRuntimeTest extends PulsarFunctionsTestBase
{
             "--namespace", "default",
             "--name", functionName
         );
+        assertEquals(0, result.getExitCode());
         assertTrue(result.getStdout().contains("\"name\": \"" + functionName + "\""));
     }
 
     private static void getFunctionInfoNotFound(String functionName) throws Exception {
-        ExecResult result = pulsarCluster.getAnyWorker().execCmd(
+        ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
             PulsarCluster.ADMIN_SCRIPT,
             "functions",
             "get",
@@ -129,11 +133,12 @@ public class PulsarFunctionsRuntimeTest extends PulsarFunctionsTestBase
{
             "--namespace", "default",
             "--name", functionName
         );
+        assertNotEquals(0, result.getExitCode());
         assertTrue(result.getStderr().contains("Reason: Function " + functionName + " doesn't
exist"));
     }
 
     private static void getFunctionStatus(String functionName, int numMessages) throws Exception
{
-        ExecResult result = pulsarCluster.getAnyWorker().execCmd(
+        ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
             PulsarCluster.ADMIN_SCRIPT,
             "functions",
             "getstatus",
@@ -141,6 +146,7 @@ public class PulsarFunctionsRuntimeTest extends PulsarFunctionsTestBase
{
             "--namespace", "default",
             "--name", functionName
         );
+        assertEquals(0, result.getExitCode());
         assertTrue(result.getStdout().contains("\"running\": true"));
         assertTrue(result.getStdout().contains("\"numProcessed\": \"" + numMessages + "\""));
         assertTrue(result.getStdout().contains("\"numSuccessfullyProcessed\": \"" + numMessages
+ "\""));
@@ -172,7 +178,7 @@ public class PulsarFunctionsRuntimeTest extends PulsarFunctionsTestBase
{
     }
 
     private static void deleteFunction(String functionName) throws Exception {
-        ExecResult result = pulsarCluster.getAnyWorker().execCmd(
+        ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
             PulsarCluster.ADMIN_SCRIPT,
             "functions",
             "delete",
@@ -180,6 +186,7 @@ public class PulsarFunctionsRuntimeTest extends PulsarFunctionsTestBase
{
             "--namespace", "default",
             "--name", functionName
         );
+        assertEquals(0, result.getExitCode());
         assertTrue(result.getStdout().contains("Deleted successfully"));
         assertTrue(result.getStderr().isEmpty());
     }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSinkTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSinkTest.java
index d0aab47..71cad05 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSinkTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSinkTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.tests.integration.io;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertTrue;
 
 import com.google.common.base.Stopwatch;
@@ -32,11 +34,11 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.apache.pulsar.tests.integration.functions.PulsarFunctionsTestBase;
 import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec.PulsarClusterSpecBuilder;
-import org.testcontainers.containers.Container.ExecResult;
 import org.testcontainers.containers.GenericContainer;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Factory;
@@ -131,7 +133,8 @@ public class PulsarIOSinkTest extends PulsarFunctionsTestBase {
             "--inputs", inputTopicName
         };
         log.info("Run command : {}", StringUtils.join(commands, ' '));
-        ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+        ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+        assertEquals(0, result.getExitCode());
         assertTrue(
             result.getStdout().contains("\"Created successfully\""),
             result.getStdout());
@@ -146,8 +149,9 @@ public class PulsarIOSinkTest extends PulsarFunctionsTestBase {
             "--namespace", namespace,
             "--name", sinkName
         };
-        ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+        ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
         log.info("Get sink info : {}", result.getStdout());
+        assertEquals(0, result.getExitCode());
         assertTrue(
             result.getStdout().contains("\"builtin\": \"" + tester.sinkType + "\""),
             result.getStdout()
@@ -164,9 +168,9 @@ public class PulsarIOSinkTest extends PulsarFunctionsTestBase {
             "--name", sinkName
         };
         while (true) {
-            ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+            ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
             log.info("Get sink status : {}", result.getStdout());
-            if (result.getStdout().contains("\"running\": true")) {
+            if (0 == result.getExitCode() && result.getStdout().contains("\"running\":
true")) {
                 return;
             }
             log.info("Backoff 1 second until the function is running");
@@ -211,9 +215,10 @@ public class PulsarIOSinkTest extends PulsarFunctionsTestBase {
         };
         Stopwatch stopwatch = Stopwatch.createStarted();
         while (true) {
-            ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+            ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
             log.info("Get sink status : {}", result.getStdout());
-            if (result.getStdout().contains("\"numProcessed\": \"" + numMessages + "\""))
{
+            if (0 == result.getExitCode()
+                && result.getStdout().contains("\"numProcessed\": \"" + numMessages
+ "\"")) {
                 return;
             }
             log.info("{} ms has elapsed but the sink hasn't process {} messages, backoff
to wait for another 1 second",
@@ -231,7 +236,8 @@ public class PulsarIOSinkTest extends PulsarFunctionsTestBase {
             "--namespace", namespace,
             "--name", sinkName
         };
-        ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+        ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+        assertEquals(0, result.getExitCode());
         assertTrue(
             result.getStdout().contains("Deleted successfully"),
             result.getStdout()
@@ -251,7 +257,8 @@ public class PulsarIOSinkTest extends PulsarFunctionsTestBase {
             "--namespace", namespace,
             "--name", sinkName
         };
-        ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+        ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+        assertNotEquals(0, result.getExitCode());
         assertTrue(result.getStderr().contains("Reason: Function " + sinkName + " doesn't
exist"));
     }
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSourceTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSourceTest.java
index 78d8874..8468976 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSourceTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSourceTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.tests.integration.io;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertTrue;
 
 import com.google.common.base.Stopwatch;
@@ -34,11 +35,11 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.apache.pulsar.tests.integration.functions.PulsarFunctionsTestBase;
 import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec.PulsarClusterSpecBuilder;
-import org.testcontainers.containers.Container.ExecResult;
 import org.testcontainers.containers.GenericContainer;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Factory;
@@ -143,7 +144,8 @@ public class PulsarIOSourceTest extends PulsarFunctionsTestBase {
             "--destinationTopicName", outputTopicName
         };
         log.info("Run command : {}", StringUtils.join(commands, ' '));
-        ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+        ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+        assertEquals(0, result.getExitCode());
         assertTrue(
             result.getStdout().contains("\"Created successfully\""),
             result.getStdout());
@@ -158,8 +160,9 @@ public class PulsarIOSourceTest extends PulsarFunctionsTestBase {
             "--namespace", namespace,
             "--name", sourceName
         };
-        ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+        ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
         log.info("Get source info : {}", result.getStdout());
+        assertEquals(0, result.getExitCode());
         assertTrue(
             result.getStdout().contains("\"builtin\": \"" + tester.sourceType + "\""),
             result.getStdout()
@@ -176,9 +179,9 @@ public class PulsarIOSourceTest extends PulsarFunctionsTestBase {
             "--name", sourceName
         };
         while (true) {
-            ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+            ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
             log.info("Get source status : {}", result.getStdout());
-            if (result.getStdout().contains("\"running\": true")) {
+            if (0 == result.getExitCode() && result.getStdout().contains("\"running\":
true")) {
                 return;
             }
             log.info("Backoff 1 second until the function is running");
@@ -209,9 +212,10 @@ public class PulsarIOSourceTest extends PulsarFunctionsTestBase {
         };
         Stopwatch stopwatch = Stopwatch.createStarted();
         while (true) {
-            ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+            ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
             log.info("Get source status : {}", result.getStdout());
-            if (result.getStdout().contains("\"numProcessed\": \"" + numMessages + "\""))
{
+            if (0 == result.getExitCode()
+                && result.getStdout().contains("\"numProcessed\": \"" + numMessages
+ "\"")) {
                 return;
             }
             log.info("{} ms has elapsed but the source hasn't process {} messages, backoff
to wait for another 1 second",
@@ -229,7 +233,8 @@ public class PulsarIOSourceTest extends PulsarFunctionsTestBase {
             "--namespace", namespace,
             "--name", sourceName
         };
-        ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+        ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+        assertEquals(0, result.getExitCode());
         assertTrue(
             result.getStdout().contains("Delete source successfully"),
             result.getStdout()
@@ -249,7 +254,8 @@ public class PulsarIOSourceTest extends PulsarFunctionsTestBase {
             "--namespace", namespace,
             "--name", sourceName
         };
-        ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+        ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+        assertNotEquals(0, result.getExitCode());
         assertTrue(result.getStderr().contains("Reason: Function " + sourceName + " doesn't
exist"));
     }
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/smoke/SmokeTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/smoke/SmokeTest.java
index 85a8d2d..b22bd74 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/smoke/SmokeTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/smoke/SmokeTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.tests.integration.smoke;
 
+import static org.testng.Assert.assertEquals;
+
 import lombok.Cleanup;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -25,9 +27,8 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
-import org.testcontainers.containers.Container;
-import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import java.util.concurrent.TimeUnit;
@@ -69,15 +70,17 @@ public class SmokeTest extends PulsarClusterTestBase {
         }
         for (int i = 0; i < 10; i++) {
             Message m = consumer.receive();
-            Assert.assertEquals("smoke-message" + i, new String(m.getData()));
+            assertEquals("smoke-message" + i, new String(m.getData()));
         }
     }
 
-    private Container.ExecResult createTenantName(String tenantName,
-                                                  String clusterName,
-                                                  String roleName) throws Exception {
-        return pulsarCluster.runAdminCommandOnAnyBroker(
+    private ContainerExecResult createTenantName(String tenantName,
+                                                 String clusterName,
+                                                 String roleName) throws Exception {
+        ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker(
             "tenants", "create", tenantName, "--allowed-clusters", clusterName,
             "--admin-roles", roleName);
+        assertEquals(0, result.getExitCode());
+        return result;
     }
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index bdd8123..2f1653a 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -43,7 +43,7 @@ import org.apache.pulsar.tests.integration.containers.ProxyContainer;
 import org.apache.pulsar.tests.integration.containers.PulsarContainer;
 import org.apache.pulsar.tests.integration.containers.WorkerContainer;
 import org.apache.pulsar.tests.integration.containers.ZKContainer;
-import org.testcontainers.containers.Container.ExecResult;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.Network;
 
@@ -315,15 +315,15 @@ public class PulsarCluster {
         return brokerContainers.values();
     }
 
-    public ExecResult runAdminCommandOnAnyBroker(String...commands) throws Exception {
+    public ContainerExecResult runAdminCommandOnAnyBroker(String...commands) throws Exception
{
         return runCommandOnAnyBrokerWithScript(ADMIN_SCRIPT, commands);
     }
 
-    public ExecResult runPulsarBaseCommandOnAnyBroker(String...commands) throws Exception
{
+    public ContainerExecResult runPulsarBaseCommandOnAnyBroker(String...commands) throws
Exception {
         return runCommandOnAnyBrokerWithScript(PULSAR_COMMAND_SCRIPT, commands);
     }
 
-    private ExecResult runCommandOnAnyBrokerWithScript(String scriptType, String...commands)
throws Exception {
+    private ContainerExecResult runCommandOnAnyBrokerWithScript(String scriptType, String...commands)
throws Exception {
         BrokerContainer container = getAnyBroker();
         String[] cmds = new String[commands.length + 1];
         cmds[0] = scriptType;
@@ -339,13 +339,13 @@ public class PulsarCluster {
         brokerContainers.values().forEach(BrokerContainer::start);
     }
 
-    public ExecResult createNamespace(String nsName) throws Exception {
+    public ContainerExecResult createNamespace(String nsName) throws Exception {
         return runAdminCommandOnAnyBroker(
             "namespaces", "create", "public/" + nsName,
             "--clusters", clusterName);
     }
 
-    public ExecResult enableDeduplication(String nsName, boolean enabled) throws Exception
{
+    public ContainerExecResult enableDeduplication(String nsName, boolean enabled) throws
Exception {
         return runAdminCommandOnAnyBroker(
             "namespaces", "set-deduplication", "public/" + nsName,
             enabled ? "--enable" : "--disable");
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java
index 51fb8d8..8684201 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.tests.integration.utils;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+
 import com.github.dockerjava.api.DockerClient;
 import com.github.dockerjava.api.async.ResultCallback;
 import com.github.dockerjava.api.command.InspectContainerResponse;
@@ -25,6 +27,7 @@ import com.github.dockerjava.api.command.InspectExecResponse;
 import com.github.dockerjava.api.model.Frame;
 import com.github.dockerjava.api.model.ContainerNetwork;
 
+import com.github.dockerjava.api.model.StreamType;
 import java.io.Closeable;
 import java.io.File;
 import java.io.FileOutputStream;
@@ -34,16 +37,15 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.zip.GZIPOutputStream;
 
 import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
 
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -162,16 +164,26 @@ public class DockerUtils {
         throw new IllegalArgumentException("Container " + containerId + " has no networks");
     }
 
-    public static String getContainerHostname(DockerClient docker, String containerId) {
-        return runCommand(docker, containerId, "hostname").trim();
+    public static ContainerExecResult runCommand(DockerClient docker, String containerId,
String... cmd)
+            throws Exception {
+        return runCommand(docker, containerId, false, cmd);
     }
 
-    public static String runCommand(DockerClient docker, String containerId, String... cmd)
{
+    public static ContainerExecResult runCommand(DockerClient docker,
+                                                 String containerId,
+                                                 boolean ignoreError,
+                                                 String... cmd)
+            throws Exception {
         CompletableFuture<Boolean> future = new CompletableFuture<>();
-        String execid = docker.execCreateCmd(containerId).withCmd(cmd)
-            .withAttachStderr(true).withAttachStdout(true).exec().getId();
+        String execid = docker.execCreateCmd(containerId)
+            .withCmd(cmd)
+            .withAttachStderr(true)
+            .withAttachStdout(true)
+            .exec()
+            .getId();
         String cmdString = Arrays.stream(cmd).collect(Collectors.joining(" "));
-        StringBuffer output = new StringBuffer();
+        StringBuilder stdout = new StringBuilder();
+        StringBuilder stderr = new StringBuilder();
         docker.execStartCmd(execid).withDetach(false)
             .exec(new ResultCallback<Frame>() {
                 @Override
@@ -185,7 +197,11 @@ public class DockerUtils {
                 @Override
                 public void onNext(Frame object) {
                     LOG.info("DOCKER.exec({}:{}): {}", containerId, cmdString, object);
-                    output.append(new String(object.getPayload()));
+                    if (StreamType.STDOUT == object.getStreamType()) {
+                        stdout.append(new String(object.getPayload(), UTF_8));
+                    } else if (StreamType.STDERR == object.getStreamType()) {
+                        stderr.append(new String(object.getPayload(), UTF_8));
+                    }
                 }
 
                 @Override
@@ -213,14 +229,22 @@ public class DockerUtils {
         }
         int retCode = resp.getExitCode();
         if (retCode != 0) {
-            LOG.error("DOCKER.exec({}:{}): failed with {} : {}", containerId, cmdString,
retCode, output);
-            throw new RuntimeException(
-                    String.format("cmd(%s) failed on %s with exitcode %d",
-                                  cmdString, containerId, retCode));
+            if (!ignoreError) {
+                LOG.error("DOCKER.exec({}:{}): failed with {} :\nStdout:\n{}\n\nStderr:\n{}",
+                    containerId, cmdString, retCode, stdout.toString(), stderr.toString());
+                throw new Exception(String.format("cmd(%s) failed on %s with exitcode %d",
+                    cmdString, containerId, retCode));
+            } else {
+                LOG.error("DOCKER.exec({}:{}): failed with {}", containerId, cmdString, retCode);
+            }
         } else {
             LOG.info("DOCKER.exec({}:{}): completed with {}", containerId, cmdString, retCode);
         }
-        return output.toString();
+        return ContainerExecResult.of(
+            retCode,
+            stdout.toString(),
+            stderr.toString()
+        );
     }
 
     public static Optional<String> getContainerCluster(DockerClient docker, String
containerId) {


Mime
View raw message