pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [pulsar] branch master updated: Fix process runtime with extra dependencies and re-enable python integration tests (#2926)
Date Mon, 05 Nov 2018 06:46:05 GMT
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ee22e0a  Fix process runtime with extra dependencies and re-enable python integration
tests (#2926)
ee22e0a is described below

commit ee22e0a132955edbbdd3317dde0c4b60b3bb90b8
Author: Sijie Guo <guosijie@gmail.com>
AuthorDate: Sun Nov 4 22:46:00 2018 -0800

    Fix process runtime with extra dependencies and re-enable python integration tests (#2926)
    
    ### Motivation
    
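    Fix the Python process runtime failing to start when extra dependencies are configured. The `PYTHONPATH=...` assignment was placed at the front of the command line, so `ProcessBuilder` tried to execute it as the program: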
    
    ```
    java.io.IOException: Cannot run program "PYTHONPATH=${PYTHONPATH}:/Users/jerrypeng/workspace/incubator-pulsar/instances/deps": error=2, No such file or directory
        at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048) ~[?:1.8.0_144]
        at org.apache.pulsar.functions.runtime.ProcessRuntime.startProcess(ProcessRuntime.java:291)
[pulsar-functions-runtime.jar:2.3.0-SNAPSHOT]
        at org.apache.pulsar.functions.runtime.ProcessRuntime.start(ProcessRuntime.java:124)
[pulsar-functions-runtime.jar:2.3.0-SNAPSHOT]
        at org.apache.pulsar.functions.runtime.RuntimeSpawner$1.run(RuntimeSpawner.java:94)
[pulsar-functions-runtime.jar:2.3.0-SNAPSHOT]
        at java.util.TimerThread.mainLoop(Timer.java:555) [?:1.8.0_144]
        at java.util.TimerThread.run(Timer.java:505) [?:1.8.0_144]
    Caused by: java.io.IOException: error=2, No such file or directory
        at java.lang.UNIXProcess.forkAndExec(Native Method) ~[?:1.8.0_144]
        at java.lang.UNIXProcess.<init>(UNIXProcess.java:247) ~[?:1.8.0_144]
        at java.lang.ProcessImpl.start(ProcessImpl.java:134) ~[?:1.8.0_144]
        at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029) ~[?:1.8.0_144]
        ... 5 more
    ```
    
    ### Modifications
    
    - set environment variables in process builder for process runtime
    - enable python function integration tests
    - add integration test for python function with extra dependencies
    
    ### Result
    
    The Python function integration tests pass, including the new test for a Python function with extra dependencies.
---
 .../pulsar/functions/runtime/ProcessRuntime.java   |  14 ++-
 .../functions/runtime/ProcessRuntimeTest.java      |  19 +---
 .../docker-images/latest-version-image/Dockerfile  |   7 ++
 .../exclamation_lib.py}                            |  19 +---
 .../exclamation_with_extra_deps.py}                |  24 ++---
 .../integration/functions/PulsarFunctionsTest.java | 112 +++++++++++++++++----
 .../functions/PulsarFunctionsTestBase.java         |  12 ++-
 7 files changed, 140 insertions(+), 67 deletions(-)

diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 193761f..0e4319f 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -28,13 +28,13 @@ import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
-import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import org.apache.pulsar.functions.proto.InstanceControlGrpc;
-import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
 import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
 
 import java.io.InputStream;
@@ -65,6 +65,7 @@ class ProcessRuntime implements Runtime {
     private InstanceConfig instanceConfig;
     private final Long expectedHealthCheckInterval;
     private final SecretsProviderConfigurator secretsProviderConfigurator;
+    private final String extraDependenciesDir;
     private static final long GRPC_TIMEOUT_SECS = 5;
 
     ProcessRuntime(InstanceConfig instanceConfig,
@@ -95,10 +96,14 @@ class ProcessRuntime implements Runtime {
                 logConfigFile = System.getenv("PULSAR_HOME") + "/conf/functions-logging/logging_config.ini";
                 break;
         }
+        this.extraDependenciesDir = extraDependenciesDir;
         this.processArgs = RuntimeUtils.composeArgs(
             instanceConfig,
             instanceFile,
-            extraDependenciesDir,
+            // DONT SET extra dependencies here (for python runtime),
+            // since process runtime is using Java ProcessBuilder,
+            // we have to set the environment variable via ProcessBuilder
+            FunctionDetails.Runtime.JAVA == instanceConfig.getFunctionDetails().getRuntime()
? extraDependenciesDir : null,
             logDirectory,
             codeFile,
             pulsarServiceUrl,
@@ -286,6 +291,9 @@ class ProcessRuntime implements Runtime {
         deathException = null;
         try {
             ProcessBuilder processBuilder = new ProcessBuilder(processArgs).inheritIO();
+            if (StringUtils.isNotEmpty(extraDependenciesDir)) {
+                processBuilder.environment().put("PYTHONPATH", "${PYTHONPATH}:" + extraDependenciesDir);
+            }
             secretsProviderConfigurator.configureProcessRuntimeSecretsProvider(processBuilder,
instanceConfig.getFunctionDetails());
             log.info("ProcessBuilder starting the process with args {}", String.join(" ",
processBuilder.command()));
             process = processBuilder.start();
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
index 35781fc..b88686a 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
@@ -303,21 +303,10 @@ public class ProcessRuntimeTest {
         ProcessRuntime container = factory.createContainer(config, userJarFile, null, 30l);
         List<String> args = container.getProcessArgs();
 
-        int totalArgs;
-        int portArg;
-        String pythonPath;
-        int configArg;
-        if (null == extraDepsDir) {
-            totalArgs = 30;
-            portArg = 23;
-            configArg = 9;
-            pythonPath = "";
-        } else {
-            totalArgs = 31;
-            portArg = 24;
-            configArg = 10;
-            pythonPath = "PYTHONPATH=${PYTHONPATH}:" + extraDepsDir + " ";
-        }
+        int totalArgs = 30;
+        int portArg = 23;
+        String pythonPath = "";
+        int configArg = 9;
 
         assertEquals(args.size(), totalArgs);
         String expectedArgs = pythonPath + "python " + pythonInstanceFile
diff --git a/tests/docker-images/latest-version-image/Dockerfile b/tests/docker-images/latest-version-image/Dockerfile
index 491a913..8026b17 100644
--- a/tests/docker-images/latest-version-image/Dockerfile
+++ b/tests/docker-images/latest-version-image/Dockerfile
@@ -33,3 +33,10 @@ COPY ssl/ca.cert.pem ssl/broker.key-pk8.pem ssl/broker.cert.pem \
 COPY scripts/init-cluster.sh scripts/run-global-zk.sh scripts/run-local-zk.sh \
      scripts/run-bookie.sh scripts/run-broker.sh scripts/run-functions-worker.sh scripts/run-proxy.sh
scripts/run-presto-worker.sh \
      /pulsar/bin/
+
+# copy python test examples
+
+RUN mkdir -p /pulsar/instances/deps
+
+COPY python-examples/exclamation_lib.py /pulsar/instances/deps/
+COPY python-examples/exclamation_with_extra_deps.py /pulsar/examples/python-examples/
diff --git a/tests/docker-images/latest-version-image/Dockerfile b/tests/docker-images/latest-version-image/python-examples/exclamation_lib.py
similarity index 50%
copy from tests/docker-images/latest-version-image/Dockerfile
copy to tests/docker-images/latest-version-image/python-examples/exclamation_lib.py
index 491a913..eefda34 100644
--- a/tests/docker-images/latest-version-image/Dockerfile
+++ b/tests/docker-images/latest-version-image/python-examples/exclamation_lib.py
@@ -1,3 +1,4 @@
+#!/usr/bin/env python
 #
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
@@ -17,19 +18,5 @@
 # under the License.
 #
 
-FROM apachepulsar/pulsar-all:latest
-
-RUN apt-get update && apt-get install -y supervisor
-
-RUN mkdir -p /var/log/pulsar && mkdir -p /var/run/supervisor/ && mkdir -p
/pulsar/ssl
-
-COPY conf/supervisord.conf /etc/supervisord.conf
-COPY conf/global-zk.conf conf/local-zk.conf conf/bookie.conf conf/broker.conf conf/functions_worker.conf
\
-     conf/proxy.conf conf/presto_worker.conf /etc/supervisord/conf.d/
-
-COPY ssl/ca.cert.pem ssl/broker.key-pk8.pem ssl/broker.cert.pem \
-     ssl/admin.key-pk8.pem ssl/admin.cert.pem /pulsar/ssl/
-
-COPY scripts/init-cluster.sh scripts/run-global-zk.sh scripts/run-local-zk.sh \
-     scripts/run-bookie.sh scripts/run-broker.sh scripts/run-functions-worker.sh scripts/run-proxy.sh
scripts/run-presto-worker.sh \
-     /pulsar/bin/
+def exclamation(input):
+    return input + '!'
\ No newline at end of file
diff --git a/tests/docker-images/latest-version-image/Dockerfile b/tests/docker-images/latest-version-image/python-examples/exclamation_with_extra_deps.py
similarity index 51%
copy from tests/docker-images/latest-version-image/Dockerfile
copy to tests/docker-images/latest-version-image/python-examples/exclamation_with_extra_deps.py
index 491a913..f45d571 100644
--- a/tests/docker-images/latest-version-image/Dockerfile
+++ b/tests/docker-images/latest-version-image/python-examples/exclamation_with_extra_deps.py
@@ -1,3 +1,4 @@
+#!/usr/bin/env python
 #
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
@@ -17,19 +18,14 @@
 # under the License.
 #
 
-FROM apachepulsar/pulsar-all:latest
+from pulsar import Function
+from exclamation_lib import exclamation
 
-RUN apt-get update && apt-get install -y supervisor
+# The classic ExclamationFunction that appends an exclamation at the end
+# of the input
+class ExclamationFunction(Function):
+  def __init__(self):
+    pass
 
-RUN mkdir -p /var/log/pulsar && mkdir -p /var/run/supervisor/ && mkdir -p
/pulsar/ssl
-
-COPY conf/supervisord.conf /etc/supervisord.conf
-COPY conf/global-zk.conf conf/local-zk.conf conf/bookie.conf conf/broker.conf conf/functions_worker.conf
\
-     conf/proxy.conf conf/presto_worker.conf /etc/supervisord/conf.d/
-
-COPY ssl/ca.cert.pem ssl/broker.key-pk8.pem ssl/broker.cert.pem \
-     ssl/admin.key-pk8.pem ssl/admin.cert.pem /pulsar/ssl/
-
-COPY scripts/init-cluster.sh scripts/run-global-zk.sh scripts/run-local-zk.sh \
-     scripts/run-bookie.sh scripts/run-broker.sh scripts/run-functions-worker.sh scripts/run-proxy.sh
scripts/run-presto-worker.sh \
-     /pulsar/bin/
+  def process(self, input, context):
+    return exclamation(input)
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 8b52358..5f6f7b1 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.tests.integration.functions;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
@@ -611,49 +612,64 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase
{
     // Test CRUD functions on different runtimes.
     //
 
-    @Test(enabled = false)
+    @Test
     public void testPythonExclamationFunction() throws Exception {
-        testExclamationFunction(Runtime.PYTHON, false, false);
+        testExclamationFunction(Runtime.PYTHON, false, false, false);
     }
 
-    @Test(enabled = false)
+    @Test
+    public void testPythonExclamationFunctionWithExtraDeps() throws Exception {
+        testExclamationFunction(Runtime.PYTHON, false, false, true);
+    }
+
+    @Test
     public void testPythonExclamationZipFunction() throws Exception {
-        testExclamationFunction(Runtime.PYTHON, false, true);
+        testExclamationFunction(Runtime.PYTHON, false, true, false);
     }
 
-    @Test(enabled = false)
+    @Test
     public void testPythonExclamationTopicPatternFunction() throws Exception {
-        testExclamationFunction(Runtime.PYTHON, true, false);
+        testExclamationFunction(Runtime.PYTHON, true, false, false);
     }
 
     @Test
     public void testJavaExclamationFunction() throws Exception {
-        testExclamationFunction(Runtime.JAVA, false, false);
+        testExclamationFunction(Runtime.JAVA, false, false, false);
     }
 
     @Test
     public void testJavaExclamationTopicPatternFunction() throws Exception {
-        testExclamationFunction(Runtime.JAVA, true, false);
+        testExclamationFunction(Runtime.JAVA, true, false, false);
     }
 
-    private void testExclamationFunction(Runtime runtime, boolean isTopicPattern, boolean
pyZip) throws Exception {
+    private void testExclamationFunction(Runtime runtime,
+                                         boolean isTopicPattern,
+                                         boolean pyZip,
+                                         boolean withExtraDeps) throws Exception {
         if (functionRuntimeType == FunctionRuntimeType.THREAD && runtime == Runtime.PYTHON)
{
             // python can only run on process mode
             return;
         }
 
+        Schema<?> schema;
+        if (Runtime.JAVA == runtime) {
+            schema = Schema.STRING;
+        } else {
+            schema = Schema.BYTES;
+        }
+
         String inputTopicName = "persistent://public/default/test-exclamation-" + runtime
+ "-input-" + randomName(8);
         String outputTopicName = "test-exclamation-" + runtime + "-output-" + randomName(8);
         if (isTopicPattern) {
             @Cleanup PulsarClient client = PulsarClient.builder()
                     .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
                     .build();
-            @Cleanup Consumer<String> consumer1 = client.newConsumer(Schema.STRING)
+            @Cleanup Consumer<?> consumer1 = client.newConsumer(schema)
                     .topic(inputTopicName + "1")
                     .subscriptionType(SubscriptionType.Exclusive)
                     .subscriptionName("test-sub")
                     .subscribe();
-            @Cleanup Consumer<String> consumer2 = client.newConsumer(Schema.STRING)
+            @Cleanup Consumer<?> consumer2 = client.newConsumer(schema)
                     .topic(inputTopicName + "2")
                     .subscriptionType(SubscriptionType.Exclusive)
                     .subscriptionName("test-sub")
@@ -665,13 +681,19 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase
{
 
         // submit the exclamation function
         submitExclamationFunction(
-            runtime, inputTopicName, outputTopicName, functionName, pyZip);
+            runtime, inputTopicName, outputTopicName, functionName, pyZip, withExtraDeps,
schema);
 
         // get function info
         getFunctionInfoSuccess(functionName);
 
         // publish and consume result
-        publishAndConsumeMessages(inputTopicName, outputTopicName, numMessages);
+        if (Runtime.JAVA == runtime) {
+            // java supports schema
+            publishAndConsumeMessages(inputTopicName, outputTopicName, numMessages);
+        } else {
+            // python doesn't support schema
+            publishAndConsumeMessagesBytes(inputTopicName, outputTopicName, numMessages);
+        }
 
         // get function status
         getFunctionStatus(functionName, numMessages);
@@ -687,15 +709,18 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase
{
                                                   String inputTopicName,
                                                   String outputTopicName,
                                                   String functionName,
-                                                  boolean pyZip) throws Exception {
+                                                  boolean pyZip,
+                                                  boolean withExtraDeps,
+                                                  Schema<?> schema) throws Exception
{
         submitFunction(
             runtime,
             inputTopicName,
             outputTopicName,
             functionName,
             pyZip,
-            getExclamationClass(runtime, pyZip),
-            Schema.STRING);
+            withExtraDeps,
+            getExclamationClass(runtime, pyZip, withExtraDeps),
+            schema);
     }
 
     private static <T> void submitFunction(Runtime runtime,
@@ -703,6 +728,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase
{
                                            String outputTopicName,
                                            String functionName,
                                            boolean pyZip,
+                                           boolean withExtraDeps,
                                            String functionClass,
                                            Schema<T> inputTopicSchema) throws Exception
{
         CommandGenerator generator;
@@ -723,6 +749,8 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase
{
             generator.setRuntime(runtime);
             if (pyZip) {
                 command = generator.generateCreateFunctionCommand(EXCLAMATION_PYTHONZIP_FILE);
+            } else if (withExtraDeps) {
+                command = generator.generateCreateFunctionCommand(EXCLAMATION_WITH_DEPS_PYTHON_FILE);
             } else {
                 command = generator.generateCreateFunctionCommand(EXCLAMATION_PYTHON_FILE);
             }
@@ -850,6 +878,56 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase
{
         }
     }
 
+    private static void publishAndConsumeMessagesBytes(String inputTopic,
+                                                       String outputTopic,
+                                                       int numMessages) throws Exception
{
+        @Cleanup PulsarClient client = PulsarClient.builder()
+            .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+            .build();
+        @Cleanup Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
+            .topic(outputTopic)
+            .subscriptionType(SubscriptionType.Exclusive)
+            .subscriptionName("test-sub")
+            .subscribe();
+        if (inputTopic.endsWith(".*")) {
+            @Cleanup Producer<byte[]> producer1 = client.newProducer(Schema.BYTES)
+                    .topic(inputTopic.substring(0, inputTopic.length() - 2) + "1")
+                    .create();
+            @Cleanup Producer<byte[]> producer2 = client.newProducer(Schema.BYTES)
+                    .topic(inputTopic.substring(0, inputTopic.length() - 2) + "2")
+                    .create();
+
+            for (int i = 0; i < numMessages / 2; i++) {
+                producer1.send(("message-" + i).getBytes(UTF_8));
+            }
+
+            for (int i = numMessages / 2; i < numMessages; i++) {
+                producer2.send(("message-" + i).getBytes(UTF_8));
+            }
+        } else {
+            @Cleanup Producer<byte[]> producer = client.newProducer(Schema.BYTES)
+                    .topic(inputTopic)
+                    .create();
+
+            for (int i = 0; i < numMessages; i++) {
+                producer.send(("message-" + i).getBytes(UTF_8));
+            }
+        }
+
+        Set<String> expectedMessages = new HashSet<>();
+        for (int i = 0; i < numMessages; i++) {
+            expectedMessages.add("message-" + i + "!");
+        }
+
+        for (int i = 0; i < numMessages; i++) {
+            Message<byte[]> msg = consumer.receive(30, TimeUnit.SECONDS);
+            String msgValue = new String(msg.getValue(), UTF_8);
+            log.info("Received: {}", msgValue);
+            assertTrue(expectedMessages.contains(msgValue));
+            expectedMessages.remove(msgValue);
+        }
+    }
+
     private static void deleteFunction(String functionName) throws Exception {
         ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
             PulsarCluster.ADMIN_SCRIPT,
@@ -872,7 +950,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase
{
 
         // submit the exclamation function
         submitFunction(
-            Runtime.JAVA, inputTopicName, outputTopicName, functionName, false,
+            Runtime.JAVA, inputTopicName, outputTopicName, functionName, false, false,
             AutoSchemaFunction.class.getName(),
             Schema.AVRO(CustomObject.class));
 
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
index 9acdf1f..851793c 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
@@ -74,20 +74,28 @@ public abstract class PulsarFunctionsTestBase extends PulsarTestSuite
{
         "org.apache.pulsar.functions.api.examples.ExclamationFunction";
 
     public static final String EXCLAMATION_PYTHON_CLASS =
-        "exclamation.ExclamationFunction";
+        "exclamation_function.ExclamationFunction";
+
+    public static final String EXCLAMATION_WITH_DEPS_PYTHON_CLASS =
+        "exclamation_with_extra_deps.ExclamationFunction";
 
     public static final String EXCLAMATION_PYTHONZIP_CLASS =
             "exclamation";
 
     public static final String EXCLAMATION_PYTHON_FILE = "exclamation_function.py";
+    public static final String EXCLAMATION_WITH_DEPS_PYTHON_FILE = "exclamation_with_extra_deps.py";
     public static final String EXCLAMATION_PYTHONZIP_FILE = "exclamation.zip";
 
-    protected static String getExclamationClass(Runtime runtime, boolean pyZip) {
+    protected static String getExclamationClass(Runtime runtime,
+                                                boolean pyZip,
+                                                boolean extraDeps) {
         if (Runtime.JAVA == runtime) {
             return EXCLAMATION_JAVA_CLASS;
         } else if (Runtime.PYTHON == runtime) {
             if (pyZip) {
                 return EXCLAMATION_PYTHONZIP_CLASS;
+            } else if (extraDeps) {
+                return EXCLAMATION_WITH_DEPS_PYTHON_CLASS;
             } else {
                 return EXCLAMATION_PYTHON_CLASS;
             }


Mime
View raw message