pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] sijie closed pull request #2059: Add semantic integration tests for non-persistent topics and effectively-once producing
Date Sun, 01 Jul 2018 05:58:10 GMT
sijie closed pull request #2059: Add semantic integration tests for non-persistent topics and
effectively-once producing
URL: https://github.com/apache/incubator-pulsar/pull/2059
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
index a30509d26c..9e2860f233 100644
--- a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
+++ b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
@@ -18,10 +18,12 @@
  */
 package org.apache.pulsar.tests.topologies;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.pulsar.tests.containers.PulsarContainer.CS_PORT;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -34,6 +36,7 @@
 import org.apache.pulsar.tests.containers.ProxyContainer;
 import org.apache.pulsar.tests.containers.PulsarContainer;
 import org.apache.pulsar.tests.containers.ZKContainer;
+import org.testcontainers.containers.Container.ExecResult;
 import org.testcontainers.containers.Network;
 
 /**
@@ -42,6 +45,8 @@
 @Slf4j
 public class PulsarCluster {
 
+    protected static final String ADMIN_SCRIPT = "/pulsar/bin/pulsar-admin";
+
     /**
      * Pulsar Cluster Spec.
      *
@@ -169,4 +174,32 @@ public void stop() {
             log.info("Failed to shutdown network for pulsar cluster {}", clusterName, e);
         }
     }
+
+    public BrokerContainer getAnyBroker() {
+        List<BrokerContainer> brokerList = Lists.newArrayList();
+        brokerList.addAll(brokerContainers.values());
+        Collections.shuffle(brokerList);
+        checkArgument(!brokerList.isEmpty(), "No broker is alive");
+        return brokerList.get(0);
+    }
+
+    public ExecResult runAdminCommandOnAnyBroker(String...commands) throws Exception {
+        BrokerContainer container = getAnyBroker();
+        String[] cmds = new String[commands.length + 1];
+        cmds[0] = ADMIN_SCRIPT;
+        System.arraycopy(commands, 0, cmds, 1, commands.length);
+        return container.execCmd(cmds);
+    }
+
+    public ExecResult createNamespace(String nsName) throws Exception {
+        return runAdminCommandOnAnyBroker(
+            "namespaces", "create", "public/" + nsName,
+            "--clusters", clusterName);
+    }
+
+    public ExecResult enableDeduplication(String nsName, boolean enabled) throws Exception
{
+        return runAdminCommandOnAnyBroker(
+            "namespaces", "set-deduplication", "public/" + nsName,
+            enabled ? "--enable" : "--disable");
+    }
 }
diff --git a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
index bee31b8553..8cd49b1c6c 100644
--- a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
+++ b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
@@ -22,10 +22,37 @@
 import lombok.extern.slf4j.Slf4j;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
 
 @Slf4j
 public class PulsarClusterTestBase {
 
+    @DataProvider(name = "ServiceUrlAndTopics")
+    public static Object[][] serviceUrlAndTopics() {
+        return new Object[][] {
+            // plain text, persistent topic
+            {
+                pulsarCluster.getPlainTextServiceUrl(),
+                true,
+            },
+            // plain text, non-persistent topic
+            {
+                pulsarCluster.getPlainTextServiceUrl(),
+                false
+            }
+        };
+    }
+
+    @DataProvider(name = "ServiceUrls")
+    public static Object[][] serviceUrls() {
+        return new Object[][] {
+            // plain text
+            {
+                pulsarCluster.getPlainTextServiceUrl()
+            }
+        };
+    }
+
     protected static PulsarCluster pulsarCluster;
 
     @BeforeClass
@@ -58,4 +85,36 @@ public static void teardownCluster() {
         }
     }
 
+    protected static String randomName(int numChars) {
+        StringBuilder sb = new StringBuilder();;;;
+        for (int i = 0; i < 8; i++) {
+            sb.append((char) (ThreadLocalRandom.current().nextInt(26) + 'a'));
+        }
+        return sb.toString();
+    }
+
+    protected static String generateNamespaceName() {
+        return "ns-" + randomName(8);
+    }
+
+    protected static String generateTopicName(String topicPrefix, boolean isPersistent) {
+        return generateTopicName("default", topicPrefix, isPersistent);
+    }
+
+    protected static String generateTopicName(String namespace, String topicPrefix, boolean
isPersistent) {
+        String topicName = new StringBuilder(topicPrefix)
+            .append("-")
+            .append(randomName(8))
+            .append("-")
+            .append(System.currentTimeMillis())
+            .toString();
+        if (isPersistent) {
+            return "persistent://public/" + namespace + "/" + topicName;
+        } else {
+            return "non-persistent://public/" + namespace + "/" + topicName;
+        }
+    }
+
+
+
 }
diff --git a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java
index a87f85aaeb..1ba6533169 100644
--- a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java
+++ b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java
@@ -20,26 +20,31 @@
 
 import static org.testng.Assert.assertEquals;
 
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.tests.topologies.PulsarClusterTestBase;
 import org.testng.annotations.Test;
 
 /**
  * Test pulsar produce/consume semantics
  */
+@Slf4j
 public class SemanticsTest extends PulsarClusterTestBase {
 
-    @Test
-    public void testPublishAndConsumePlainTextServiceUrl() throws Exception {
-        testPublishAndConsume(
-            pulsarCluster.getPlainTextServiceUrl(), "test-publish-consume-plain-text");
-    }
+    //
+    // Test Basic Publish & Consume Operations
+    //
 
-    private void testPublishAndConsume(String serviceUrl, String topicName) throws Exception
{
+    @Test(dataProvider = "ServiceUrlAndTopics")
+    public void testPublishAndConsume(String serviceUrl, boolean isPersistent) throws Exception
{
+        String topicName = generateTopicName("testpubconsume", isPersistent);
 
         int numMessages = 10;
 
@@ -69,5 +74,114 @@ private void testPublishAndConsume(String serviceUrl, String topicName)
throws E
         }
     }
 
+    @Test(dataProvider = "ServiceUrls")
+    public void testEffectivelyOnceDisabled(String serviceUrl) throws Exception {
+        String nsName = generateNamespaceName();
+        pulsarCluster.createNamespace(nsName);
+
+        String topicName = generateTopicName(nsName, "testeffectivelyonce", true);
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+            .serviceUrl(serviceUrl)
+            .build();
+
+        @Cleanup
+        Consumer<String> consumer = client.newConsumer(Schema.STRING)
+            .topic(topicName)
+            .subscriptionName("test-sub")
+            .ackTimeout(10, TimeUnit.SECONDS)
+            .subscriptionType(SubscriptionType.Exclusive)
+            .subscribe();
+
+        @Cleanup
+        Producer<String> producer = client.newProducer(Schema.STRING)
+            .topic(topicName)
+            .enableBatching(false)
+            .producerName("effectively-once-producer")
+            .initialSequenceId(1L)
+            .create();
+
+        // send messages
+        sendMessagesIdempotency(producer);
+
+        // checkout the result
+        checkMessagesIdempotencyDisabled(consumer);
+    }
+
+    private static void sendMessagesIdempotency(Producer<String> producer) throws Exception
{
+        // sending message
+        producer.newMessage()
+            .sequenceId(1L)
+            .value("message-1")
+            .send();
+
+        // sending a duplicated message
+        producer.newMessage()
+            .sequenceId(1L)
+            .value("duplicated-message-1")
+            .send();
 
+        // sending a second message
+        producer.newMessage()
+            .sequenceId(2L)
+            .value("message-2")
+            .send();
+    }
+
+    private static void checkMessagesIdempotencyDisabled(Consumer<String> consumer)
throws Exception {
+        receiveAndAssertMessage(consumer, 1L, "message-1");
+        receiveAndAssertMessage(consumer, 1L, "duplicated-message-1");
+        receiveAndAssertMessage(consumer, 2L, "message-2");
+    }
+
+    private static void receiveAndAssertMessage(Consumer<String> consumer,
+                                                long expectedSequenceId,
+                                                String expectedContent) throws Exception
{
+        Message<String> msg = consumer.receive();
+        log.info("Received message {}", msg);
+        assertEquals(expectedSequenceId, msg.getSequenceId());
+        assertEquals(expectedContent, msg.getValue());
+    }
+
+    @Test(dataProvider = "ServiceUrls")
+    public void testEffectivelyOnceEnabled(String serviceUrl) throws Exception {
+        String nsName = generateNamespaceName();
+        pulsarCluster.createNamespace(nsName);
+        pulsarCluster.enableDeduplication(nsName, true);
+
+        String topicName = generateTopicName(nsName, "testeffectivelyonce", true);
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+            .serviceUrl(serviceUrl)
+            .build();
+
+        @Cleanup
+        Consumer<String> consumer = client.newConsumer(Schema.STRING)
+            .topic(topicName)
+            .subscriptionName("test-sub")
+            .ackTimeout(10, TimeUnit.SECONDS)
+            .subscriptionType(SubscriptionType.Exclusive)
+            .subscribe();
+
+        @Cleanup
+        Producer<String> producer = client.newProducer(Schema.STRING)
+            .topic(topicName)
+            .enableBatching(false)
+            .producerName("effectively-once-producer")
+            .initialSequenceId(1L)
+            .create();
+
+        // send messages
+        sendMessagesIdempotency(producer);
+
+        // checkout the result
+        checkMessagesIdempotencyEnabled(consumer);
+    }
+
+    private static void checkMessagesIdempotencyEnabled(Consumer<String> consumer)
throws Exception {
+        receiveAndAssertMessage(consumer, 1L, "message-1");
+        receiveAndAssertMessage(consumer, 2L, "message-2");
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message