pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] srkukarni closed pull request #2892: Remove runtime dependency from pulsar-admin
Date Thu, 01 Nov 2018 01:02:56 GMT
srkukarni closed pull request #2892: Remove runtime dependency from pulsar-admin
URL: https://github.com/apache/pulsar/pull/2892
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub no longer shows its
diff once it has been merged, so the full diff is reproduced below:

diff --git a/bin/function-localrunner b/bin/function-localrunner
new file mode 100755
index 0000000000..16362b4a0e
--- /dev/null
+++ b/bin/function-localrunner
@@ -0,0 +1,66 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+BINDIR=$(dirname "$0")
+export PULSAR_HOME=`cd $BINDIR/..;pwd`
+. "$PULSAR_HOME/bin/pulsar-admin-common.sh"
+
+# functions related variables
+FUNCTIONS_HOME=$PULSAR_HOME/pulsar-functions
+DEFAULT_JAVA_INSTANCE_JAR=$PULSAR_HOME/instances/java-instance.jar
+JAVA_INSTANCE_JAR=${PULSAR_JAVA_INSTANCE_JAR:-"${DEFAULT_JAVA_INSTANCE_JAR}"}
+DEFAULT_PY_INSTANCE_FILE=$PULSAR_HOME/instances/python-instance/python_instance_main.py
+PY_INSTANCE_FILE=${PULSAR_PY_INSTANCE_FILE:-"${DEFAULT_PY_INSTANCE_FILE}"}
+
+# find the java instance location
+if [ ! -f "${JAVA_INSTANCE_JAR}" ]; then
+    # didn't find a released jar, then search the built jar
+    BUILT_JAVA_INSTANCE_JAR="${FUNCTIONS_HOME}/runtime-all/target/java-instance.jar"
+    if [ -f "${BUILT_JAVA_INSTANCE_JAR}" ]; then
+        JAVA_INSTANCE_JAR=${BUILT_JAVA_INSTANCE_JAR}
+    else
+        echo "\nCouldn't find pulsar java instance jar.";
+        echo "Make sure you've run 'mvn package'\n";
+        exit 1;
+    fi
+fi
+
+# find the python instance location
+if [ ! -f "${PY_INSTANCE_FILE}" ]; then
+    # didn't find a released python instance, then search the built python instance
+    BUILT_PY_INSTANCE_FILE="${FUNCTIONS_HOME}/instance/target/python-instance/python_instance_main.py"
+    if [ -f "${BUILT_PY_INSTANCE_FILE}" ]; then
+        PY_INSTANCE_FILE=${BUILT_PY_INSTANCE_FILE}
+    else
+        echo "\nCouldn't find pulsar python instance.";
+        echo "Make sure you've run 'mvn package'\n";
+        exit 1;
+    fi
+fi
+
+# functions
+OPTS="$OPTS -Dpulsar.functions.java.instance.jar=${JAVA_INSTANCE_JAR}"
+OPTS="$OPTS -Dpulsar.functions.python.instance.file=${PY_INSTANCE_FILE}"
+
+MAINCLASS="org.apache.pulsar.functions.runtime.LocalRunner"
+
+#Change to PULSAR_HOME to support relative paths
+cd "$PULSAR_HOME"
+exec $JAVA $OPTS $MAINCLASS "$@"
diff --git a/bin/pulsar-admin b/bin/pulsar-admin
index 39d12393b2..97206e61f4 100755
--- a/bin/pulsar-admin
+++ b/bin/pulsar-admin
@@ -20,129 +20,7 @@
 
 BINDIR=$(dirname "$0")
 export PULSAR_HOME=`cd $BINDIR/..;pwd`
-
-DEFAULT_CLIENT_CONF=$PULSAR_HOME/conf/client.conf
-DEFAULT_LOG_CONF=$PULSAR_HOME/conf/log4j2.yaml
-
-# functions related variables
-FUNCTIONS_HOME=$PULSAR_HOME/pulsar-functions
-DEFAULT_JAVA_INSTANCE_JAR=$PULSAR_HOME/instances/java-instance.jar
-JAVA_INSTANCE_JAR=${PULSAR_JAVA_INSTANCE_JAR:-"${DEFAULT_JAVA_INSTANCE_JAR}"}
-DEFAULT_PY_INSTANCE_FILE=$PULSAR_HOME/instances/python-instance/python_instance_main.py
-PY_INSTANCE_FILE=${PULSAR_PY_INSTANCE_FILE:-"${DEFAULT_PY_INSTANCE_FILE}"}
-
-if [ -f "$PULSAR_HOME/conf/pulsar_tools_env.sh" ]
-then
-    . "$PULSAR_HOME/conf/pulsar_tools_env.sh"
-fi
-
-# Check for the java to use
-if [[ -z $JAVA_HOME ]]; then
-    JAVA=$(which java)
-    if [ $? != 0 ]; then
-        echo "Error: JAVA_HOME not set, and no java executable found in $PATH." 1>&2
-        exit 1
-    fi
-else
-    JAVA=$JAVA_HOME/bin/java
-fi
-
-# exclude tests jar
-RELEASE_JAR=`ls $PULSAR_HOME/pulsar-*.jar 2> /dev/null | grep -v tests | tail -1`
-if [ $? == 0 ]; then
-    PULSAR_JAR=$RELEASE_JAR
-fi
-
-# exclude tests jar
-BUILT_JAR=`ls $PULSAR_HOME/pulsar-client-tools/target/pulsar-*.jar 2> /dev/null | grep -v tests | tail -1`
-if [ $? != 0 ] && [ ! -e "$PULSAR_JAR" ]; then
-    echo "\nCouldn't find pulsar jar.";
-    echo "Make sure you've run 'mvn package'\n";
-    exit 1;
-elif [ -e "$BUILT_JAR" ]; then
-    PULSAR_JAR=$BUILT_JAR
-fi
-
-add_maven_deps_to_classpath() {
-    MVN="mvn"
-    if [ "$MAVEN_HOME" != "" ]; then
-	MVN=${MAVEN_HOME}/bin/mvn
-    fi
-
-    # Need to generate classpath from maven pom. This is costly so generate it
-    # and cache it. Save the file into our target dir so a mvn clean will get
-    # clean it up and force us create a new one.
-    f="${PULSAR_HOME}/distribution/server/target/classpath.txt"
-    if [ ! -f "${f}" ]
-    then
-	${MVN} -f "${PULSAR_HOME}/pom.xml" dependency:build-classpath -DincludeScope=compile -Dmdep.outputFile="${f}" &> /dev/null
-    fi
-    PULSAR_CLASSPATH=${CLASSPATH}:`cat "${f}"`
-}
-
-if [ -d "$PULSAR_HOME/lib" ]; then
-    PULSAR_CLASSPATH="$PULSAR_CLASSPATH:$PULSAR_HOME/lib/*"
-else
-    add_maven_deps_to_classpath
-fi
-
-if [ -z "$PULSAR_CLIENT_CONF" ]; then
-    PULSAR_CLIENT_CONF=$DEFAULT_CLIENT_CONF
-fi
-if [ -z "$PULSAR_LOG_CONF" ]; then
-    PULSAR_LOG_CONF=$DEFAULT_LOG_CONF
-fi
-
-PULSAR_CLASSPATH="$PULSAR_JAR:$PULSAR_CLASSPATH:$PULSAR_EXTRA_CLASSPATH"
-PULSAR_CLASSPATH="`dirname $PULSAR_LOG_CONF`:$PULSAR_CLASSPATH"
-OPTS="$OPTS -Dlog4j.configurationFile=`basename $PULSAR_LOG_CONF`"
-OPTS="$OPTS -Djava.net.preferIPv4Stack=true"
-
-OPTS="-cp $PULSAR_CLASSPATH $OPTS"
-
-OPTS="$OPTS $PULSAR_EXTRA_OPTS"
-
-# log directory & file
-PULSAR_LOG_DIR=${PULSAR_LOG_DIR:-"$PULSAR_HOME/logs"}
-PULSAR_LOG_APPENDER=${PULSAR_LOG_APPENDER:-"RoutingAppender"}
-PULSAR_LOG_LEVEL=${PULSAR_LOG_LEVEL:-"info"}
-PULSAR_ROUTING_APPENDER_DEFAULT=${PULSAR_ROUTING_APPENDER_DEFAULT:-"Console"}
-
-#Configure log configuration system properties
-OPTS="$OPTS -Dpulsar.log.appender=$PULSAR_LOG_APPENDER"
-OPTS="$OPTS -Dpulsar.log.dir=$PULSAR_LOG_DIR"
-OPTS="$OPTS -Dpulsar.log.level=$PULSAR_LOG_LEVEL"
-OPTS="$OPTS -Dpulsar.routing.appender.default=$PULSAR_ROUTING_APPENDER_DEFAULT"
-
-# find the java instance location
-if [ ! -f "${JAVA_INSTANCE_JAR}" ]; then
-    # didn't find a released jar, then search the built jar
-    BUILT_JAVA_INSTANCE_JAR="${FUNCTIONS_HOME}/runtime-all/target/java-instance.jar"
-    if [ -f "${BUILT_JAVA_INSTANCE_JAR}" ]; then
-        JAVA_INSTANCE_JAR=${BUILT_JAVA_INSTANCE_JAR}
-    else
-        echo "\nCouldn't find pulsar java instance jar.";
-        echo "Make sure you've run 'mvn package'\n";
-        exit 1;
-    fi
-fi
-
-# find the python instance location
-if [ ! -f "${PY_INSTANCE_FILE}" ]; then
-    # didn't find a released python instance, then search the built python instance
-    BUILT_PY_INSTANCE_FILE="${FUNCTIONS_HOME}/instance/target/python-instance/python_instance_main.py"
-    if [ -f "${BUILT_PY_INSTANCE_FILE}" ]; then
-        PY_INSTANCE_FILE=${BUILT_PY_INSTANCE_FILE}
-    else
-        echo "\nCouldn't find pulsar python instance.";
-        echo "Make sure you've run 'mvn package'\n";
-        exit 1;
-    fi
-fi
-
-# functions
-OPTS="$OPTS -Dpulsar.functions.java.instance.jar=${JAVA_INSTANCE_JAR}"
-OPTS="$OPTS -Dpulsar.functions.python.instance.file=${PY_INSTANCE_FILE}"
+. "$PULSAR_HOME/bin/pulsar-admin-common.sh"
 
 #Change to PULSAR_HOME to support relative paths
 cd "$PULSAR_HOME"
diff --git a/bin/pulsar-admin-common.sh b/bin/pulsar-admin-common.sh
new file mode 100755
index 0000000000..0ccfc70985
--- /dev/null
+++ b/bin/pulsar-admin-common.sh
@@ -0,0 +1,105 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+DEFAULT_CLIENT_CONF=$PULSAR_HOME/conf/client.conf
+DEFAULT_LOG_CONF=$PULSAR_HOME/conf/log4j2.yaml
+
+if [ -f "$PULSAR_HOME/conf/pulsar_tools_env.sh" ]
+then
+    . "$PULSAR_HOME/conf/pulsar_tools_env.sh"
+fi
+
+# Check for the java to use
+if [[ -z $JAVA_HOME ]]; then
+    JAVA=$(which java)
+    if [ $? != 0 ]; then
+        echo "Error: JAVA_HOME not set, and no java executable found in $PATH." 1>&2
+        exit 1
+    fi
+else
+    JAVA=$JAVA_HOME/bin/java
+fi
+
+# exclude tests jar
+RELEASE_JAR=`ls $PULSAR_HOME/pulsar-*.jar 2> /dev/null | grep -v tests | tail -1`
+if [ $? == 0 ]; then
+    PULSAR_JAR=$RELEASE_JAR
+fi
+
+# exclude tests jar
+BUILT_JAR=`ls $PULSAR_HOME/pulsar-client-tools/target/pulsar-*.jar 2> /dev/null | grep -v tests | tail -1`
+if [ $? != 0 ] && [ ! -e "$PULSAR_JAR" ]; then
+    echo "\nCouldn't find pulsar jar.";
+    echo "Make sure you've run 'mvn package'\n";
+    exit 1;
+elif [ -e "$BUILT_JAR" ]; then
+    PULSAR_JAR=$BUILT_JAR
+fi
+
+add_maven_deps_to_classpath() {
+    MVN="mvn"
+    if [ "$MAVEN_HOME" != "" ]; then
+	MVN=${MAVEN_HOME}/bin/mvn
+    fi
+
+    # Need to generate classpath from maven pom. This is costly so generate it
+    # and cache it. Save the file into our target dir so a mvn clean will get
+    # clean it up and force us create a new one.
+    f="${PULSAR_HOME}/distribution/server/target/classpath.txt"
+    if [ ! -f "${f}" ]
+    then
+	${MVN} -f "${PULSAR_HOME}/pom.xml" dependency:build-classpath -DincludeScope=compile -Dmdep.outputFile="${f}" &> /dev/null
+    fi
+    PULSAR_CLASSPATH=${CLASSPATH}:`cat "${f}"`
+}
+
+if [ -d "$PULSAR_HOME/lib" ]; then
+    PULSAR_CLASSPATH="$PULSAR_CLASSPATH:$PULSAR_HOME/lib/*"
+else
+    add_maven_deps_to_classpath
+fi
+
+if [ -z "$PULSAR_CLIENT_CONF" ]; then
+    PULSAR_CLIENT_CONF=$DEFAULT_CLIENT_CONF
+fi
+if [ -z "$PULSAR_LOG_CONF" ]; then
+    PULSAR_LOG_CONF=$DEFAULT_LOG_CONF
+fi
+
+PULSAR_CLASSPATH="$PULSAR_JAR:$PULSAR_CLASSPATH:$PULSAR_EXTRA_CLASSPATH"
+PULSAR_CLASSPATH="`dirname $PULSAR_LOG_CONF`:$PULSAR_CLASSPATH"
+OPTS="$OPTS -Dlog4j.configurationFile=`basename $PULSAR_LOG_CONF`"
+OPTS="$OPTS -Djava.net.preferIPv4Stack=true"
+
+OPTS="-cp $PULSAR_CLASSPATH $OPTS"
+
+OPTS="$OPTS $PULSAR_EXTRA_OPTS"
+
+# log directory & file
+PULSAR_LOG_DIR=${PULSAR_LOG_DIR:-"$PULSAR_HOME/logs"}
+PULSAR_LOG_APPENDER=${PULSAR_LOG_APPENDER:-"RoutingAppender"}
+PULSAR_LOG_LEVEL=${PULSAR_LOG_LEVEL:-"info"}
+PULSAR_ROUTING_APPENDER_DEFAULT=${PULSAR_ROUTING_APPENDER_DEFAULT:-"Console"}
+
+#Configure log configuration system properties
+OPTS="$OPTS -Dpulsar.log.appender=$PULSAR_LOG_APPENDER"
+OPTS="$OPTS -Dpulsar.log.dir=$PULSAR_LOG_DIR"
+OPTS="$OPTS -Dpulsar.log.level=$PULSAR_LOG_LEVEL"
+OPTS="$OPTS -Dpulsar.routing.appender.default=$PULSAR_ROUTING_APPENDER_DEFAULT"
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 3169e2224b..8f30f2fba9 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -42,17 +42,13 @@
 import io.netty.buffer.Unpooled;
 
 import java.io.File;
+import java.lang.reflect.Field;
 import java.lang.reflect.Type;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -69,19 +65,13 @@
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.Resources;
 import org.apache.pulsar.common.functions.WindowConfig;
-import org.apache.pulsar.functions.instance.AuthenticationConfig;
-import org.apache.pulsar.functions.instance.InstanceConfig;
-import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
-import org.apache.pulsar.functions.runtime.RuntimeSpawner;
-import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
-import org.apache.pulsar.functions.utils.*;
+import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.utils.FunctionConfigUtils;
 import org.apache.pulsar.functions.windowing.WindowUtils;
 
 @Slf4j
 @Parameters(commandDescription = "Interface for managing Pulsar Functions (lightweight, Lambda-style compute processes that work with Pulsar)")
 public class CmdFunctions extends CmdBase {
-    private static final String DEFAULT_SERVICE_URL = "pulsar://localhost:6650";
-
     private final LocalRunner localRunner;
     private final CreateFunction creater;
     private final DeleteFunction deleter;
@@ -641,14 +631,22 @@ private void mergeArgs() {
         void runCmd() throws Exception {
             // merge deprecated args with new args
             mergeArgs();
-            CmdFunctions.startLocalRun(FunctionConfigUtils.convert(functionConfig, classLoader), functionConfig.getParallelism(),
-                    instanceIdOffset, brokerServiceUrl, stateStorageServiceUrl,
-                    AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthPlugin)
-                            .clientAuthenticationParameters(clientAuthParams).useTls(useTls)
-                            .tlsAllowInsecureConnection(tlsAllowInsecureConnection)
-                            .tlsHostnameVerificationEnable(tlsHostNameVerificationEnabled)
-                            .tlsTrustCertsFilePath(tlsTrustCertFilePath).build(),
-                    userCodeFile, admin);
+            List<String> localRunArgs = new LinkedList<>();
+            localRunArgs.add(System.getenv("PULSAR_HOME") + "/bin/function-localrunner");
+            localRunArgs.add("--functionConfig");
+            localRunArgs.add(new Gson().toJson(functionConfig));
+            for (Field field : this.getClass().getDeclaredFields()) {
+                if (field.getName().startsWith("DEPRECATED")) continue;
+                if(field.getName().startsWith("this$0")) continue;
+                Object value = field.get(this);
+                if (value != null) {
+                    localRunArgs.add("--" + field.getName());
+                    localRunArgs.add(value.toString());
+                }
+            }
+            ProcessBuilder processBuilder = new ProcessBuilder(localRunArgs).inheritIO();
+            Process process = processBuilder.start();
+            process.waitFor();
         }
     }
 
@@ -1029,80 +1027,4 @@ private void parseFullyQualifiedFunctionName(String fqfn, FunctionConfig functio
         }
     }
 
-    protected static void startLocalRun(org.apache.pulsar.functions.proto.Function.FunctionDetails functionDetails,
-            int parallelism, int instanceIdOffset, String brokerServiceUrl, String stateStorageServiceUrl, AuthenticationConfig authConfig,
-            String userCodeFile, PulsarAdmin admin)
-            throws Exception {
-
-        String serviceUrl = admin.getServiceUrl();
-        if (brokerServiceUrl != null) {
-            serviceUrl = brokerServiceUrl;
-        }
-        if (serviceUrl == null) {
-            serviceUrl = DEFAULT_SERVICE_URL;
-        }
-
-        try (ProcessRuntimeFactory containerFactory = new ProcessRuntimeFactory(serviceUrl, stateStorageServiceUrl, authConfig, null, null,
-                null, new DefaultSecretsProviderConfigurator())) {
-            List<RuntimeSpawner> spawners = new LinkedList<>();
-            for (int i = 0; i < parallelism; ++i) {
-                InstanceConfig instanceConfig = new InstanceConfig();
-                instanceConfig.setFunctionDetails(functionDetails);
-                // TODO: correctly implement function version and id
-                instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
-                instanceConfig.setFunctionId(UUID.randomUUID().toString());
-                instanceConfig.setInstanceId(i + instanceIdOffset);
-                instanceConfig.setMaxBufferedTuples(1024);
-                instanceConfig.setPort(Utils.findAvailablePort());
-                RuntimeSpawner runtimeSpawner = new RuntimeSpawner(
-                        instanceConfig,
-                        userCodeFile,
-                        null,
-                        containerFactory,
-                        30000);
-                spawners.add(runtimeSpawner);
-                runtimeSpawner.start();
-            }
-            Runtime.getRuntime().addShutdownHook(new Thread() {
-                public void run() {
-                    log.info("Shutting down the localrun runtimeSpawner ...");
-                    for (RuntimeSpawner spawner : spawners) {
-                        spawner.close();
-                    }
-                }
-            });
-            Timer statusCheckTimer = new Timer();
-            statusCheckTimer.scheduleAtFixedRate(new TimerTask() {
-                    @Override
-                    public void run() {
-                        CompletableFuture<String>[] futures = new CompletableFuture[spawners.size()];
-                        int index = 0;
-                        for (RuntimeSpawner spawner : spawners) {
-                            futures[index] = spawner.getFunctionStatusAsJson(index);
-                            index++;
-                        }
-                        try {
-                            CompletableFuture.allOf(futures).get(5, TimeUnit.SECONDS);
-                            for (index = 0; index < futures.length; ++index) {
-                                String json = futures[index].get();
-                                Gson gson = new GsonBuilder().setPrettyPrinting().create();
-                                log.info(gson.toJson(new JsonParser().parse(json)));
-                            }
-                        } catch (Exception ex) {
-                            log.error("Could not get status from all local instances");
-                        }
-                    }
-                }, 30000, 30000);
-            Runtime.getRuntime().addShutdownHook(new Thread() {
-                    public void run() {
-                        statusCheckTimer.cancel();
-                    }
-                });
-            for (RuntimeSpawner spawner : spawners) {
-                spawner.join();
-                log.info("RuntimeSpawner quit because of", spawner.getRuntime().getDeathException());
-            }
-
-        }
-    }
 }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index a3bfd88025..cd57b9a54e 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -35,6 +35,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.lang.reflect.Type;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -55,7 +56,6 @@
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.nar.NarClassLoader;
-import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.utils.*;
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
 import org.apache.pulsar.functions.utils.io.Connectors;
@@ -165,18 +165,24 @@ private void mergeArgs() {
         }
 
         @Override
-        void runCmd() throws Exception {
+        public void runCmd() throws Exception {
             // merge deprecated args with new args
             mergeArgs();
-
-            CmdFunctions.startLocalRun(createSinkConfigProto2(sinkConfig), sinkConfig.getParallelism(),
-                    0, brokerServiceUrl, null,
-                    AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthPlugin)
-                            .clientAuthenticationParameters(clientAuthParams).useTls(useTls)
-                            .tlsAllowInsecureConnection(tlsAllowInsecureConnection)
-                            .tlsHostnameVerificationEnable(tlsHostNameVerificationEnabled)
-                            .tlsTrustCertsFilePath(tlsTrustCertFilePath).build(),
-                    sinkConfig.getArchive(), admin);
+            List<String> localRunArgs = new LinkedList<>();
+            localRunArgs.add(System.getenv("PULSAR_HOME") + "/bin/function-localrunner");
+            localRunArgs.add("--sinkConfig");
+            localRunArgs.add(new Gson().toJson(sinkConfig));
+            for (Field field : this.getClass().getDeclaredFields()) {
+                if (field.getName().startsWith("DEPRECATED")) continue;
+                Object value = field.get(this);
+                if (value != null) {
+                    localRunArgs.add("--" + field.getName());
+                    localRunArgs.add(value.toString());
+                }
+            }
+            ProcessBuilder processBuilder = new ProcessBuilder(localRunArgs).inheritIO();
+            Process process = processBuilder.start();
+            process.waitFor();
         }
 
         @Override
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index baeb5e0b09..ff4625246a 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -35,9 +35,11 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.lang.reflect.Type;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -55,7 +57,6 @@
 import org.apache.pulsar.common.functions.Resources;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.nar.NarClassLoader;
-import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.functions.utils.SourceConfigUtils;
@@ -168,18 +169,25 @@ private void mergeArgs() {
         }
 
         @Override
-        void runCmd() throws Exception {
+        public void runCmd() throws Exception {
             // merge deprecated args with new args
             mergeArgs();
 
-            CmdFunctions.startLocalRun(createSourceConfigProto2(sourceConfig), sourceConfig.getParallelism(),
-                    0, brokerServiceUrl, null,
-                    AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthPlugin)
-                            .clientAuthenticationParameters(clientAuthParams).useTls(useTls)
-                            .tlsAllowInsecureConnection(tlsAllowInsecureConnection)
-                            .tlsHostnameVerificationEnable(tlsHostNameVerificationEnabled)
-                            .tlsTrustCertsFilePath(tlsTrustCertFilePath).build(),
-                    sourceConfig.getArchive(), admin);
+            List<String> localRunArgs = new LinkedList<>();
+            localRunArgs.add(System.getenv("PULSAR_HOME") + "/bin/function-localrunner");
+            localRunArgs.add("--sourceConfig");
+            localRunArgs.add(new Gson().toJson(sourceConfig));
+            for (Field field : this.getClass().getDeclaredFields()) {
+                if (field.getName().startsWith("DEPRECATED")) continue;
+                Object value = field.get(this);
+                if (value != null) {
+                    localRunArgs.add("--" + field.getName());
+                    localRunArgs.add(value.toString());
+                }
+            }
+            ProcessBuilder processBuilder = new ProcessBuilder(localRunArgs).inheritIO();
+            Process process = processBuilder.start();
+            process.waitFor();
         }
 
         @Override
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
index 6b59023d94..aea19fd0bb 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
@@ -117,7 +117,7 @@ public void setup() throws Exception {
         deleteSink = spy(cmdSinks.getDeleteSink());
 
         mockStatic(CmdFunctions.class);
-        PowerMockito.doNothing().when(CmdFunctions.class, "startLocalRun", Mockito.any(), Mockito.anyInt(), Mockito.anyInt(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
+        PowerMockito.doNothing().when(localSinkRunner).runCmd();
         URL file = Thread.currentThread().getContextClassLoader().getResource(JAR_FILE_NAME);
         if (file == null)  {
             throw new RuntimeException("Failed to file required test archive: " + JAR_FILE_NAME);
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
index cb0c9e98ea..32a6c60186 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
@@ -101,7 +101,7 @@ public void setup() throws Exception {
         deleteSource = spy(CmdSources.getDeleteSource());
 
         mockStatic(CmdFunctions.class);
-        PowerMockito.doNothing().when(CmdFunctions.class, "startLocalRun", Mockito.any(), Mockito.anyInt(), Mockito.anyInt(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
+        PowerMockito.doNothing().when(localSourceRunner).runCmd();
         JAR_FILE_PATH = Thread.currentThread().getContextClassLoader().getResource(JAR_FILE_NAME).getFile();
         WRONG_JAR_PATH = Thread.currentThread().getContextClassLoader().getResource(WRONG_JAR_FILE_NAME).getFile();
         Thread.currentThread().setContextClassLoader(Utils.loadJar(new File(JAR_FILE_PATH)));
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
new file mode 100644
index 0000000000..ed52a96a0d
--- /dev/null
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.runtime;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonParser;
+
+import java.io.File;
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.io.SinkConfig;
+import org.apache.pulsar.common.io.SourceConfig;
+import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
+import org.apache.pulsar.functions.utils.*;
+
+import static org.apache.pulsar.functions.utils.Utils.*;
+
+@Slf4j
+public class LocalRunner {
+
+    @Parameter(names = "--functionConfig", description = "The json representation of FunctionConfig", hidden = true)
+    protected String functionConfigString;
+    @Parameter(names = "--sourceConfig", description = "The json representation of SourceConfig", hidden = true)
+    protected String sourceConfigString;
+    @Parameter(names = "--sinkConfig", description = "The json representation of SinkConfig", hidden = true)
+    protected String sinkConfigString;
+    @Parameter(names = "--stateStorageServiceUrl", description = "The URL for the state storage service (by default Apache BookKeeper)", hidden = true)
+    protected String stateStorageServiceUrl;
+    @Parameter(names = "--brokerServiceUrl", description = "The URL for the Pulsar broker", hidden = true)
+    protected String brokerServiceUrl;
+    @Parameter(names = "--clientAuthPlugin", description = "Client authentication plugin using which function-process can connect to broker", hidden = true)
+    protected String clientAuthPlugin;
+    @Parameter(names = "--clientAuthParams", description = "Client authentication param", hidden = true)
+    protected String clientAuthParams;
+    @Parameter(names = "--useTls", description = "Use tls connection\n", hidden = true, arity = 1)
+    protected boolean useTls;
+    @Parameter(names = "--tlsAllowInsecureConnection", description = "Allow insecure tls connection\n", hidden = true, arity = 1)
+    protected boolean tlsAllowInsecureConnection;
+    @Parameter(names = "--tlsHostNameVerificationEnabled", description = "Enable hostname verification", hidden = true, arity = 1)
+    protected boolean tlsHostNameVerificationEnabled;
+    @Parameter(names = "--tlsTrustCertFilePath", description = "tls trust cert file path", hidden = true)
+    protected String tlsTrustCertFilePath;
+    @Parameter(names = "--instanceIdOffset", description = "Start the instanceIds from this offset", hidden = true)
+    protected Integer instanceIdOffset = 0;
+    private static final String DEFAULT_SERVICE_URL = "pulsar://localhost:6650";
+
+    public static void main(String[] args) throws Exception {
+        LocalRunner localRunner = new LocalRunner();
+        JCommander jcommander = new JCommander(localRunner);
+        jcommander.setProgramName("LocalRunner");
+
+        // parse args by JCommander
+        jcommander.parse(args);
+        localRunner.start();
+    }
+
+    void start() throws Exception {
+        Function.FunctionDetails functionDetails;
+        String userCodeFile;
+        int parallelism;
+        if (!StringUtils.isEmpty(functionConfigString)) {
+            FunctionConfig functionConfig = new Gson().fromJson(functionConfigString, FunctionConfig.class);
+            ClassLoader classLoader = null;
+            parallelism = functionConfig.getParallelism();
+            if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
+                userCodeFile = functionConfig.getJar();
+                if (isFunctionPackageUrlSupported(userCodeFile)) {
+                    classLoader = extractClassLoader(userCodeFile);
+                } else {
+                    File file = new File(userCodeFile);
+                    if (!file.exists()) {
+                        throw new RuntimeException("User jar does not exist");
+                    }
+                    classLoader = loadJar(file);
+                }
+            } else {
+                userCodeFile = functionConfig.getPy();
+            }
+            functionDetails = FunctionConfigUtils.convert(functionConfig, classLoader);
+        } else if (!StringUtils.isEmpty(sourceConfigString)) {
+            SourceConfig sourceConfig = new Gson().fromJson(sourceConfigString, SourceConfig.class);
+            NarClassLoader classLoader;
+            parallelism = sourceConfig.getParallelism();
+            userCodeFile = sourceConfig.getArchive();
+            if (isFunctionPackageUrlSupported(userCodeFile)) {
+                classLoader = extractNarClassLoader(null, userCodeFile, null);
+            } else {
+                File file = new File(userCodeFile);
+                if (!file.exists()) {
+                    throw new RuntimeException("Source archive does not exist");
+                }
+                classLoader = extractNarClassLoader(null, null, file);
+            }
+            functionDetails = SourceConfigUtils.convert(sourceConfig, classLoader);
+        } else {
+            SinkConfig sinkConfig = new Gson().fromJson(sinkConfigString, SinkConfig.class);
+            NarClassLoader classLoader;
+            parallelism = sinkConfig.getParallelism();
+            userCodeFile = sinkConfig.getArchive();
+            if (isFunctionPackageUrlSupported(userCodeFile)) {
+                classLoader = extractNarClassLoader(null, userCodeFile, null);
+            } else {
+                File file = new File(userCodeFile);
+                if (!file.exists()) {
+                    throw new RuntimeException("Sink archive does not exist");
+                }
+                classLoader = extractNarClassLoader(null, null, file);
+            }
+            functionDetails = SinkConfigUtils.convert(sinkConfig, classLoader);
+        }
+        startLocalRun(functionDetails, parallelism,
+                instanceIdOffset, brokerServiceUrl, stateStorageServiceUrl,
+                AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthPlugin)
+                        .clientAuthenticationParameters(clientAuthParams).useTls(useTls)
+                        .tlsAllowInsecureConnection(tlsAllowInsecureConnection)
+                        .tlsHostnameVerificationEnable(tlsHostNameVerificationEnabled)
+                        .tlsTrustCertsFilePath(tlsTrustCertFilePath).build(),
+                userCodeFile);
+    }
+
+    protected static void startLocalRun(org.apache.pulsar.functions.proto.Function.FunctionDetails
functionDetails,
+                                        int parallelism, int instanceIdOffset, String brokerServiceUrl,
String stateStorageServiceUrl, AuthenticationConfig authConfig,
+                                        String userCodeFile)
+            throws Exception {
+
+        String serviceUrl = DEFAULT_SERVICE_URL;
+        if (brokerServiceUrl != null) {
+            serviceUrl = brokerServiceUrl;
+        }
+
+        try (ProcessRuntimeFactory containerFactory = new ProcessRuntimeFactory(serviceUrl,
stateStorageServiceUrl, authConfig, null, null,
+                null, new DefaultSecretsProviderConfigurator())) {
+            List<RuntimeSpawner> spawners = new LinkedList<>();
+            for (int i = 0; i < parallelism; ++i) {
+                InstanceConfig instanceConfig = new InstanceConfig();
+                instanceConfig.setFunctionDetails(functionDetails);
+                // TODO: correctly implement function version and id
+                instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
+                instanceConfig.setFunctionId(UUID.randomUUID().toString());
+                instanceConfig.setInstanceId(i + instanceIdOffset);
+                instanceConfig.setMaxBufferedTuples(1024);
+                instanceConfig.setPort(Utils.findAvailablePort());
+                RuntimeSpawner runtimeSpawner = new RuntimeSpawner(
+                        instanceConfig,
+                        userCodeFile,
+                        null,
+                        containerFactory,
+                        30000);
+                spawners.add(runtimeSpawner);
+                runtimeSpawner.start();
+            }
+            java.lang.Runtime.getRuntime().addShutdownHook(new Thread() {
+                public void run() {
+                    log.info("Shutting down the localrun runtimeSpawner ...");
+                    for (RuntimeSpawner spawner : spawners) {
+                        spawner.close();
+                    }
+                }
+            });
+            Timer statusCheckTimer = new Timer();
+            statusCheckTimer.scheduleAtFixedRate(new TimerTask() {
+                @Override
+                public void run() {
+                    CompletableFuture<String>[] futures = new CompletableFuture[spawners.size()];
+                    int index = 0;
+                    for (RuntimeSpawner spawner : spawners) {
+                        futures[index] = spawner.getFunctionStatusAsJson(index);
+                        index++;
+                    }
+                    try {
+                        CompletableFuture.allOf(futures).get(5, TimeUnit.SECONDS);
+                        for (index = 0; index < futures.length; ++index) {
+                            String json = futures[index].get();
+                            Gson gson = new GsonBuilder().setPrettyPrinting().create();
+                            log.info(gson.toJson(new JsonParser().parse(json)));
+                        }
+                    } catch (Exception ex) {
+                        log.error("Could not get status from all local instances");
+                    }
+                }
+            }, 30000, 30000);
+            java.lang.Runtime.getRuntime().addShutdownHook(new Thread() {
+                public void run() {
+                    statusCheckTimer.cancel();
+                }
+            });
+            for (RuntimeSpawner spawner : spawners) {
+                spawner.join();
+                log.info("RuntimeSpawner quit because of", spawner.getRuntime().getDeathException());
+            }
+
+        }
+    }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message