pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] sijie closed pull request #2063: Add an integration test on pulsar functions on process mode
Date Mon, 02 Jul 2018 17:22:19 GMT
sijie closed pull request #2063: Add an integration test on pulsar functions on process mode
URL: https://github.com/apache/incubator-pulsar/pull/2063
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/tests/integration/semantics/pom.xml b/tests/integration/semantics/pom.xml
index 5e8eecaa7d..63c16219ed 100644
--- a/tests/integration/semantics/pom.xml
+++ b/tests/integration/semantics/pom.xml
@@ -37,6 +37,10 @@
   <name>Apache Pulsar :: Tests :: Integration Tests :: Semantics</name>
 
   <dependencies>
+    <dependency>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.pulsar</groupId>
       <artifactId>pulsar-client</artifactId>
diff --git a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 07d86f7f0c..a35d4662b2 100644
--- a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -18,12 +18,20 @@
  */
 package org.apache.pulsar.tests.integration.functions;
 
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 
-import com.google.common.io.Files;
-import java.io.File;
+import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.tests.containers.WorkerContainer;
+import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
+import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
 import org.apache.pulsar.tests.integration.functions.utils.UploadDownloadCommandGenerator;
 import org.apache.pulsar.tests.topologies.PulsarCluster;
 import org.testcontainers.containers.Container.ExecResult;
@@ -32,6 +40,10 @@
 @Slf4j
 public class PulsarFunctionsTest extends PulsarFunctionsTestBase {
 
+    //
+    // Tests on uploading/downloading function packages.
+    //
+
     @Test
     public String checkUpload() throws Exception {
         String bkPkgPath = String.format("%s/%s/%s",
@@ -82,4 +94,126 @@ public void checkDownload() throws Exception {
         assertTrue(output.getStderr().isEmpty());
     }
 
+    //
+    // Test CRUD functions on different runtimes.
+    //
+
+    @Test(dataProvider = "FunctionRuntimes")
+    public void testExclamationFunction(Runtime runtime) throws Exception {
+        String inputTopicName = "test-exclamation-" + runtime + "-input-" + randomName(8);
+        String outputTopicName = "test-exclamation-" + runtime + "-output-" + randomName(8);
+        String functionName = "test-exclamation-fn-" + randomName(8);
+        final int numMessages = 10;
+
+        // submit the exclamation function
+        submitExclamationFunction(
+            inputTopicName, outputTopicName, functionName);
+
+        // get function info
+        getFunctionInfoSuccess(functionName);
+
+        // publish and consume result
+        publishAndConsumeMessages(inputTopicName, outputTopicName, numMessages);
+
+        // get function status
+        getFunctionStatus(functionName, numMessages);
+
+        // delete function
+        deleteFunction(functionName);
+
+        // get function info
+        getFunctionInfoNotFound(functionName);
+    }
+
+    private static void submitExclamationFunction(String inputTopicName,
+                                                  String outputTopicName,
+                                                  String functionName) throws Exception {
+        CommandGenerator generator = CommandGenerator.createDefaultGenerator(inputTopicName,
EXCLAMATION_FUNC_CLASS);
+        generator.setSinkTopic(outputTopicName);
+        generator.setFunctionName(functionName);
+        String command = generator.generateCreateFunctionCommand();
+        String[] commands = {
+            "sh", "-c", command
+        };
+        ExecResult result = pulsarCluster.getAnyWorker().execCmd(
+            commands);
+        assertTrue(result.getStdout().contains("\"Created successfully\""));
+    }
+
+    private static void getFunctionInfoSuccess(String functionName) throws Exception {
+        ExecResult result = pulsarCluster.getAnyWorker().execCmd(
+            PulsarCluster.ADMIN_SCRIPT,
+            "functions",
+            "get",
+            "--tenant", "public",
+            "--namespace", "default",
+            "--name", functionName
+        );
+        assertTrue(result.getStdout().contains("\"name\": \"" + functionName + "\""));
+    }
+
+    private static void getFunctionInfoNotFound(String functionName) throws Exception {
+        ExecResult result = pulsarCluster.getAnyWorker().execCmd(
+            PulsarCluster.ADMIN_SCRIPT,
+            "functions",
+            "get",
+            "--tenant", "public",
+            "--namespace", "default",
+            "--name", functionName
+        );
+        assertTrue(result.getStderr().contains("Reason: Function " + functionName + " doesn't
exist"));
+    }
+
+    private static void getFunctionStatus(String functionName, int numMessages) throws Exception
{
+        ExecResult result = pulsarCluster.getAnyWorker().execCmd(
+            PulsarCluster.ADMIN_SCRIPT,
+            "functions",
+            "getstatus",
+            "--tenant", "public",
+            "--namespace", "default",
+            "--name", functionName
+        );
+        assertTrue(result.getStdout().contains("\"running\": true"));
+        assertTrue(result.getStdout().contains("\"numProcessed\": \"" + numMessages + "\""));
+        assertTrue(result.getStdout().contains("\"numSuccessfullyProcessed\": \"" + numMessages
+ "\""));
+    }
+
+    private static void publishAndConsumeMessages(String inputTopic,
+                                                  String outputTopic,
+                                                  int numMessages) throws Exception {
+        @Cleanup PulsarClient client = PulsarClient.builder()
+            .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+            .build();
+        @Cleanup Consumer<String> consumer = client.newConsumer(Schema.STRING)
+            .topic(outputTopic)
+            .subscriptionType(SubscriptionType.Exclusive)
+            .subscriptionName("test-sub")
+            .subscribe();
+        @Cleanup Producer<String> producer = client.newProducer(Schema.STRING)
+            .topic(inputTopic)
+            .create();
+
+        for (int i = 0; i < numMessages; i++) {
+            producer.send("message-" + i);
+        }
+
+        for (int i = 0; i < numMessages; i++) {
+            Message<String> msg = consumer.receive();
+            assertEquals("message-" + i + "!", msg.getValue());
+        }
+    }
+
+    private static void deleteFunction(String functionName) throws Exception {
+        ExecResult result = pulsarCluster.getAnyWorker().execCmd(
+            PulsarCluster.ADMIN_SCRIPT,
+            "functions",
+            "delete",
+            "--tenant", "public",
+            "--namespace", "default",
+            "--name", functionName
+        );
+        assertTrue(result.getStdout().contains("Deleted successfully"));
+        assertTrue(result.getStderr().isEmpty());
+    }
+
 }
diff --git a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
index 8e692ad7b0..c0acd1de54 100644
--- a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
+++ b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
@@ -18,14 +18,19 @@
  */
 package org.apache.pulsar.tests.integration.functions;
 
+import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
 import org.apache.pulsar.tests.topologies.PulsarClusterTestBase;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
 
 /**
  * A cluster to run pulsar functions for testing functions related features.
  */
 public class PulsarFunctionsTestBase extends PulsarClusterTestBase  {
 
+    public static final String EXCLAMATION_FUNC_CLASS =
+        "org.apache.pulsar.functions.api.examples.ExclamationFunction";
+
     @BeforeClass
     public static void setupCluster() throws Exception {
         PulsarClusterTestBase.setupCluster();
@@ -33,4 +38,11 @@ public static void setupCluster() throws Exception {
         pulsarCluster.startFunctionWorkers(1);
     }
 
+    @DataProvider(name = "FunctionRuntimes")
+    public static Object[][] functionRuntimes() {
+        return new Object[][] {
+            new Object[] { Runtime.JAVA }
+        };
+    }
+
 }
diff --git a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
new file mode 100644
index 0000000000..12225fec10
--- /dev/null
+++ b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.functions.utils;
+
+import com.google.gson.Gson;
+import java.util.HashMap;
+import java.util.Map;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import org.apache.pulsar.tests.topologies.PulsarCluster;
+
+@Getter
+@Setter
+@ToString
+public class CommandGenerator {
+    public enum Runtime {
+        JAVA,
+        PYTHON,
+    };
+    private String functionName;
+    private String tenant = "public";
+    private String namespace = "default";
+    private String functionClassName;
+    private String sourceTopic;
+    private Map<String, String> customSereSourceTopics;
+    private String sinkTopic;
+    private String logTopic;
+    private String outputSerDe;
+    private String processingGuarantees;
+    private Runtime runtime;
+    private Integer parallelism;
+    private String adminUrl;
+    private Integer windowLengthCount;
+    private Long windowLengthDurationMs;
+    private Integer slidingIntervalCount;
+    private Long slidingIntervalDurationMs;
+
+    private Map<String, String> userConfig = new HashMap<>();
+    private static final String JAVAJAR = "/pulsar/examples/api-examples.jar";
+    private static final String PYTHONBASE = "/pulsar/examples/python";
+
+    public static CommandGenerator createDefaultGenerator(String sourceTopic, String functionClassName)
{
+        CommandGenerator generator = new CommandGenerator();
+        generator.setSourceTopic(sourceTopic);
+        generator.setFunctionClassName(functionClassName);
+        generator.setRuntime(Runtime.JAVA);
+        return generator;
+    }
+
+    public static CommandGenerator createDefaultGenerator(Map<String, String> customSereSourceTopics,
+                                                          String functionClassName) {
+        CommandGenerator generator = new CommandGenerator();
+        generator.setCustomSereSourceTopics(customSereSourceTopics);
+        generator.setFunctionClassName(functionClassName);
+        generator.setRuntime(Runtime.JAVA);
+        return generator;
+    }
+
+    public static CommandGenerator createDefaultGenerator(String tenant, String namespace,
String functionName) {
+        CommandGenerator generator = new CommandGenerator();
+        generator.setTenant(tenant);
+        generator.setNamespace(namespace);
+        generator.setFunctionName(functionName);
+        generator.setRuntime(Runtime.JAVA);
+        return generator;
+    }
+
+    public void createAdminUrl(String workerHost, int port) {
+        adminUrl = "http://" + workerHost + ":" + port;
+    }
+
+    public String generateCreateFunctionCommand() {
+        return generateCreateFunctionCommand(null);
+    }
+
+    public String generateCreateFunctionCommand(String codeFile) {
+        StringBuilder commandBuilder = new StringBuilder(PulsarCluster.ADMIN_SCRIPT);
+        if (adminUrl != null) {
+            commandBuilder.append(" --admin-url ");
+            commandBuilder.append(adminUrl);
+        }
+        commandBuilder.append(" functions create");
+        if (tenant != null) {
+            commandBuilder.append(" --tenant " + tenant);
+        }
+        if (namespace != null) {
+            commandBuilder.append(" --namespace " + namespace);
+        }
+        if (functionName != null) {
+            commandBuilder.append(" --name " + functionName);
+        }
+        commandBuilder.append(" --className " + functionClassName);
+        if (sourceTopic != null) {
+            commandBuilder.append(" --inputs " + sourceTopic);
+        }
+        if (logTopic != null) {
+            commandBuilder.append(" --logTopic " + logTopic);
+        }
+        if (customSereSourceTopics != null && !customSereSourceTopics.isEmpty())
{
+            commandBuilder.append(" --customSerdeInputs \'" + new Gson().toJson(customSereSourceTopics)
+ "\'");
+        }
+        if (sinkTopic != null) {
+            commandBuilder.append(" --output " + sinkTopic);
+        }
+        if (outputSerDe != null) {
+            commandBuilder.append(" --outputSerdeClassName " + outputSerDe);
+        }
+        if (processingGuarantees != null) {
+            commandBuilder.append(" --processingGuarantees " + processingGuarantees);
+        }
+        if (!userConfig.isEmpty()) {
+            commandBuilder.append(" --userConfig \'" + new Gson().toJson(userConfig) + "\'");
+        }
+        if (parallelism != null) {
+            commandBuilder.append(" --parallelism " + parallelism);
+        }
+        if (windowLengthCount != null) {
+            commandBuilder.append(" --windowLengthCount " + windowLengthCount);
+        }
+        if (windowLengthDurationMs != null)  {
+            commandBuilder.append(" --windowLengthDurationMs " + windowLengthDurationMs);
+        }
+        if (slidingIntervalCount != null)  {
+            commandBuilder.append( " --slidingIntervalCount " + slidingIntervalCount);
+        }
+        if (slidingIntervalDurationMs != null)  {
+            commandBuilder.append(" --slidingIntervalDurationMs " + slidingIntervalDurationMs);
+        }
+
+        if (runtime == Runtime.JAVA) {
+            commandBuilder.append(" --jar " + JAVAJAR);
+        } else {
+            if (codeFile != null) {
+                commandBuilder.append(" --py " + PYTHONBASE + codeFile);
+            } else {
+                commandBuilder.append(" --py " + PYTHONBASE);
+            }
+        }
+        return commandBuilder.toString();
+    }
+
+    public String generateUpdateFunctionCommand() {
+        return generateUpdateFunctionCommand(null);
+    }
+
+    public String generateUpdateFunctionCommand(String codeFile) {
+        StringBuilder commandBuilder = new StringBuilder("PULSAR_MEM=-Xmx1024m ");
+        if (adminUrl == null) {
+            commandBuilder.append("/pulsar/bin/pulsar-admin functions update");
+        } else {
+            commandBuilder.append("/pulsar/bin/pulsar-admin");
+            commandBuilder.append(" --admin-url ");
+            commandBuilder.append(adminUrl);
+            commandBuilder.append(" functions update");
+        }
+        if (tenant != null) {
+            commandBuilder.append(" --tenant " + tenant);
+        }
+        if (namespace != null) {
+            commandBuilder.append(" --namespace " + namespace);
+        }
+        if (functionName != null) {
+            commandBuilder.append(" --name " + functionName);
+        }
+        commandBuilder.append(" --className " + functionClassName);
+        if (sourceTopic != null) {
+            commandBuilder.append(" --inputs " + sourceTopic);
+        }
+        if (customSereSourceTopics != null && !customSereSourceTopics.isEmpty())
{
+            commandBuilder.append(" --customSerdeInputs \'" + new Gson().toJson(customSereSourceTopics)
+ "\'");
+        }
+        if (sinkTopic != null) {
+            commandBuilder.append(" --output " + sinkTopic);
+        }
+        if (logTopic != null) {
+            commandBuilder.append(" --logTopic " + logTopic);
+        }
+        if (outputSerDe != null) {
+            commandBuilder.append(" --outputSerdeClassName " + outputSerDe);
+        }
+        if (processingGuarantees != null) {
+            commandBuilder.append(" --processingGuarantees " + processingGuarantees);
+        }
+        if (!userConfig.isEmpty()) {
+            commandBuilder.append(" --userConfig \'" + new Gson().toJson(userConfig) + "\'");
+        }
+        if (parallelism != null) {
+            commandBuilder.append(" --parallelism " + parallelism);
+        }
+        if (windowLengthCount != null) {
+            commandBuilder.append(" --windowLengthCount " + windowLengthCount);
+        }
+        if (windowLengthDurationMs != null)  {
+            commandBuilder.append(" --windowLengthDurationMs " + windowLengthDurationMs);
+        }
+        if (slidingIntervalCount != null)  {
+            commandBuilder.append(" --slidingIntervalCount " + slidingIntervalCount);
+        }
+        if (slidingIntervalDurationMs != null)  {
+            commandBuilder.append(" --slidingIntervalDurationMs " + slidingIntervalDurationMs);
+        }
+
+        if (runtime == Runtime.JAVA) {
+            commandBuilder.append(" --jar " + JAVAJAR);
+        } else {
+            if (codeFile != null) {
+                commandBuilder.append(" --py " + PYTHONBASE + codeFile);
+            } else {
+                commandBuilder.append(" --py " + PYTHONBASE);
+            }
+        }
+        return commandBuilder.toString();
+    }
+
+    public String genereateDeleteFunctionCommand() {
+        StringBuilder commandBuilder = new StringBuilder("/pulsar/bin/pulsar-admin functions
delete");
+        if (tenant != null) {
+            commandBuilder.append(" --tenant " + tenant);
+        }
+        if (namespace != null) {
+            commandBuilder.append(" --namespace " + namespace);
+        }
+        if (functionName != null) {
+            commandBuilder.append(" --name " + functionName);
+        }
+        return commandBuilder.toString();
+    }
+
+    public String generateTriggerFunctionCommand(String triggerValue) {
+        StringBuilder commandBuilder = new StringBuilder("/pulsar/bin/pulsar-admin functions
trigger");
+        if (tenant != null) {
+            commandBuilder.append(" --tenant " + tenant);
+        }
+        if (namespace != null) {
+            commandBuilder.append(" --namespace " + namespace);
+        }
+        if (functionName != null) {
+            commandBuilder.append(" --name " + functionName);
+        }
+        commandBuilder.append(" --triggerValue " + triggerValue);
+        return commandBuilder.toString();
+    }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message