pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] sijie closed pull request #2055: Introduce testcontainer based cluster integration tests
Date Sun, 01 Jul 2018 00:22:04 GMT
sijie closed pull request #2055: Introduce testcontainer based cluster integration tests
URL: https://github.com/apache/incubator-pulsar/pull/2055
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pom.xml b/pom.xml
index 37ed8f992e..5acf55624c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -122,6 +122,9 @@ flexible messaging model and an intuitive client API.</description>
     <testRealAWS>false</testRealAWS>
     <testRetryCount>1</testRetryCount>
 
+    <!-- apache commons -->
+    <commons-compress.version>1.15</commons-compress.version>
+
     <bookkeeper.version>4.7.1</bookkeeper.version>
     <zookeeper.version>3.5.4-beta</zookeeper.version>
     <netty.version>4.1.22.Final</netty.version>
@@ -155,7 +158,10 @@ flexible messaging model and an intuitive client API.</description>
     <avro.version>1.8.2</avro.version>
 
     <!-- test dependencies -->
+    <arquillian-cube.version>1.15.1</arquillian-cube.version>
+    <arquillian-junit.version>1.1.14.Final</arquillian-junit.version>
     <disruptor.version>3.4.0</disruptor.version>
+    <testcontainers.version>1.8.0</testcontainers.version>
 
     <!-- Plugin dependencies -->
     <protobuf-maven-plugin.version>0.5.0</protobuf-maven-plugin.version>
@@ -390,6 +396,12 @@ flexible messaging model and an intuitive client API.</description>
         <version>3.4</version>
       </dependency>
 
+      <dependency>
+        <groupId>org.apache.commons</groupId>
+        <artifactId>commons-compress</artifactId>
+        <version>${commons-compress.version}</version>
+      </dependency>
+
       <dependency>
         <groupId>commons-configuration</groupId>
         <artifactId>commons-configuration</artifactId>
@@ -783,6 +795,21 @@ flexible messaging model and an intuitive client API.</description>
         <artifactId>disruptor</artifactId>
         <version>${disruptor.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.testcontainers</groupId>
+        <artifactId>testcontainers</artifactId>
+        <version>${testcontainers.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.arquillian.cube</groupId>
+        <artifactId>arquillian-cube-docker</artifactId>
+        <version>${arquillian-cube.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.jboss.arquillian.junit</groupId>
+        <artifactId>arquillian-junit-standalone</artifactId>
+        <version>${arquillian-junit.version}</version>
+      </dependency>
     </dependencies>
   </dependencyManagement>
 
diff --git a/tests/integration-tests-topologies/pom.xml b/tests/integration-tests-topologies/pom.xml
index 93fc1feb7c..4b84adf6f7 100644
--- a/tests/integration-tests-topologies/pom.xml
+++ b/tests/integration-tests-topologies/pom.xml
@@ -36,5 +36,21 @@
   <packaging>jar</packaging>
 
   <name>Apache Pulsar :: Tests :: Common topologies for Arquillian based integration
tests</name>
+  <dependencies>
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>testcontainers</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.testng</groupId>
+      <artifactId>testng</artifactId>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pulsar.tests</groupId>
+      <artifactId>integration-tests-utils</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+  </dependencies>
 
 </project>
diff --git a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/BKContainer.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/BKContainer.java
new file mode 100644
index 0000000000..1b87ad9974
--- /dev/null
+++ b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/BKContainer.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.containers;
+
+/**
+ * A pulsar container that runs bookkeeper.
+ */
+public class BKContainer extends PulsarContainer<BKContainer> {
+
+    public BKContainer(String clusterName, String hostName) {
+        super(
+            clusterName, hostName, hostName, "bin/run-bookie.sh", BOOKIE_PORT, INVALID_PORT);
+    }
+}
diff --git a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/BrokerContainer.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/BrokerContainer.java
new file mode 100644
index 0000000000..28668e2acb
--- /dev/null
+++ b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/BrokerContainer.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.containers;
+
+/**
+ * A pulsar container that runs bookkeeper.
+ */
+public class BrokerContainer extends PulsarContainer<BrokerContainer> {
+
+    public BrokerContainer(String clusterName, String hostName) {
+        super(
+            clusterName, hostName, hostName, "bin/run-broker.sh", BROKER_PORT, INVALID_PORT);
+    }
+}
diff --git a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/CSContainer.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/CSContainer.java
new file mode 100644
index 0000000000..0f228ea8c6
--- /dev/null
+++ b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/CSContainer.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.containers;
+
+/**
+ * A pulsar container that runs configuration store.
+ */
+public class CSContainer extends PulsarContainer<CSContainer> {
+
+    public static final String NAME = "configuration-store";
+
+    public CSContainer(String clusterName) {
+        super(
+            clusterName,
+            NAME,
+            NAME,
+            "bin/run-global-zk.sh",
+            CS_PORT,
+            INVALID_PORT);
+    }
+}
diff --git a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/ChaosContainer.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/ChaosContainer.java
new file mode 100644
index 0000000000..cd27960980
--- /dev/null
+++ b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/ChaosContainer.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.containers;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.github.dockerjava.api.command.LogContainerCmd;
+import com.github.dockerjava.api.model.Frame;
+import com.github.dockerjava.core.command.LogContainerResultCallback;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang.StringUtils;
+import org.testcontainers.containers.GenericContainer;
+
+/**
+ * A base container provides chaos capability.
+ */
+@Slf4j
+public class ChaosContainer<SelfT extends ChaosContainer<SelfT>> extends GenericContainer<SelfT>
{
+
+    protected final String clusterName;
+
+    protected ChaosContainer(String clusterName, String image) {
+        super(image);
+        this.clusterName = clusterName;
+    }
+
+    public void tailContainerLog() {
+        CompletableFuture.runAsync(() -> {
+            while (null == containerId) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(100);
+                } catch (InterruptedException e) {
+                    return;
+                }
+            }
+
+            LogContainerCmd logContainerCmd = this.dockerClient.logContainerCmd(containerId);
+            logContainerCmd.withStdOut(true).withStdErr(true).withFollowStream(true);
+            logContainerCmd.exec(new LogContainerResultCallback() {
+                @Override
+                public void onNext(Frame item) {
+                    log.info(new String(item.getPayload(), UTF_8));
+                }
+            });
+        });
+    }
+
+    public String getContainerLog() {
+        StringBuilder sb = new StringBuilder();
+
+        LogContainerCmd logContainerCmd = this.dockerClient.logContainerCmd(containerId);
+        logContainerCmd.withStdOut(true).withStdErr(true);
+        try {
+            logContainerCmd.exec(new LogContainerResultCallback() {
+                @Override
+                public void onNext(Frame item) {
+                    sb.append(new String(item.getPayload(), UTF_8));
+                }
+            }).awaitCompletion();
+        } catch (InterruptedException e) {
+
+        }
+        return sb.toString();
+    }
+
+    public ExecResult execCmd(String... cmd) throws Exception {
+        String cmdString = StringUtils.join(cmd, " ");
+
+        log.info("DOCKER.exec({}:{}): Executing ...", containerId, cmdString);
+
+        ExecResult result = execInContainer(cmd);
+
+        log.info("Docker.exec({}:{}): Done", containerId, cmdString);
+        log.info("Docker.exec({}:{}): Stdout -\n{}", containerId, cmdString, result.getStdout());
+        log.info("Docker.exec({}:{}): Stderr -\n{}", containerId, cmdString, result.getStderr());
+
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof ChaosContainer)) {
+            return false;
+        }
+
+        ChaosContainer another = (ChaosContainer) o;
+        return clusterName.equals(another.clusterName)
+            && super.equals(another);
+    }
+
+    @Override
+    public int hashCode() {
+        return 31 * super.hashCode() + Objects.hash(
+            clusterName);
+    }
+
+}
diff --git a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/ProxyContainer.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/ProxyContainer.java
new file mode 100644
index 0000000000..329014196d
--- /dev/null
+++ b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/ProxyContainer.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.containers;
+
+/**
+ * A pulsar container that runs bookkeeper.
+ */
+public class ProxyContainer extends PulsarContainer<ProxyContainer> {
+
+    public ProxyContainer(String clusterName, String hostName) {
+        super(
+            clusterName, hostName, hostName, "bin/run-proxy.sh", BROKER_PORT, BROKER_HTTP_PORT);
+    }
+
+    public String getPlainTextServiceUrl() {
+        return "pulsar://" + getContainerIpAddress() + ":" + getMappedPort(BROKER_PORT);
+    }
+
+    public String getHttpServiceUrl() {
+        return "http://" + getContainerIpAddress() + ":" + getMappedPort(BROKER_HTTP_PORT);
+    }
+}
diff --git a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/PulsarContainer.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/PulsarContainer.java
new file mode 100644
index 0000000000..eb0ae840d1
--- /dev/null
+++ b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/PulsarContainer.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.containers;
+
+import static java.time.temporal.ChronoUnit.SECONDS;
+
+import java.time.Duration;
+import java.util.Objects;
+import lombok.extern.slf4j.Slf4j;
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
+
+/**
+ * Abstract Test Container for Pulsar.
+ */
+@Slf4j
+public abstract class PulsarContainer<SelfT extends PulsarContainer<SelfT>> extends
ChaosContainer<SelfT> {
+
+    public static final int INVALID_PORT = -1;
+    public static final int ZK_PORT = 2181;
+    public static final int CS_PORT = 2184;
+    public static final int BOOKIE_PORT = 3181;
+    public static final int BROKER_PORT = 6650;
+    public static final int BROKER_HTTP_PORT = 8080;
+
+    private static final String IMAGE_NAME = "apachepulsar/pulsar-test-latest-version:latest";
+
+    private final String hostname;
+    private final String serviceName;
+    private final String serviceEntrypoint;
+    private final int servicePort;
+    private final int httpPort;
+
+    public PulsarContainer(String clusterName,
+                           String hostname,
+                           String serviceName,
+                           String serviceEntrypoint,
+                           int servicePort,
+                           int httpPort) {
+        super(clusterName, IMAGE_NAME);
+        this.hostname = hostname;
+        this.serviceName = serviceName;
+        this.serviceEntrypoint = serviceEntrypoint;
+        this.servicePort = servicePort;
+        this.httpPort = httpPort;
+    }
+
+    @Override
+    public String getContainerName() {
+        return clusterName + "-" + hostname;
+    }
+
+    @Override
+    protected void configure() {
+        if (httpPort > 0) {
+            addExposedPorts(
+                servicePort, httpPort
+            );
+        } else if (servicePort > 0) {
+            addExposedPort(servicePort);
+        }
+    }
+
+    @Override
+    public void start() {
+        if (httpPort > 0 || servicePort > 0) {
+            this.waitStrategy = new HostPortWaitStrategy()
+                .withStartupTimeout(Duration.of(60, SECONDS));
+        }
+        this.withCreateContainerCmdModifier(createContainerCmd -> {
+            createContainerCmd.withHostName(hostname);
+            createContainerCmd.withName(getContainerName());
+            createContainerCmd.withEntrypoint(serviceEntrypoint);
+        });
+
+        super.start();
+        log.info("Start pulsar service {} at container {}", serviceName, containerName);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof PulsarContainer)) {
+            return false;
+        }
+
+        PulsarContainer another = (PulsarContainer) o;
+        return containerName.equals(another.containerName)
+            && super.equals(another);
+    }
+
+    @Override
+    public int hashCode() {
+        return 31 * super.hashCode() + Objects.hash(
+            containerName);
+    }
+}
diff --git a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/ZKContainer.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/ZKContainer.java
new file mode 100644
index 0000000000..c6524f989e
--- /dev/null
+++ b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/ZKContainer.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.containers;
+
+/**
+ * A pulsar container that runs zookeeper.
+ */
+public class ZKContainer extends PulsarContainer<ZKContainer> {
+
+    public static final String NAME = "zookeeper";
+
+    public ZKContainer(String clusterName) {
+        super(
+            clusterName,
+            NAME,
+            NAME,
+            "bin/run-local-zk.sh",
+            ZK_PORT,
+            INVALID_PORT);
+    }
+
+}
diff --git a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/package-info.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/package-info.java
new file mode 100644
index 0000000000..2e0ad98ac7
--- /dev/null
+++ b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Test containers used for running integration tests.
+ */
+package org.apache.pulsar.tests.containers;
\ No newline at end of file
diff --git a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
new file mode 100644
index 0000000000..a30509d26c
--- /dev/null
+++ b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.topologies;
+
+import static org.apache.pulsar.tests.containers.PulsarContainer.CS_PORT;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.tests.containers.BKContainer;
+import org.apache.pulsar.tests.containers.BrokerContainer;
+import org.apache.pulsar.tests.containers.CSContainer;
+import org.apache.pulsar.tests.containers.ProxyContainer;
+import org.apache.pulsar.tests.containers.PulsarContainer;
+import org.apache.pulsar.tests.containers.ZKContainer;
+import org.testcontainers.containers.Network;
+
+/**
+ * Pulsar Cluster in containers.
+ */
+@Slf4j
+public class PulsarCluster {
+
+    /**
+     * Pulsar Cluster Spec.
+     *
+     * @param spec pulsar cluster spec.
+     * @return the built pulsar cluster
+     */
+    public static PulsarCluster forSpec(PulsarClusterSpec spec) {
+        return new PulsarCluster(spec);
+    }
+
+    private final PulsarClusterSpec spec;
+    @Getter
+    private final String clusterName;
+    private final Network network;
+    private final ZKContainer zkContainer;
+    private final CSContainer csContainer;
+    private final Map<String, BKContainer> bookieContainers;
+    private final Map<String, BrokerContainer> brokerContainers;
+    private final ProxyContainer proxyContainer;
+
+    private PulsarCluster(PulsarClusterSpec spec) {
+        this.spec = spec;
+        this.clusterName = spec.clusterName();
+        this.network = Network.newNetwork();
+        this.zkContainer = new ZKContainer(clusterName)
+            .withNetwork(network)
+            .withNetworkAliases(ZKContainer.NAME)
+            .withEnv("clusterName", clusterName)
+            .withEnv("zkServers", ZKContainer.NAME)
+            .withEnv("configurationStore", CSContainer.NAME + ":" + CS_PORT)
+            .withEnv("pulsarNode", "pulsar-broker-0");
+
+        this.csContainer = new CSContainer(clusterName)
+            .withNetwork(network)
+            .withNetworkAliases(CSContainer.NAME);
+        this.bookieContainers = Maps.newTreeMap();
+        this.brokerContainers = Maps.newTreeMap();
+        this.proxyContainer = new ProxyContainer(clusterName, "pulsar-proxy")
+            .withNetwork(network)
+            .withNetworkAliases("pulsar-proxy")
+            .withEnv("zookeeperServers", ZKContainer.NAME)
+            .withEnv("configurationStoreServers", CSContainer.NAME + ":" + CS_PORT)
+            .withEnv("clusterName", clusterName);
+    }
+
+    public String getPlainTextServiceUrl() {
+        return proxyContainer.getPlainTextServiceUrl();
+    }
+
+    public String getHttpServiceUrl() {
+        return proxyContainer.getHttpServiceUrl();
+    }
+
+    public void start() throws Exception {
+        // start the local zookeeper
+        zkContainer.start();
+        log.info("Successfully started local zookeeper container.");
+
+        // start the configuration store
+        csContainer.start();
+        log.info("Successfully started configuration store container.");
+
+        // init the cluster
+        zkContainer.execCmd(
+            "bin/init-cluster.sh");
+        log.info("Successfully initialized the cluster.");
+
+        // create bookies
+        bookieContainers.putAll(
+            runNumContainers("bookie", spec.numBookies(), (name) -> new BKContainer(clusterName,
name)
+                .withNetwork(network)
+                .withNetworkAliases(name)
+                .withEnv("zkServers", ZKContainer.NAME)
+                .withEnv("useHostNameAsBookieID", "true")
+                .withEnv("clusterName", clusterName)
+            )
+        );
+
+        // create brokers
+        brokerContainers.putAll(
+            runNumContainers("broker", spec.numBrokers(), (name) -> new BrokerContainer(clusterName,
name)
+                .withNetwork(network)
+                .withNetworkAliases(name)
+                .withEnv("zookeeperServers", ZKContainer.NAME)
+                .withEnv("configurationStoreServers", CSContainer.NAME + ":" + CS_PORT)
+                .withEnv("clusterName", clusterName)
+                .withEnv("brokerServiceCompactionMonitorIntervalInSeconds", "1")
+            )
+        );
+
+        // create proxy
+        proxyContainer.start();
+        log.info("Successfully started pulsar proxy.");
+
+        log.info("Pulsar cluster {} is up running:", clusterName);
+        log.info("\tBinary Service Url : {}", getPlainTextServiceUrl());
+        log.info("\tHttp Service Url : {}", getHttpServiceUrl());
+    }
+
+    private static <T extends PulsarContainer> Map<String, T> runNumContainers(String
serviceName,
+                                                                               int numContainers,
+                                                                               Function<String,
T> containerCreator) {
+        List<CompletableFuture<?>> startFutures = Lists.newArrayList();
+        Map<String, T> containers = Maps.newTreeMap();
+        for (int i = 0; i < numContainers; i++) {
+            String name = "pulsar-" + serviceName + "-" + i;
+            T container = containerCreator.apply(name);
+            containers.put(name, container);
+            startFutures.add(CompletableFuture.runAsync(() -> container.start()));
+        }
+        CompletableFuture.allOf(startFutures.toArray(new CompletableFuture[startFutures.size()])).join();
+        log.info("Successfully started {} {} containers", numContainers, serviceName);
+        return containers;
+    }
+
+    public void stop() {
+        proxyContainer.stop();
+        brokerContainers.values().forEach(BrokerContainer::stop);
+        bookieContainers.values().forEach(BKContainer::stop);
+        csContainer.stop();
+        zkContainer.stop();
+        try {
+            network.close();
+        } catch (Exception e) {
+            log.info("Failed to shutdown network for pulsar cluster {}", clusterName, e);
+        }
+    }
+}
diff --git a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterSpec.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterSpec.java
new file mode 100644
index 0000000000..85c9f5124d
--- /dev/null
+++ b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterSpec.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.topologies;
+
+import lombok.Builder;
+import lombok.Builder.Default;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.experimental.Accessors;
+
+/**
+ * Spec to build a pulsar cluster.
+ */
+@Builder
+@Accessors(fluent = true)
+@Getter
+@Setter
+public class PulsarClusterSpec {
+
+    /**
+     * Returns the cluster name.
+     *
+     * @return the cluster name.
+     */
+    String clusterName;
+
+    /**
+     * Returns number of bookies.
+     *
+     * @return number of bookies.
+     */
+    @Default
+    int numBookies = 3;
+
+    /**
+     * Returns number of brokers.
+     *
+     * @return number of brokers.
+     */
+    @Default
+    int numBrokers = 2;
+
+    /**
+     * Returns number of proxies.
+     *
+     * @return number of proxies.
+     */
+    @Default
+    int numProxies = 1;
+
+    /**
+     * Returns the flag whether to enable/disable container log.
+     *
+     * @return the flag whether to enable/disable container log.
+     */
+    boolean enableContainerLog = false;
+
+}
diff --git a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
new file mode 100644
index 0000000000..bee31b8553
--- /dev/null
+++ b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.topologies;
+
+import java.util.concurrent.ThreadLocalRandom;
+import lombok.extern.slf4j.Slf4j;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+
+@Slf4j
+public class PulsarClusterTestBase {
+
+    protected static PulsarCluster pulsarCluster;
+
+    @BeforeClass
+    public static void setupCluster() throws Exception {
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < 8; i++) {
+            sb.append((char) (ThreadLocalRandom.current().nextInt(26) + 'a'));
+        }
+        PulsarClusterSpec spec = PulsarClusterSpec.builder()
+            .clusterName(sb.toString())
+            .build();
+
+        setupCluster(spec);
+    }
+
+    protected static void setupCluster(PulsarClusterSpec spec) throws Exception {
+        log.info("Setting up cluster {} with {} bookies, {} brokers",
+            spec.clusterName(), spec.numBookies(), spec.numBrokers());
+
+        pulsarCluster = PulsarCluster.forSpec(spec);
+        pulsarCluster.start();
+
+        log.info("Cluster {} is setup", spec.clusterName());
+    }
+
+    @AfterClass
+    public static void teardownCluster() {
+        if (null != pulsarCluster) {
+            pulsarCluster.stop();
+        }
+    }
+
+}
diff --git a/tests/integration-tests-utils/pom.xml b/tests/integration-tests-utils/pom.xml
index 71deafa44f..2ebed73556 100644
--- a/tests/integration-tests-utils/pom.xml
+++ b/tests/integration-tests-utils/pom.xml
@@ -37,16 +37,10 @@
 
   <name>Apache Pulsar :: Tests :: Utility module for Arquillian based integration tests</name>
 
-  <properties>
-    <arquillian-cube.version>1.15.1</arquillian-cube.version>
-    <commons-compress.version>1.15</commons-compress.version>
-  </properties>
-
   <dependencies>
     <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-compress</artifactId>
-      <version>${commons-compress.version}</version>
     </dependency>
 
     <dependency>
@@ -77,7 +71,6 @@
     <dependency>
       <groupId>org.arquillian.cube</groupId>
       <artifactId>arquillian-cube-docker</artifactId>
-      <version>${arquillian-cube.version}</version>
     </dependency>
 
   </dependencies>
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index b84e39896b..6788b1bfc7 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -37,5 +37,55 @@
     <module>compaction</module>
     <module>cli</module>
     <module>s3-offload</module>
+    <module>semantics</module>
   </modules>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <!-- only run tests when -DintegrationTests is specified //-->
+          <skipTests>true</skipTests>
+          <systemPropertyVariables>
+            <currentVersion>${project.version}</currentVersion>
+            <maven.buildDirectory>${project.build.directory}</maven.buildDirectory>
+          </systemPropertyVariables>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+  <profiles>
+    <profile>
+      <id>integrationTests</id>
+      <activation>
+        <property>
+          <name>integrationTests</name>
+        </property>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <configuration>
+              <properties>
+                <property>
+                  <name>listener</name>
+                  <!-- AnnotationListener breaks arquillian, so don't use it //-->
+                  <value>org.apache.pulsar.tests.PulsarTestListener</value>
+                </property>
+              </properties>
+
+              <argLine>-Xmx2G -XX:MaxDirectMemorySize=8G
+              -Dio.netty.leakDetectionLevel=advanced
+              </argLine>
+              <skipTests>false</skipTests>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
 </project>
diff --git a/tests/integration/semantics/pom.xml b/tests/integration/semantics/pom.xml
new file mode 100644
index 0000000000..5e8eecaa7d
--- /dev/null
+++ b/tests/integration/semantics/pom.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="
+  http://maven.apache.org/POM/4.0.0
+  http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.pulsar.tests</groupId>
+    <artifactId>integration</artifactId>
+    <version>2.2.0-incubating-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.pulsar.tests.integration</groupId>
+  <artifactId>semantics</artifactId>
+  <packaging>jar</packaging>
+  <name>Apache Pulsar :: Tests :: Integration Tests :: Semantics</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-client</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pulsar.tests</groupId>
+      <artifactId>integration-tests-topologies</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>
diff --git a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java
new file mode 100644
index 0000000000..a87f85aaeb
--- /dev/null
+++ b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.semantics;
+
+import static org.testng.Assert.assertEquals;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.tests.topologies.PulsarClusterTestBase;
+import org.testng.annotations.Test;
+
+/**
+ * Test pulsar produce/consume semantics
+ */
+public class SemanticsTest extends PulsarClusterTestBase {
+
+    @Test
+    public void testPublishAndConsumePlainTextServiceUrl() throws Exception {
+        testPublishAndConsume(
+            pulsarCluster.getPlainTextServiceUrl(), "test-publish-consume-plain-text");
+    }
+
+    private void testPublishAndConsume(String serviceUrl, String topicName) throws Exception
{
+
+        int numMessages = 10;
+
+        try (PulsarClient client = PulsarClient.builder()
+            .serviceUrl(serviceUrl)
+            .build()) {
+
+            try (Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName("my-sub")
+                .subscribe()) {
+
+                try (Producer<String> producer = client.newProducer(Schema.STRING)
+                    .topic(topicName)
+                    .create()) {
+
+                    for (int i = 0; i < numMessages; i++) {
+                        producer.send("smoke-message-" + i);
+                    }
+                }
+
+                for (int i = 0; i < numMessages; i++) {
+                    Message<String> m = consumer.receive();
+                    assertEquals("smoke-message-" + i, m.getValue());
+                }
+            }
+        }
+    }
+
+
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message