pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] sijie closed pull request #2926: Fix process runtime with extra dependencies and re-enable python integration tests
Date Mon, 05 Nov 2018 06:46:02 GMT
sijie closed pull request #2926: Fix process runtime with extra dependencies and re-enable python integration tests
URL: https://github.com/apache/pulsar/pull/2926
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 193761f76b..0e4319f561 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -28,13 +28,13 @@
 import io.grpc.ManagedChannelBuilder;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
-import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import org.apache.pulsar.functions.proto.InstanceControlGrpc;
-import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
 import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
 
 import java.io.InputStream;
@@ -65,6 +65,7 @@
     private InstanceConfig instanceConfig;
     private final Long expectedHealthCheckInterval;
     private final SecretsProviderConfigurator secretsProviderConfigurator;
+    private final String extraDependenciesDir;
     private static final long GRPC_TIMEOUT_SECS = 5;
 
     ProcessRuntime(InstanceConfig instanceConfig,
@@ -95,10 +96,14 @@
                 logConfigFile = System.getenv("PULSAR_HOME") + "/conf/functions-logging/logging_config.ini";
                 break;
         }
+        this.extraDependenciesDir = extraDependenciesDir;
         this.processArgs = RuntimeUtils.composeArgs(
             instanceConfig,
             instanceFile,
-            extraDependenciesDir,
+            // DONT SET extra dependencies here (for python runtime),
+            // since process runtime is using Java ProcessBuilder,
+            // we have to set the environment variable via ProcessBuilder
+            FunctionDetails.Runtime.JAVA == instanceConfig.getFunctionDetails().getRuntime() ? extraDependenciesDir : null,
             logDirectory,
             codeFile,
             pulsarServiceUrl,
@@ -286,6 +291,9 @@ private void startProcess() {
         deathException = null;
         try {
             ProcessBuilder processBuilder = new ProcessBuilder(processArgs).inheritIO();
+            if (StringUtils.isNotEmpty(extraDependenciesDir)) {
+                processBuilder.environment().put("PYTHONPATH", "${PYTHONPATH}:" + extraDependenciesDir);
+            }
             secretsProviderConfigurator.configureProcessRuntimeSecretsProvider(processBuilder, instanceConfig.getFunctionDetails());
             log.info("ProcessBuilder starting the process with args {}", String.join(" ", processBuilder.command()));
             process = processBuilder.start();
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
index 35781fc586..b88686a362 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
@@ -303,21 +303,10 @@ private void verifyPythonInstance(InstanceConfig config, String extraDepsDir) th
         ProcessRuntime container = factory.createContainer(config, userJarFile, null, 30l);
         List<String> args = container.getProcessArgs();
 
-        int totalArgs;
-        int portArg;
-        String pythonPath;
-        int configArg;
-        if (null == extraDepsDir) {
-            totalArgs = 30;
-            portArg = 23;
-            configArg = 9;
-            pythonPath = "";
-        } else {
-            totalArgs = 31;
-            portArg = 24;
-            configArg = 10;
-            pythonPath = "PYTHONPATH=${PYTHONPATH}:" + extraDepsDir + " ";
-        }
+        int totalArgs = 30;
+        int portArg = 23;
+        String pythonPath = "";
+        int configArg = 9;
 
         assertEquals(args.size(), totalArgs);
         String expectedArgs = pythonPath + "python " + pythonInstanceFile
diff --git a/tests/docker-images/latest-version-image/Dockerfile b/tests/docker-images/latest-version-image/Dockerfile
index 491a913098..8026b175da 100644
--- a/tests/docker-images/latest-version-image/Dockerfile
+++ b/tests/docker-images/latest-version-image/Dockerfile
@@ -33,3 +33,10 @@ COPY ssl/ca.cert.pem ssl/broker.key-pk8.pem ssl/broker.cert.pem \
 COPY scripts/init-cluster.sh scripts/run-global-zk.sh scripts/run-local-zk.sh \
      scripts/run-bookie.sh scripts/run-broker.sh scripts/run-functions-worker.sh scripts/run-proxy.sh scripts/run-presto-worker.sh \
      /pulsar/bin/
+
+# copy python test examples
+
+RUN mkdir -p /pulsar/instances/deps
+
+COPY python-examples/exclamation_lib.py /pulsar/instances/deps/
+COPY python-examples/exclamation_with_extra_deps.py /pulsar/examples/python-examples/
diff --git a/tests/docker-images/latest-version-image/python-examples/exclamation_lib.py b/tests/docker-images/latest-version-image/python-examples/exclamation_lib.py
new file mode 100644
index 0000000000..eefda34a6a
--- /dev/null
+++ b/tests/docker-images/latest-version-image/python-examples/exclamation_lib.py
@@ -0,0 +1,22 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+def exclamation(input):
+    return input + '!'
\ No newline at end of file
diff --git a/tests/docker-images/latest-version-image/python-examples/exclamation_with_extra_deps.py b/tests/docker-images/latest-version-image/python-examples/exclamation_with_extra_deps.py
new file mode 100644
index 0000000000..f45d571ccc
--- /dev/null
+++ b/tests/docker-images/latest-version-image/python-examples/exclamation_with_extra_deps.py
@@ -0,0 +1,31 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from pulsar import Function
+from exclamation_lib import exclamation
+
+# The classic ExclamationFunction that appends an exclamation at the end
+# of the input
+class ExclamationFunction(Function):
+  def __init__(self):
+    pass
+
+  def process(self, input, context):
+    return exclamation(input)
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 8b523582f2..5f6f7b19ae 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.tests.integration.functions;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
@@ -611,49 +612,64 @@ protected void getSourceInfoNotFound(String tenant, String namespace, String sou
     // Test CRUD functions on different runtimes.
     //
 
-    @Test(enabled = false)
+    @Test
     public void testPythonExclamationFunction() throws Exception {
-        testExclamationFunction(Runtime.PYTHON, false, false);
+        testExclamationFunction(Runtime.PYTHON, false, false, false);
     }
 
-    @Test(enabled = false)
+    @Test
+    public void testPythonExclamationFunctionWithExtraDeps() throws Exception {
+        testExclamationFunction(Runtime.PYTHON, false, false, true);
+    }
+
+    @Test
     public void testPythonExclamationZipFunction() throws Exception {
-        testExclamationFunction(Runtime.PYTHON, false, true);
+        testExclamationFunction(Runtime.PYTHON, false, true, false);
     }
 
-    @Test(enabled = false)
+    @Test
     public void testPythonExclamationTopicPatternFunction() throws Exception {
-        testExclamationFunction(Runtime.PYTHON, true, false);
+        testExclamationFunction(Runtime.PYTHON, true, false, false);
     }
 
     @Test
     public void testJavaExclamationFunction() throws Exception {
-        testExclamationFunction(Runtime.JAVA, false, false);
+        testExclamationFunction(Runtime.JAVA, false, false, false);
     }
 
     @Test
     public void testJavaExclamationTopicPatternFunction() throws Exception {
-        testExclamationFunction(Runtime.JAVA, true, false);
+        testExclamationFunction(Runtime.JAVA, true, false, false);
     }
 
-    private void testExclamationFunction(Runtime runtime, boolean isTopicPattern, boolean pyZip) throws Exception {
+    private void testExclamationFunction(Runtime runtime,
+                                         boolean isTopicPattern,
+                                         boolean pyZip,
+                                         boolean withExtraDeps) throws Exception {
         if (functionRuntimeType == FunctionRuntimeType.THREAD && runtime == Runtime.PYTHON) {
             // python can only run on process mode
             return;
         }
 
+        Schema<?> schema;
+        if (Runtime.JAVA == runtime) {
+            schema = Schema.STRING;
+        } else {
+            schema = Schema.BYTES;
+        }
+
         String inputTopicName = "persistent://public/default/test-exclamation-" + runtime + "-input-" + randomName(8);
         String outputTopicName = "test-exclamation-" + runtime + "-output-" + randomName(8);
         if (isTopicPattern) {
             @Cleanup PulsarClient client = PulsarClient.builder()
                     .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
                     .build();
-            @Cleanup Consumer<String> consumer1 = client.newConsumer(Schema.STRING)
+            @Cleanup Consumer<?> consumer1 = client.newConsumer(schema)
                     .topic(inputTopicName + "1")
                     .subscriptionType(SubscriptionType.Exclusive)
                     .subscriptionName("test-sub")
                     .subscribe();
-            @Cleanup Consumer<String> consumer2 = client.newConsumer(Schema.STRING)
+            @Cleanup Consumer<?> consumer2 = client.newConsumer(schema)
                     .topic(inputTopicName + "2")
                     .subscriptionType(SubscriptionType.Exclusive)
                     .subscriptionName("test-sub")
@@ -665,13 +681,19 @@ private void testExclamationFunction(Runtime runtime, boolean isTopicPattern, bo
 
         // submit the exclamation function
         submitExclamationFunction(
-            runtime, inputTopicName, outputTopicName, functionName, pyZip);
+            runtime, inputTopicName, outputTopicName, functionName, pyZip, withExtraDeps, schema);
 
         // get function info
         getFunctionInfoSuccess(functionName);
 
         // publish and consume result
-        publishAndConsumeMessages(inputTopicName, outputTopicName, numMessages);
+        if (Runtime.JAVA == runtime) {
+            // java supports schema
+            publishAndConsumeMessages(inputTopicName, outputTopicName, numMessages);
+        } else {
+            // python doesn't support schema
+            publishAndConsumeMessagesBytes(inputTopicName, outputTopicName, numMessages);
+        }
 
         // get function status
         getFunctionStatus(functionName, numMessages);
@@ -687,15 +709,18 @@ private static void submitExclamationFunction(Runtime runtime,
                                                   String inputTopicName,
                                                   String outputTopicName,
                                                   String functionName,
-                                                  boolean pyZip) throws Exception {
+                                                  boolean pyZip,
+                                                  boolean withExtraDeps,
+                                                  Schema<?> schema) throws Exception {
         submitFunction(
             runtime,
             inputTopicName,
             outputTopicName,
             functionName,
             pyZip,
-            getExclamationClass(runtime, pyZip),
-            Schema.STRING);
+            withExtraDeps,
+            getExclamationClass(runtime, pyZip, withExtraDeps),
+            schema);
     }
 
     private static <T> void submitFunction(Runtime runtime,
@@ -703,6 +728,7 @@ private static void submitExclamationFunction(Runtime runtime,
                                            String outputTopicName,
                                            String functionName,
                                            boolean pyZip,
+                                           boolean withExtraDeps,
                                            String functionClass,
                                            Schema<T> inputTopicSchema) throws Exception {
         CommandGenerator generator;
@@ -723,6 +749,8 @@ private static void submitExclamationFunction(Runtime runtime,
             generator.setRuntime(runtime);
             if (pyZip) {
                 command = generator.generateCreateFunctionCommand(EXCLAMATION_PYTHONZIP_FILE);
+            } else if (withExtraDeps) {
+                command = generator.generateCreateFunctionCommand(EXCLAMATION_WITH_DEPS_PYTHON_FILE);
             } else {
                 command = generator.generateCreateFunctionCommand(EXCLAMATION_PYTHON_FILE);
             }
@@ -850,6 +878,56 @@ private static void publishAndConsumeMessages(String inputTopic,
         }
     }
 
+    private static void publishAndConsumeMessagesBytes(String inputTopic,
+                                                       String outputTopic,
+                                                       int numMessages) throws Exception {
+        @Cleanup PulsarClient client = PulsarClient.builder()
+            .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+            .build();
+        @Cleanup Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
+            .topic(outputTopic)
+            .subscriptionType(SubscriptionType.Exclusive)
+            .subscriptionName("test-sub")
+            .subscribe();
+        if (inputTopic.endsWith(".*")) {
+            @Cleanup Producer<byte[]> producer1 = client.newProducer(Schema.BYTES)
+                    .topic(inputTopic.substring(0, inputTopic.length() - 2) + "1")
+                    .create();
+            @Cleanup Producer<byte[]> producer2 = client.newProducer(Schema.BYTES)
+                    .topic(inputTopic.substring(0, inputTopic.length() - 2) + "2")
+                    .create();
+
+            for (int i = 0; i < numMessages / 2; i++) {
+                producer1.send(("message-" + i).getBytes(UTF_8));
+            }
+
+            for (int i = numMessages / 2; i < numMessages; i++) {
+                producer2.send(("message-" + i).getBytes(UTF_8));
+            }
+        } else {
+            @Cleanup Producer<byte[]> producer = client.newProducer(Schema.BYTES)
+                    .topic(inputTopic)
+                    .create();
+
+            for (int i = 0; i < numMessages; i++) {
+                producer.send(("message-" + i).getBytes(UTF_8));
+            }
+        }
+
+        Set<String> expectedMessages = new HashSet<>();
+        for (int i = 0; i < numMessages; i++) {
+            expectedMessages.add("message-" + i + "!");
+        }
+
+        for (int i = 0; i < numMessages; i++) {
+            Message<byte[]> msg = consumer.receive(30, TimeUnit.SECONDS);
+            String msgValue = new String(msg.getValue(), UTF_8);
+            log.info("Received: {}", msgValue);
+            assertTrue(expectedMessages.contains(msgValue));
+            expectedMessages.remove(msgValue);
+        }
+    }
+
     private static void deleteFunction(String functionName) throws Exception {
         ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
             PulsarCluster.ADMIN_SCRIPT,
@@ -872,7 +950,7 @@ public void testAutoSchemaFunction() throws Exception {
 
         // submit the exclamation function
         submitFunction(
-            Runtime.JAVA, inputTopicName, outputTopicName, functionName, false,
+            Runtime.JAVA, inputTopicName, outputTopicName, functionName, false, false,
             AutoSchemaFunction.class.getName(),
             Schema.AVRO(CustomObject.class));
 
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
index 9acdf1fdf3..851793c56d 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
@@ -74,20 +74,28 @@ public void teardownFunctionWorkers() {
         "org.apache.pulsar.functions.api.examples.ExclamationFunction";
 
     public static final String EXCLAMATION_PYTHON_CLASS =
-        "exclamation.ExclamationFunction";
+        "exclamation_function.ExclamationFunction";
+
+    public static final String EXCLAMATION_WITH_DEPS_PYTHON_CLASS =
+        "exclamation_with_extra_deps.ExclamationFunction";
 
     public static final String EXCLAMATION_PYTHONZIP_CLASS =
             "exclamation";
 
     public static final String EXCLAMATION_PYTHON_FILE = "exclamation_function.py";
+    public static final String EXCLAMATION_WITH_DEPS_PYTHON_FILE = "exclamation_with_extra_deps.py";
     public static final String EXCLAMATION_PYTHONZIP_FILE = "exclamation.zip";
 
-    protected static String getExclamationClass(Runtime runtime, boolean pyZip) {
+    protected static String getExclamationClass(Runtime runtime,
+                                                boolean pyZip,
+                                                boolean extraDeps) {
         if (Runtime.JAVA == runtime) {
             return EXCLAMATION_JAVA_CLASS;
         } else if (Runtime.PYTHON == runtime) {
             if (pyZip) {
                 return EXCLAMATION_PYTHONZIP_CLASS;
+            } else if (extraDeps) {
+                return EXCLAMATION_WITH_DEPS_PYTHON_CLASS;
             } else {
                 return EXCLAMATION_PYTHON_CLASS;
             }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message