pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sanjee...@apache.org
Subject [pulsar] branch master updated: Enable Pulsar Functions to be deployed on a kubernetes cluster (#1950)
Date Mon, 01 Oct 2018 18:49:01 GMT
This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d085806  Enable Pulsar Functions to be deployed on a kubernetes cluster (#1950)
d085806 is described below

commit d0858065b586344cb7b1b8a5a32e694bc6c004f4
Author: Sanjeev Kulkarni <sanjeevrk@gmail.com>
AuthorDate: Mon Oct 1 11:48:53 2018 -0700

    Enable Pulsar Functions to be deployed on a kubernetes cluster (#1950)
    
    * Support submitting pulsar functions to kubernetes
    
    * Adjusted pom
    
    * Added helper function
    
    * Added port to instance config
    
    * Made things public
    
    * More fixes
    
    * Made changes to kubernetes controller
    
    * refactored jobname
    
    * Removed resource
    
    * Bumped to 2.1.0
    
    * Fix compilation bug
    
    * Fix compile bugs
    
    * Fixed compile
    
    * Compile fix
    
    * Some remnant
    
    * Unnecessary imports
    
    * Added dep
    
    * Default values
    
    * Fix
    
    * Use kubectl proxy stuff
    
    * Corrected the path of pulsar-admin
    
    * Fixed bugs
    
    * No longer required
    
    * Download first
    
    * make shard id work
    
    * Removed using runtime
    
    * Fixed npe
    
    * Corrected command name
    
    * Fixed bugs
    
    * Removed commented sections
    
    * Uncommented some stuff
    
    * Address Jerry's comments
    
    * Change to 2.2
    
    * Use non camel case style for passing arguments
    
    * removed cmdline changes
    
    * Second set of changes
    
    * Next set of changes
    
    * no such module
    
    * Code complete
    
    * Fixed logic wrt packages
    
    * Do not stop externally managed functions upon exit
    
    * Modified to use AppV1 instead of beta
    
    * use lower case
    
    * Unpretty the function details
    
    * If the job exists, don't flag as error
    
    * Give ability to submit from inside a pod
    
    * remove encode/decode
    
    * Escape function details
    
    * Working copy with getstatus and stop. Restart needs some work
    
    * removed unused function
    
    * Fixed bugs
    
    * Fixed unittest
    
    * Fixed unittest
    
    * Fixed unittest
    
    * Fixed tests
    
    * Reverted conf changes
    
    * Minor nit
    
    * Exclude logback
    
    * Updated comments
    
    * Changed jobname to include namespace/tenant
    
    * Fix unittest
    
    * Updated licenses
    
    * Use 2.0.0 instead of beta1
---
 conf/functions_worker.yml                          |   2 +
 distribution/server/src/assemble/LICENSE.bin.txt   |  10 +
 .../org/apache/pulsar/admin/cli/CmdFunctions.java  |   4 +-
 pulsar-functions/runtime/pom.xml                   |  13 +
 .../pulsar/functions/runtime/JavaInstanceMain.java |   3 +-
 .../functions/runtime/KubernetesRuntime.java       | 551 +++++++++++++++++++++
 .../runtime/KubernetesRuntimeFactory.java          | 155 ++++++
 .../pulsar/functions/runtime/ProcessRuntime.java   | 111 +----
 .../functions/runtime/ProcessRuntimeFactory.java   |   1 +
 .../apache/pulsar/functions/runtime/Runtime.java   |   6 +-
 .../pulsar/functions/runtime/RuntimeFactory.java   |   4 +-
 .../pulsar/functions/runtime/RuntimeSpawner.java   |  37 +-
 .../pulsar/functions/runtime/RuntimeUtils.java     | 135 +++++
 .../pulsar/functions/runtime/ThreadRuntime.java    |   2 +-
 .../functions/runtime/ThreadRuntimeFactory.java    |   3 +-
 ...RuntimeTest.java => KubernetesRuntimeTest.java} |  66 +--
 .../functions/runtime/ProcessRuntimeTest.java      |  12 +-
 .../pulsar/functions/worker/FunctionActioner.java  |  16 +-
 .../functions/worker/FunctionRuntimeManager.java   | 125 +++--
 .../pulsar/functions/worker/MembershipManager.java |   2 +-
 .../pulsar/functions/worker/SchedulerManager.java  |  23 +-
 .../pulsar/functions/worker/WorkerConfig.java      |  16 +
 .../functions/worker/rest/api/FunctionsImpl.java   |  12 +-
 .../functions/worker/FunctionActionerTest.java     |   2 +-
 .../functions/worker/SchedulerManagerTest.java     |  22 +
 25 files changed, 1122 insertions(+), 211 deletions(-)

diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index 0c7b8af..ba892ad 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -36,6 +36,8 @@ downloadDirectory: /tmp/pulsar_functions
 #  threadGroupName: "Thread Function Container Group"
 processContainerFactory:
   logDirectory:
+#kubernetesContainerFactory:
+#  k8Uri:
 
 schedulerClassName: "org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler"
 functionAssignmentTopicName: "assignments"
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index 5f0f07b..2ce32cb 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -450,6 +450,15 @@ The Apache Software License, Version 2.0
     - org.xerial.snappy-snappy-java-1.1.1.3.jar
   * Objenesis
     - org.objenesis-objenesis-2.1.jar
+  * Squareup
+    - com.squareup.okhttp-logging-interceptor-2.7.5.jar
+    - com.squareup.okhttp-okhttp-ws-2.7.5.jar
+  * Kubernetes Client
+    - io.kubernetes-client-java-2.0.0.jar
+    - io.kubernetes-client-java-api-2.0.0.jar
+    - io.kubernetes-client-java-proto-2.0.0.jar
+  * Joda Time
+    - joda-time-joda-time-2.9.3.jar
 
 
 BSD 3-clause "New" or "Revised" License
@@ -523,6 +532,7 @@ Bouncy Castle License
  * Bouncy Castle -- licenses/LICENSE-bouncycastle.txt
     - org.bouncycastle-bcpkix-jdk15on-1.55.jar
     - org.bouncycastle-bcprov-jdk15on-1.55.jar
+    - org.bouncycastle-bcprov-ext-jdk15on-1.59.jar
 
 ------------------------
 
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 395e881..9e3c6e8 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -1264,6 +1264,7 @@ public class CmdFunctions extends CmdBase {
                 RuntimeSpawner runtimeSpawner = new RuntimeSpawner(
                         instanceConfig,
                         userCodeFile,
+                        null,
                         containerFactory,
                         30000);
                 spawners.add(runtimeSpawner);
@@ -1284,7 +1285,8 @@ public class CmdFunctions extends CmdBase {
                         CompletableFuture<String>[] futures = new CompletableFuture[spawners.size()];
                         int index = 0;
                         for (RuntimeSpawner spawner : spawners) {
-                            futures[index++] = spawner.getFunctionStatusAsJson();
+                            futures[index] = spawner.getFunctionStatusAsJson(index);
+                            index++;
                         }
                         try {
                             CompletableFuture.allOf(futures).get(5, TimeUnit.SECONDS);
diff --git a/pulsar-functions/runtime/pom.xml b/pulsar-functions/runtime/pom.xml
index 8c4be07..728df0e 100644
--- a/pulsar-functions/runtime/pom.xml
+++ b/pulsar-functions/runtime/pom.xml
@@ -55,6 +55,19 @@
       <artifactId>jcommander</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>io.kubernetes</groupId>
+      <artifactId>client-java</artifactId>
+      <version>2.0.0</version>
+      <scope>compile</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>ch.qos.logback</groupId>
+          <artifactId>logback-classic</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
   </dependencies>
 
   <build>
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index c18eff5..affa119 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -126,6 +126,7 @@ public class JavaInstanceMain implements AutoCloseable {
         runtimeSpawner = new RuntimeSpawner(
                 instanceConfig,
                 jarFile,
+                null, // we really dont use this in thread container
                 containerFactory,
                 expectedHealthCheckInterval * 1000);
 
@@ -218,7 +219,7 @@ public class JavaInstanceMain implements AutoCloseable {
         @Override
         public void getFunctionStatus(Empty request, StreamObserver<InstanceCommunication.FunctionStatus> responseObserver) {
             try {
-                InstanceCommunication.FunctionStatus response = runtimeSpawner.getFunctionStatus().get();
+                InstanceCommunication.FunctionStatus response = runtimeSpawner.getFunctionStatus(runtimeSpawner.getInstanceConfig().getInstanceId()).get();
                 responseObserver.onNext(response);
                 responseObserver.onCompleted();
             } catch (Exception e) {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
new file mode 100644
index 0000000..0573d88
--- /dev/null
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -0,0 +1,551 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.functions.runtime;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.protobuf.Empty;
+import com.squareup.okhttp.Response;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.kubernetes.client.apis.AppsV1Api;
+import io.kubernetes.client.apis.CoreV1Api;
+import io.kubernetes.client.custom.Quantity;
+import io.kubernetes.client.models.*;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
+import org.apache.pulsar.functions.proto.InstanceControlGrpc;
+
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static java.net.HttpURLConnection.HTTP_CONFLICT;
+
+/**
+ * Kubernetes based runtime for running functions.
+ * This runtime provides the usual start/stop/getFunctionStatus methods
+ * to control the kubernetes job running the function.
+ * We first create a headless service and then a statefulset for starting function pods
+ * Each function instance runs as a pod itself. The reason using statefulset as opposed
+ * to a regular deployment is that functions require a unique instance_id for each instance.
+ * The service abstraction is used for getting functionstatus.
+ */
+@Slf4j
+class KubernetesRuntime implements Runtime {
+
+    private static final String ENV_SHARD_ID = "SHARD_ID";
+    private static final int maxJobNameSize = 63;
+    private static final Integer GRPC_PORT = 9093;
+    public static final Pattern VALID_POD_NAME_REGEX =
+            Pattern.compile("[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*",
+                    Pattern.CASE_INSENSITIVE);
+
+    private final AppsV1Api appsClient;
+    private final CoreV1Api coreClient;
+    static final List<String> TOLERATIONS = Collections.unmodifiableList(
+            Arrays.asList(
+                    "node.kubernetes.io/not-ready",
+                    "node.alpha.kubernetes.io/notReady",
+                    "node.alpha.kubernetes.io/unreachable"
+            )
+    );
+
+    // The command line arguments used to launch the function instance
+    @Getter
+    private List<String> processArgs;
+    @Getter
+    private ManagedChannel[] channel;
+    private InstanceControlGrpc.InstanceControlFutureStub[] stub;
+    private InstanceConfig instanceConfig;
+    private final String jobNamespace;
+    private final String pulsarDockerImageName;
+    private final String pulsarRootDir;
+    private final String userCodePkgUrl;
+    private final String originalCodeFileName;
+    private final String pulsarAdminUrl;
+    private boolean running;
+
+
+    KubernetesRuntime(AppsV1Api appsClient,
+                      CoreV1Api coreClient,
+                      String jobNamespace,
+                      String pulsarDockerImageName,
+                      String pulsarRootDir,
+                      InstanceConfig instanceConfig,
+                      String instanceFile,
+                      String logDirectory,
+                      String userCodePkgUrl,
+                      String originalCodeFileName,
+                      String pulsarServiceUrl,
+                      String pulsarAdminUrl,
+                      String stateStorageServiceUrl,
+                      AuthenticationConfig authConfig) throws Exception {
+        this.appsClient = appsClient;
+        this.coreClient = coreClient;
+        this.instanceConfig = instanceConfig;
+        this.jobNamespace = jobNamespace;
+        this.pulsarDockerImageName = pulsarDockerImageName;
+        this.pulsarRootDir = pulsarRootDir;
+        this.userCodePkgUrl = userCodePkgUrl;
+        this.originalCodeFileName = originalCodeFileName;
+        this.pulsarAdminUrl = pulsarAdminUrl;
+        this.processArgs = RuntimeUtils.composeArgs(instanceConfig, instanceFile, logDirectory, originalCodeFileName, pulsarServiceUrl, stateStorageServiceUrl,
+                authConfig, "$" + ENV_SHARD_ID, GRPC_PORT, -1l, "conf/log4j2.yaml");
+        running = false;
+        doChecks(instanceConfig.getFunctionDetails());
+    }
+
+    /**
+     * The core logic that creates a service first followed by statefulset
+     */
+    @Override
+    public void start() throws Exception {
+        submitService();
+        try {
+            submitStatefulSet();
+        } catch (Exception e) {
+            deleteService();
+        }
+        running = true;
+        if (channel == null && stub == null) {
+            channel = new ManagedChannel[instanceConfig.getFunctionDetails().getParallelism()];
+            stub = new InstanceControlGrpc.InstanceControlFutureStub[instanceConfig.getFunctionDetails().getParallelism()];
+            for (int i = 0; i < instanceConfig.getFunctionDetails().getParallelism(); ++i) {
+                String address = createJobName(instanceConfig.getFunctionDetails()) + "-" +
+                        i + "." + createJobName(instanceConfig.getFunctionDetails());
+                channel[i] = ManagedChannelBuilder.forAddress(address, GRPC_PORT)
+                        .usePlaintext(true)
+                        .build();
+                stub[i] = InstanceControlGrpc.newFutureStub(channel[i]);
+            }
+        }
+    }
+
+    /**
+     * Blocks the calling thread indefinitely; K8 functions never return.
+     *
+     * @throws InterruptedException if the waiting thread is interrupted
+     */
+    @Override
+    public void join() throws Exception {
+        // Object.wait() must be called while holding the object's monitor, otherwise it
+        // throws IllegalMonitorStateException instead of blocking.
+        synchronized (this) {
+            // loop to guard against spurious wakeups
+            while (true) {
+                this.wait();
+            }
+        }
+    }
+
+    /**
+     * Deletes the statefulset and service (only if this runtime is marked running),
+     * shuts down every grpc channel and resets the runtime to its not-started state.
+     */
+    @Override
+    public void stop() throws Exception {
+        if (running) {
+            deleteStatefulSet();
+            deleteService();
+        }
+
+        final ManagedChannel[] openChannels = channel;
+        if (openChannels != null) {
+            for (int i = 0; i < openChannels.length; i++) {
+                openChannels[i].shutdown();
+            }
+        }
+
+        running = false;
+        stub = null;
+        channel = null;
+    }
+
+    /**
+     * This runtime does not record a death exception.
+     *
+     * @return always {@code null}
+     */
+    @Override
+    public Throwable getDeathException() {
+        final Throwable none = null;
+        return none;
+    }
+
+    /**
+     * Asks the given function instance for its status over grpc.
+     *
+     * @param instanceId index of the instance, in {@code [0, parallelism)}
+     * @return a future that completes exceptionally with "Not alive" when the runtime has
+     *         not been started (or was stopped), exceptionally with "Invalid InstanceId"
+     *         when the index is out of range, and otherwise with the instance's status.
+     *         A failed grpc call is reported as a non-running status carrying the failure
+     *         message rather than as an exceptional completion.
+     */
+    @Override
+    public CompletableFuture<FunctionStatus> getFunctionStatus(int instanceId) {
+        CompletableFuture<FunctionStatus> retval = new CompletableFuture<>();
+        // Work on a snapshot: stop() nulls the field, and the null check has to come
+        // before any use of the array length.
+        final InstanceControlGrpc.InstanceControlFutureStub[] stubs = stub;
+        if (stubs == null) {
+            retval.completeExceptionally(new RuntimeException("Not alive"));
+            return retval;
+        }
+        if (instanceId < 0 || instanceId >= stubs.length) {
+            retval.completeExceptionally(new RuntimeException("Invalid InstanceId"));
+            return retval;
+        }
+        ListenableFuture<FunctionStatus> response = stubs[instanceId].getFunctionStatus(Empty.newBuilder().build());
+        Futures.addCallback(response, new FutureCallback<FunctionStatus>() {
+            @Override
+            public void onFailure(Throwable throwable) {
+                FunctionStatus.Builder builder = FunctionStatus.newBuilder();
+                builder.setRunning(false);
+                builder.setFailureException(throwable.getMessage());
+                retval.complete(builder.build());
+            }
+
+            @Override
+            public void onSuccess(FunctionStatus t) {
+                retval.complete(t);
+            }
+        });
+        return retval;
+    }
+
+    /**
+     * Not supported by this runtime.
+     *
+     * @return a future that is already completed exceptionally
+     */
+    @Override
+    public CompletableFuture<InstanceCommunication.MetricsData> getAndResetMetrics() {
+        final CompletableFuture<InstanceCommunication.MetricsData> unsupported = new CompletableFuture<>();
+        final RuntimeException cause =
+                new RuntimeException("Kubernetes Runtime doesnt support getAndReset metrics via rest");
+        unsupported.completeExceptionally(cause);
+        return unsupported;
+    }
+
+    /**
+     * Not supported by this runtime.
+     *
+     * @return a future that is already completed exceptionally
+     */
+    @Override
+    public CompletableFuture<Void> resetMetrics() {
+        final CompletableFuture<Void> unsupported = new CompletableFuture<>();
+        final RuntimeException cause =
+                new RuntimeException("Kubernetes Runtime doesnt support resetting metrics via rest");
+        unsupported.completeExceptionally(cause);
+        return unsupported;
+    }
+
+    /**
+     * Not supported by this runtime.
+     *
+     * @return a future that is already completed exceptionally
+     */
+    @Override
+    public CompletableFuture<InstanceCommunication.MetricsData> getMetrics() {
+        final CompletableFuture<InstanceCommunication.MetricsData> unsupported = new CompletableFuture<>();
+        final RuntimeException cause =
+                new RuntimeException("Kubernetes Runtime doesnt support getting metrics via rest");
+        unsupported.completeExceptionally(cause);
+        return unsupported;
+    }
+
+    /**
+     * @return the {@code running} flag, which is set by {@code start()} and cleared by {@code stop()}
+     */
+    @Override
+    public boolean isAlive() {
+        return this.running;
+    }
+
+    private void submitService() throws Exception {
+        final V1Service service = createService();
+        log.info("Submitting the following service to k8 {}", coreClient.getApiClient().getJSON().serialize(service));
+
+        final Response response =
+                coreClient.createNamespacedServiceCall(jobNamespace, service, null,
+                        null, null).execute();
+        if (!response.isSuccessful()) {
+            if (response.code() == HTTP_CONFLICT) {
+                log.warn("Service already created for function {}/{}/{}",
+                        instanceConfig.getFunctionDetails().getTenant(),
+                        instanceConfig.getFunctionDetails().getNamespace(),
+                        instanceConfig.getFunctionDetails().getName());
+            } else {
+                log.error("Error creating Service for function {}/{}/{}:- {}",
+                        instanceConfig.getFunctionDetails().getTenant(),
+                        instanceConfig.getFunctionDetails().getNamespace(),
+                        instanceConfig.getFunctionDetails().getName(),
+                        response.message());
+                // construct a message based on the k8s api server response
+                throw new IllegalStateException(response.message());
+            }
+        } else {
+            log.info("Service Created Successfully for function {}/{}/{}",
+                    instanceConfig.getFunctionDetails().getTenant(),
+                    instanceConfig.getFunctionDetails().getNamespace(),
+                    instanceConfig.getFunctionDetails().getName());
+        }
+    }
+
+    /**
+     * Builds the service definition for this function: named after the job, with
+     * clusterIP "None", a single TCP "grpc" port and a selector on the function's labels.
+     */
+    private V1Service createService() {
+        final Function.FunctionDetails details = instanceConfig.getFunctionDetails();
+
+        // service metadata: the service carries the job name
+        final V1ObjectMeta serviceMeta = new V1ObjectMeta();
+        serviceMeta.name(createJobName(details));
+
+        // the only exposed port is the grpc one
+        final V1ServicePort grpcPort = new V1ServicePort();
+        grpcPort.name("grpc").port(GRPC_PORT).protocol("TCP");
+
+        // service spec: no cluster IP, route to the pods carrying the function labels
+        final V1ServiceSpec serviceSpec = new V1ServiceSpec();
+        serviceSpec.clusterIP("None");
+        serviceSpec.addPortsItem(grpcPort);
+        serviceSpec.selector(getLabels(details));
+
+        final V1Service service = new V1Service();
+        service.metadata(serviceMeta);
+        service.spec(serviceSpec);
+        return service;
+    }
+
+    private void submitStatefulSet() throws Exception {
+        final V1StatefulSet statefulSet = createStatefulSet();
+
+        log.info("Submitting the following spec to k8 {}", appsClient.getApiClient().getJSON().serialize(statefulSet));
+
+        final Response response =
+                appsClient.createNamespacedStatefulSetCall(jobNamespace, statefulSet, null,
+                        null, null).execute();
+        if (!response.isSuccessful()) {
+            if (response.code() == HTTP_CONFLICT) {
+                log.warn("Statefulset already present for function {}/{}/{}",
+                        instanceConfig.getFunctionDetails().getTenant(),
+                        instanceConfig.getFunctionDetails().getNamespace(),
+                        instanceConfig.getFunctionDetails().getName());
+            } else {
+                log.error("Error creating statefulset for function {}/{}/{}:- {}",
+                        instanceConfig.getFunctionDetails().getTenant(),
+                        instanceConfig.getFunctionDetails().getNamespace(),
+                        instanceConfig.getFunctionDetails().getName(),
+                        response.message());
+                // construct a message based on the k8s api server response
+                throw new IllegalStateException(response.message());
+            }
+        } else {
+            log.info("Successfully created statefulset for function {}/{}/{}",
+                    instanceConfig.getFunctionDetails().getTenant(),
+                    instanceConfig.getFunctionDetails().getNamespace(),
+                    instanceConfig.getFunctionDetails().getName());
+        }
+    }
+
+    public void deleteStatefulSet() throws Exception {
+        final V1DeleteOptions options = new V1DeleteOptions();
+        options.setGracePeriodSeconds(0L);
+        options.setPropagationPolicy("Foreground");
+        final Response response = appsClient.deleteNamespacedStatefulSetCall(
+                createJobName(instanceConfig.getFunctionDetails()),
+                jobNamespace, options, null, null, null, null, null, null)
+                .execute();
+
+        if (!response.isSuccessful()) {
+            throw new RuntimeException(String.format("Error deleting statefulset for function {}/{}/{} :- {} ",
+                    instanceConfig.getFunctionDetails().getTenant(),
+                    instanceConfig.getFunctionDetails().getNamespace(),
+                    instanceConfig.getFunctionDetails().getName(),
+                    response.message()));
+        } else {
+            log.info("Successfully deleted statefulset for function {}/{}/{}",
+                    instanceConfig.getFunctionDetails().getTenant(),
+                    instanceConfig.getFunctionDetails().getNamespace(),
+                    instanceConfig.getFunctionDetails().getName());
+        }
+    }
+
+    public void deleteService() throws Exception {
+        final V1DeleteOptions options = new V1DeleteOptions();
+        options.setGracePeriodSeconds(0L);
+        options.setPropagationPolicy("Foreground");
+        final Response response = coreClient.deleteNamespacedServiceCall(
+                createJobName(instanceConfig.getFunctionDetails()),
+                jobNamespace, options, null, null, null, null, null, null)
+                .execute();
+
+        if (!response.isSuccessful()) {
+            throw new RuntimeException(String.format("Error deleting service for function {}/{}/{} :- {}",
+                    instanceConfig.getFunctionDetails().getTenant(),
+                    instanceConfig.getFunctionDetails().getNamespace(),
+                    instanceConfig.getFunctionDetails().getName(),
+                    response.message()));
+        } else {
+            log.info("Service deleted successfully for function {}/{}/{}",
+                    instanceConfig.getFunctionDetails().getTenant(),
+                    instanceConfig.getFunctionDetails().getNamespace(),
+                    instanceConfig.getFunctionDetails().getName());
+        }
+    }
+
+    protected List<String> getExecutorCommand() {
+        return Arrays.asList(
+                "sh",
+                "-c",
+                String.join(" ", getDownloadCommand(userCodePkgUrl, originalCodeFileName))
+                        + " && " + setShardIdEnvironmentVariableCommand()
+                        + " && " + String.join(" ", processArgs)
+        );
+    }
+
+    private List<String> getDownloadCommand(String bkPath, String userCodeFilePath) {
+        return Arrays.asList(
+                pulsarRootDir + "/bin/pulsar-admin",
+                "--admin-url",
+                pulsarAdminUrl,
+                "functions",
+                "download",
+                "--path",
+                bkPath,
+                "--destination-file",
+                userCodeFilePath);
+    }
+
+    private static String setShardIdEnvironmentVariableCommand() {
+        return String.format("%s=${POD_NAME##*-} && echo shardId=${%s}", ENV_SHARD_ID, ENV_SHARD_ID);
+    }
+
+
+    /**
+     * Builds the statefulset definition for this function: one replica per unit of
+     * parallelism, parallel pod management, and a pod template running the executor command.
+     */
+    private V1StatefulSet createStatefulSet() {
+        final Function.FunctionDetails details = instanceConfig.getFunctionDetails();
+        final String jobName = createJobName(details);
+
+        // pod template: function labels plus the pod spec running the executor command
+        final V1PodTemplateSpec podTemplateSpec = new V1PodTemplateSpec();
+        final V1ObjectMeta templateMetaData = new V1ObjectMeta().labels(getLabels(details));
+        /*
+        TODO:- Figure out the metrics collection later.
+        templateMetaData.annotations(getPrometheusAnnotations());
+        */
+        podTemplateSpec.setMetadata(templateMetaData);
+        final List<String> command = getExecutorCommand();
+        final Function.Resources resources = details.hasResources() ? details.getResources() : null;
+        podTemplateSpec.spec(getPodSpec(command, resources));
+
+        // selector match labels, so that we know which pods to manage
+        final V1LabelSelector selector = new V1LabelSelector();
+        selector.matchLabels(getLabels(details));
+
+        // stateful set spec
+        final V1StatefulSetSpec statefulSetSpec = new V1StatefulSetSpec();
+        statefulSetSpec.serviceName(jobName);
+        statefulSetSpec.setReplicas(details.getParallelism());
+        // Parallel pod management tells the StatefulSet controller to launch or terminate
+        // all Pods in parallel, and not to wait for Pods to become Running and Ready or completely
+        // terminated prior to launching or terminating another Pod.
+        statefulSetSpec.setPodManagementPolicy("Parallel");
+        statefulSetSpec.selector(selector);
+        statefulSetSpec.setTemplate(podTemplateSpec);
+
+        // stateful set metadata and final assembly
+        final V1ObjectMeta objectMeta = new V1ObjectMeta();
+        objectMeta.name(jobName);
+
+        final V1StatefulSet statefulSet = new V1StatefulSet();
+        statefulSet.metadata(objectMeta);
+        statefulSet.spec(statefulSetSpec);
+        return statefulSet;
+    }
+
+    /**
+     * Annotations marking a pod for prometheus scraping on port 8080.
+     */
+    private Map<String, String> getPrometheusAnnotations() {
+        final Map<String, String> prometheus = new HashMap<>();
+        prometheus.put("prometheus.io/port", "8080");
+        prometheus.put("prometheus.io/scrape", "true");
+        return prometheus;
+    }
+
+    /**
+     * Labels identifying the pods of a function: the job name under "app", plus
+     * the function's namespace and tenant.
+     */
+    private Map<String, String> getLabels(Function.FunctionDetails functionDetails) {
+        final Map<String, String> functionLabels = new HashMap<>();
+        functionLabels.put("tenant", functionDetails.getTenant());
+        functionLabels.put("namespace", functionDetails.getNamespace());
+        functionLabels.put("app", createJobName(functionDetails));
+        return functionLabels;
+    }
+
+    /**
+     * Builds the pod spec: zero termination grace period, the standard tolerations
+     * and a single container running the given command.
+     *
+     * @param instanceCommand command for the container
+     * @param resource        requested resources, or {@code null} for the defaults
+     */
+    private V1PodSpec getPodSpec(List<String> instanceCommand, Function.Resources resource) {
+        final V1PodSpec spec = new V1PodSpec();
+
+        // set the termination period to 0 so pods can be deleted quickly
+        spec.setTerminationGracePeriodSeconds(0L);
+
+        // set the pod tolerations so pods are rescheduled when nodes go down
+        // https://kubernetes.io/docs/concepts/configuration/taint-and-toleration/#taint-based-evictions
+        final List<V1Toleration> tolerations = getTolerations();
+        spec.setTolerations(tolerations);
+
+        final V1Container functionContainer = getContainer(instanceCommand, resource);
+        spec.containers(Collections.singletonList(functionContainer));
+
+        return spec;
+    }
+
+    /**
+     * One "Exists"/"NoExecute" toleration of 10 seconds for every key in {@code TOLERATIONS}.
+     */
+    private List<V1Toleration> getTolerations() {
+        final List<V1Toleration> result = new ArrayList<>();
+        for (final String taintKey : TOLERATIONS) {
+            result.add(new V1Toleration()
+                    .key(taintKey)
+                    .operator("Exists")
+                    .effect("NoExecute")
+                    .tolerationSeconds(10L));
+        }
+        return result;
+    }
+
+    /**
+     * Builds the "pulsarfunction" container: image, command, POD_NAME environment
+     * variable (from the pod's metadata.name), resource requests and the grpc port.
+     *
+     * @param instanceCommand command for the container
+     * @param resource        requested resources; when {@code null} or zero-valued the
+     *                        request defaults to 1073741824 bytes of memory and 1 cpu
+     */
+    private V1Container getContainer(List<String> instanceCommand, Function.Resources resource) {
+        final V1Container functionContainer = new V1Container().name("pulsarfunction");
+
+        // image and command
+        functionContainer.setImage(pulsarDockerImageName);
+        functionContainer.setCommand(instanceCommand);
+
+        // expose the pod's own name to the container as POD_NAME
+        final V1ObjectFieldSelector podNameField = new V1ObjectFieldSelector().fieldPath("metadata.name");
+        final V1EnvVar envVarPodName = new V1EnvVar();
+        envVarPodName.name("POD_NAME").valueFrom(new V1EnvVarSource().fieldRef(podNameField));
+        functionContainer.setEnv(Arrays.asList(envVarPodName));
+
+        // resource requests, falling back to defaults when nothing (or zero) was asked for
+        long ramBytes = 1073741824;
+        if (resource != null && resource.getRam() != 0) {
+            ramBytes = resource.getRam();
+        }
+        double cpuCores = 1;
+        if (resource != null && resource.getCpu() != 0) {
+            cpuCores = resource.getCpu();
+        }
+        final Map<String, Quantity> requests = new HashMap<>();
+        requests.put("memory", Quantity.fromString(Long.toString(ramBytes)));
+        requests.put("cpu", Quantity.fromString(Double.toString(cpuCores)));
+        final V1ResourceRequirements resourceRequirements = new V1ResourceRequirements();
+        resourceRequirements.setRequests(requests);
+        functionContainer.setResources(resourceRequirements);
+
+        // container ports
+        functionContainer.setPorts(getContainerPorts());
+
+        return functionContainer;
+    }
+
+    /**
+     * The container exposes a single port, named "grpc", on {@code GRPC_PORT}.
+     */
+    private List<V1ContainerPort> getContainerPorts() {
+        final V1ContainerPort grpcPort = new V1ContainerPort();
+        grpcPort.setContainerPort(GRPC_PORT);
+        grpcPort.setName("grpc");
+
+        final List<V1ContainerPort> containerPorts = new ArrayList<>();
+        containerPorts.add(grpcPort);
+        return containerPorts;
+    }
+
+    private static String createJobName(Function.FunctionDetails functionDetails) {
+        return createJobName(functionDetails.getTenant(),
+                functionDetails.getNamespace(),
+                functionDetails.getName());
+    }
+
+    /**
+     * Job name of the form {@code pf-<tenant>-<namespace>-<functionName>}.
+     */
+    private static String createJobName(String tenant, String namespace, String functionName) {
+        return String.join("-", "pf", tenant, namespace, functionName);
+    }
+
+    /**
+     * Validates the job name derived from the function details.
+     *
+     * @throws RuntimeException if the name contains upper case characters, does not match
+     *                          {@code VALID_POD_NAME_REGEX}, or is longer than
+     *                          {@code maxJobNameSize} characters
+     */
+    private static void doChecks(Function.FunctionDetails functionDetails) {
+        final String jobName = createJobName(functionDetails);
+        // Locale.ROOT keeps the case comparison independent of the JVM's default locale
+        if (!jobName.equals(jobName.toLowerCase(Locale.ROOT))) {
+            throw new RuntimeException("Kubernetes does not allow upper case jobNames.");
+        }
+        final Matcher matcher = VALID_POD_NAME_REGEX.matcher(jobName);
+        if (!matcher.matches()) {
+            throw new RuntimeException("Kubernetes job names may only contain lower case alphanumeric "
+                    + "characters, '-' or '.', and must start and end with an alphanumeric character.");
+        }
+        // a name of exactly maxJobNameSize characters is accepted
+        if (jobName.length() > maxJobNameSize) {
+            throw new RuntimeException("Kubernetes job name size should not exceed " + maxJobNameSize);
+        }
+    }
+}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
new file mode 100644
index 0000000..c55935b
--- /dev/null
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.functions.runtime;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.kubernetes.client.ApiClient;
+import io.kubernetes.client.Configuration;
+import io.kubernetes.client.apis.AppsV1Api;
+import io.kubernetes.client.apis.CoreV1Api;
+import io.kubernetes.client.util.Config;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.functions.instance.InstanceConfig;
+
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+
+/**
+ * Kubernetes based function container factory implementation.
+ *
+ * <p>Creates {@link KubernetesRuntime} instances. The Kubernetes API clients handed to them
+ * are set up on the first {@code createContainer} call and reused afterwards.
+ */
+@Slf4j
+public class KubernetesRuntimeFactory implements RuntimeFactory {
+
+    private final String k8Uri;
+    private final String jobNamespace;
+    private final String pulsarDockerImageName;
+    private final String pulsarRootDir;
+    private final Boolean submittingInsidePod;
+    private final String pulsarAdminUri;
+    private final String pulsarServiceUri;
+    private final String stateStorageServiceUri;
+    private final AuthenticationConfig authConfig;
+    private final String javaInstanceJarFile;
+    private final String pythonInstanceFile;
+    private final String logDirectory = "logs/functions";
+    private AppsV1Api appsClient;
+    private CoreV1Api coreClient;
+
+    /**
+     * Stores the settings handed to every runtime. A null or empty {@code jobNamespace},
+     * {@code pulsarDockerImageName} or {@code pulsarRootDir} is replaced by "default",
+     * "apachepulsar/pulsar" or "/pulsar" respectively.
+     *
+     * @param submittingInsidePod whether this process runs inside a pod; null is treated as false
+     */
+    @VisibleForTesting
+    public KubernetesRuntimeFactory(String k8Uri,
+                                    String jobNamespace,
+                                    String pulsarDockerImageName,
+                                    String pulsarRootDir,
+                                    Boolean submittingInsidePod,
+                                    String pulsarServiceUri,
+                                    String pulsarAdminUri,
+                                    String stateStorageServiceUri,
+                                    AuthenticationConfig authConfig) {
+        this.k8Uri = k8Uri;
+        this.jobNamespace = isEmpty(jobNamespace) ? "default" : jobNamespace;
+        this.pulsarDockerImageName = isEmpty(pulsarDockerImageName) ? "apachepulsar/pulsar" : pulsarDockerImageName;
+        this.pulsarRootDir = isEmpty(pulsarRootDir) ? "/pulsar" : pulsarRootDir;
+        this.submittingInsidePod = submittingInsidePod;
+        this.pulsarServiceUri = pulsarServiceUri;
+        this.pulsarAdminUri = pulsarAdminUri;
+        this.stateStorageServiceUri = stateStorageServiceUri;
+        this.authConfig = authConfig;
+        this.javaInstanceJarFile = this.pulsarRootDir + "/instances/java-instance.jar";
+        this.pythonInstanceFile = this.pulsarRootDir + "/instances/python-instance/python_instance_main.py";
+    }
+
+    /** Always true; RuntimeSpawner skips its liveness-check timer for externally managed factories. */
+    @Override
+    public boolean externallyManaged() {
+        return true;
+    }
+
+    /**
+     * Sets up the API clients if necessary and builds a {@link KubernetesRuntime} for the instance.
+     *
+     * @throws RuntimeException if the function's runtime is neither JAVA nor PYTHON
+     */
+    @Override
+    public KubernetesRuntime createContainer(InstanceConfig instanceConfig, String codePkgUrl,
+                                             String originalCodeFileName,
+                                             Long expectedHealthCheckInterval) throws Exception {
+        setupClient();
+        final String instanceFile;
+        switch (instanceConfig.getFunctionDetails().getRuntime()) {
+            case JAVA:
+                instanceFile = javaInstanceJarFile;
+                break;
+            case PYTHON:
+                instanceFile = pythonInstanceFile;
+                break;
+            default:
+                throw new RuntimeException("Unsupported Runtime " + instanceConfig.getFunctionDetails().getRuntime());
+        }
+        return new KubernetesRuntime(
+            appsClient, coreClient, jobNamespace, pulsarDockerImageName, pulsarRootDir,
+            instanceConfig, instanceFile, logDirectory, codePkgUrl, originalCodeFileName,
+            pulsarServiceUri, pulsarAdminUri, stateStorageServiceUri, authConfig);
+    }
+
+    @Override
+    public void close() {
+    }
+
+    /**
+     * Creates the API clients once. With a null {@code k8Uri} the client comes from
+     * {@code Config.fromCluster()} (inside a pod) or {@code Config.defaultClient()} and is installed
+     * as the default API client; otherwise a client with {@code k8Uri} as base path is used.
+     */
+    private void setupClient() throws Exception {
+        if (appsClient != null) {
+            return;
+        }
+        if (k8Uri == null) {
+            log.info("k8Uri is null thus going by defaults");
+            final ApiClient cli;
+            // Boolean.TRUE.equals avoids an unboxing NullPointerException when the flag is null.
+            if (Boolean.TRUE.equals(submittingInsidePod)) {
+                log.info("Looks like we are inside a k8 pod ourselves. Initializing as cluster");
+                cli = Config.fromCluster();
+            } else {
+                log.info("Using default cluster since we are not running inside k8");
+                cli = Config.defaultClient();
+            }
+            Configuration.setDefaultApiClient(cli);
+            appsClient = new AppsV1Api();
+            coreClient = new CoreV1Api();
+        } else {
+            log.info("Setting up k8Client using uri {}", k8Uri);
+            final ApiClient apiClient = new ApiClient().setBasePath(k8Uri);
+            appsClient = new AppsV1Api(apiClient);
+            coreClient = new CoreV1Api(apiClient);
+        }
+    }
+}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 63f6f3f..2146376 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -23,22 +23,17 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.protobuf.Empty;
-import com.google.protobuf.util.JsonFormat;
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
-import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import org.apache.pulsar.functions.proto.InstanceControlGrpc;
-import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
-import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;
 
 import java.io.InputStream;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.TimerTask;
 import java.util.concurrent.CompletableFuture;
@@ -46,8 +41,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
-
 /**
  * A function container implemented using java thread.
  */
@@ -66,6 +59,7 @@ class ProcessRuntime implements Runtime {
     private InstanceControlGrpc.InstanceControlFutureStub stub;
     private ScheduledExecutorService timer;
     private InstanceConfig instanceConfig;
+    private final Long expectedHealthCheckInterval;
 
     ProcessRuntime(InstanceConfig instanceConfig,
                    String instanceFile,
@@ -77,101 +71,10 @@ class ProcessRuntime implements Runtime {
                    Long expectedHealthCheckInterval) throws Exception {
         this.instanceConfig = instanceConfig;
         this.instancePort = instanceConfig.getPort();
-        this.processArgs = composeArgs(instanceConfig, instanceFile, logDirectory, codeFile, pulsarServiceUrl, stateStorageServiceUrl,
-                authConfig, expectedHealthCheckInterval);
-    }
-
-    private List<String> composeArgs(InstanceConfig instanceConfig,
-                                     String instanceFile,
-                                     String logDirectory,
-                                     String codeFile,
-                                     String pulsarServiceUrl,
-                                     String stateStorageServiceUrl,
-                                     AuthenticationConfig authConfig,
-                                     Long expectedHealthCheckInterval) throws Exception {
-        List<String> args = new LinkedList<>();
-        if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) {
-            args.add("java");
-            args.add("-cp");
-            args.add(instanceFile);
-
-            // Keep the same env property pointing to the Java instance file so that it can be picked up
-            // by the child process and manually added to classpath
-            args.add(String.format("-D%s=%s", FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY, instanceFile));
-            args.add("-Dlog4j.configurationFile=java_instance_log4j2.yml");
-            args.add("-Dpulsar.function.log.dir=" + String.format(
-                    "%s/%s",
-                    logDirectory,
-                    FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails())));
-            args.add("-Dpulsar.function.log.file=" + String.format(
-                    "%s-%s",
-                    instanceConfig.getFunctionDetails().getName(),
-                    instanceConfig.getInstanceId()));
-            if (instanceConfig.getFunctionDetails().getResources() != null) {
-                Function.Resources resources = instanceConfig.getFunctionDetails().getResources();
-                if (resources.getRam() != 0) {
-                    args.add("-Xmx" + String.valueOf(resources.getRam()));
-                }
-            }
-            args.add(JavaInstanceMain.class.getName());
-            args.add("--jar");
-            args.add(codeFile);
-        } else if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.PYTHON) {
-            args.add("python");
-            args.add(instanceFile);
-            args.add("--py");
-            args.add(codeFile);
-            args.add("--logging_directory");
-            args.add(logDirectory);
-            args.add("--logging_file");
-            args.add(instanceConfig.getFunctionDetails().getName());
-            // TODO:- Find a platform independent way of controlling memory for a python application
-        }
-        args.add("--instance_id");
-        args.add(instanceConfig.getInstanceName());
-        args.add("--function_id");
-        args.add(instanceConfig.getFunctionId());
-        args.add("--function_version");
-        args.add(instanceConfig.getFunctionVersion());
-        args.add("--function_details");
-        args.add(JsonFormat.printer().print(instanceConfig.getFunctionDetails()));
-
-        args.add("--pulsar_serviceurl");
-        args.add(pulsarServiceUrl);
-        if (authConfig != null) {
-            if (isNotBlank(authConfig.getClientAuthenticationPlugin())
-                    && isNotBlank(authConfig.getClientAuthenticationParameters())) {
-                args.add("--client_auth_plugin");
-                args.add(authConfig.getClientAuthenticationPlugin());
-                args.add("--client_auth_params");
-                args.add(authConfig.getClientAuthenticationParameters());
-            }
-            args.add("--use_tls");
-            args.add(Boolean.toString(authConfig.isUseTls()));
-            args.add("--tls_allow_insecure");
-            args.add(Boolean.toString(authConfig.isTlsAllowInsecureConnection()));
-            args.add("--hostname_verification_enabled");
-            args.add(Boolean.toString(authConfig.isTlsHostnameVerificationEnable()));
-            if(isNotBlank(authConfig.getTlsTrustCertsFilePath())) {
-                args.add("--tls_trust_cert_path");
-                args.add(authConfig.getTlsTrustCertsFilePath());
-            }
-        }
-        args.add("--max_buffered_tuples");
-        args.add(String.valueOf(instanceConfig.getMaxBufferedTuples()));
-
-        args.add("--port");
-        args.add(String.valueOf(instanceConfig.getPort()));
-
-        // state storage configs
-        if (null != stateStorageServiceUrl
-            && instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) {
-            args.add("--state_storage_serviceurl");
-            args.add(stateStorageServiceUrl);
-        }
-        args.add("--expected_healthcheck_interval");
-        args.add(String.valueOf(expectedHealthCheckInterval));
-        return args;
+        this.expectedHealthCheckInterval = expectedHealthCheckInterval;
+        this.processArgs = RuntimeUtils.composeArgs(instanceConfig, instanceFile, logDirectory, codeFile, pulsarServiceUrl, stateStorageServiceUrl,
+                authConfig, instanceConfig.getInstanceName(), instanceConfig.getPort(), expectedHealthCheckInterval,
+                "java_instance_log4j2.yml");
     }
 
     /**
@@ -201,7 +104,7 @@ class ProcessRuntime implements Runtime {
                                 instanceConfig.getInstanceId(), e);
                     }
                 }
-            }, 30000, 30000, TimeUnit.MILLISECONDS);
+            }, expectedHealthCheckInterval, expectedHealthCheckInterval, TimeUnit.SECONDS);
         }
     }
 
@@ -226,7 +129,7 @@ class ProcessRuntime implements Runtime {
     }
 
     @Override
-    public CompletableFuture<FunctionStatus> getFunctionStatus() {
+    public CompletableFuture<FunctionStatus> getFunctionStatus(int instanceId) {
         CompletableFuture<FunctionStatus> retval = new CompletableFuture<>();
         if (stub == null) {
             retval.completeExceptionally(new RuntimeException("Not alive"));
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
index 7b910bc..78b069c 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
@@ -92,6 +92,7 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
 
     @Override
     public ProcessRuntime createContainer(InstanceConfig instanceConfig, String codeFile,
+                                          String originalcodeFileName,
                                           Long expectedHealthCheckInterval) throws Exception {
         String instanceFile;
         switch (instanceConfig.getFunctionDetails().getRuntime()) {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
index ea99290..ac1eced 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
@@ -28,17 +28,17 @@ import java.util.concurrent.CompletableFuture;
  */
 public interface Runtime {
 
-    void start();
+    void start() throws Exception;
 
     void join() throws Exception;
 
-    void stop();
+    void stop() throws Exception;
 
     boolean isAlive();
 
     Throwable getDeathException();
 
-    CompletableFuture<InstanceCommunication.FunctionStatus> getFunctionStatus();
+    CompletableFuture<InstanceCommunication.FunctionStatus> getFunctionStatus(int instanceId);
 
     CompletableFuture<InstanceCommunication.MetricsData> getAndResetMetrics();
     
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
index ef2ea9c..fd8a7be 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
@@ -33,9 +33,11 @@ public interface RuntimeFactory extends AutoCloseable {
      * @return function container to start/stop instance
      */
     Runtime createContainer(
-            InstanceConfig instanceConfig, String codeFile,
+            InstanceConfig instanceConfig, String codeFile, String originalCodeFileName,
             Long expectedHealthCheckInterval) throws Exception;
 
+    default boolean externallyManaged() { return false; }
+
     @Override
     void close();
 
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
index adb4578..d4d61e0 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
@@ -41,9 +41,11 @@ import static org.apache.pulsar.functions.proto.Function.FunctionDetails.Runtime
 @Slf4j
 public class RuntimeSpawner implements AutoCloseable {
 
+    @Getter
     private final InstanceConfig instanceConfig;
     private final RuntimeFactory runtimeFactory;
     private final String codeFile;
+    private final String originalCodeFileName;
 
     @Getter
     private Runtime runtime;
@@ -55,10 +57,12 @@ public class RuntimeSpawner implements AutoCloseable {
 
     public RuntimeSpawner(InstanceConfig instanceConfig,
                           String codeFile,
+                          String originalCodeFileName,
                           RuntimeFactory containerFactory, long instanceLivenessCheckFreqMs) {
         this.instanceConfig = instanceConfig;
         this.runtimeFactory = containerFactory;
         this.codeFile = codeFile;
+        this.originalCodeFileName = originalCodeFileName;
         this.numRestarts = 0;
         this.instanceLivenessCheckFreqMs = instanceLivenessCheckFreqMs;
     }
@@ -68,12 +72,12 @@ public class RuntimeSpawner implements AutoCloseable {
         log.info("{}/{}/{}-{} RuntimeSpawner starting function", details.getTenant(), details.getNamespace(),
                 details.getName(), this.instanceConfig.getInstanceId());
 
-        runtime = runtimeFactory.createContainer(this.instanceConfig, codeFile,
+        runtime = runtimeFactory.createContainer(this.instanceConfig, codeFile, originalCodeFileName,
                 instanceLivenessCheckFreqMs * 1000);
         runtime.start();
 
         // monitor function runtime to make sure it is running.  If not, restart the function runtime
-        if (instanceLivenessCheckFreqMs > 0) {
+        if (!runtimeFactory.externallyManaged() && instanceLivenessCheckFreqMs > 0) {
             processLivenessCheckTimer = new Timer();
             processLivenessCheckTimer.scheduleAtFixedRate(new TimerTask() {
                 @Override
@@ -82,9 +86,14 @@ public class RuntimeSpawner implements AutoCloseable {
                         log.error("{}/{}/{}-{} Function Container is dead with exception.. restarting", details.getTenant(),
                                 details.getNamespace(), details.getName(), runtime.getDeathException());
                         // Just for the sake of sanity, just destroy the runtime
-                        runtime.stop();
-                        runtimeDeathException = runtime.getDeathException();
-                        runtime.start();
+                        try {
+                            runtime.stop();
+                            runtimeDeathException = runtime.getDeathException();
+                            runtime.start();
+                        } catch (Exception e) {
+                            log.error("{}/{}/{}-{} Function Restart failed", details.getTenant(),
+                                    details.getNamespace(), details.getName(), e);
+                        }
                         numRestarts++;
                     }
                 }
@@ -98,10 +107,10 @@ public class RuntimeSpawner implements AutoCloseable {
         }
     }
 
-    public CompletableFuture<FunctionStatus> getFunctionStatus() {
-        return runtime.getFunctionStatus().thenApply(f -> {
-            FunctionStatus.Builder builder = FunctionStatus.newBuilder();
-            builder.mergeFrom(f).setNumRestarts(numRestarts).setInstanceId(instanceConfig.getInstanceName());
+    public CompletableFuture<FunctionStatus> getFunctionStatus(int instanceId) {
+        return runtime.getFunctionStatus(instanceId).thenApply(f -> {
+           FunctionStatus.Builder builder = FunctionStatus.newBuilder();
+           builder.mergeFrom(f).setNumRestarts(numRestarts).setInstanceId(String.valueOf(instanceId));
             if (!f.getRunning() && runtimeDeathException != null) {
                 builder.setFailureException(runtimeDeathException.getMessage());
             }
@@ -109,8 +118,8 @@ public class RuntimeSpawner implements AutoCloseable {
         });
     }
 
-    public CompletableFuture<String> getFunctionStatusAsJson() {
-        return this.getFunctionStatus().thenApply(msg -> {
+    public CompletableFuture<String> getFunctionStatusAsJson(int instanceId) {
+        return this.getFunctionStatus(instanceId).thenApply(msg -> {
             try {
                 return Utils.printJson(msg);
             } catch (IOException e) {
@@ -123,7 +132,11 @@ public class RuntimeSpawner implements AutoCloseable {
     @Override
     public void close() {
         if (null != runtime) {
-            runtime.stop();
+            try {
+                runtime.stop();
+            } catch (Exception e) {
+                // Ignore
+            }
             runtime = null;
         }
         if (processLivenessCheckTimer != null) {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
new file mode 100644
index 0000000..fe2a88e
--- /dev/null
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.functions.runtime;
+
+import com.google.protobuf.util.JsonFormat;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
+import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;
+
+import java.util.*;
+
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+
+/**
+ * Helpers shared by the function runtime implementations in this package.
+ */
+@Slf4j
+class RuntimeUtils {
+
+    /**
+     * Assembles the command line used to launch one function instance; nothing is executed here.
+     *
+     * <p>The list starts with the launcher for the function's runtime ({@code java} or
+     * {@code python}; for any other runtime no launcher is added) followed by the instance
+     * flags in a fixed order.
+     *
+     * @param originalCodeFileName passed as {@code --jar} (Java) or {@code --py} (Python)
+     * @param authConfig client auth/TLS settings; when null no auth or TLS flags are emitted
+     * @param shardId passed as {@code --instance_id} and used in the Java log file name
+     * @param grpcPort passed as {@code --port}
+     * @param expectedHealthCheckInterval passed as {@code --expected_healthcheck_interval}
+     * @param javaLog4jFileName value of {@code -Dlog4j.configurationFile} (Java only)
+     * @return the argument list, launcher first
+     * @throws Exception if printing the function details as JSON fails
+     */
+    public static List<String> composeArgs(InstanceConfig instanceConfig,
+                                           String instanceFile,
+                                           String logDirectory,
+                                           String originalCodeFileName,
+                                           String pulsarServiceUrl,
+                                           String stateStorageServiceUrl,
+                                           AuthenticationConfig authConfig,
+                                           String shardId,
+                                           Integer grpcPort,
+                                           Long expectedHealthCheckInterval,
+                                           String javaLog4jFileName) throws Exception {
+        final Function.FunctionDetails details = instanceConfig.getFunctionDetails();
+        final boolean javaRuntime = details.getRuntime() == Function.FunctionDetails.Runtime.JAVA;
+        final List<String> cmd = new LinkedList<>();
+        if (javaRuntime) {
+            cmd.add("java");
+            addFlag(cmd, "-cp", instanceFile);
+
+            // The instance jar location is also handed over as a system property so that the
+            // child process can pick it up and add it to its classpath manually.
+            cmd.add(String.format("-D%s=%s", FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY, instanceFile));
+            cmd.add("-Dlog4j.configurationFile=" + javaLog4jFileName);
+            cmd.add("-Dpulsar.function.log.dir=" + logDirectory + "/"
+                    + FunctionDetailsUtils.getFullyQualifiedName(details));
+            cmd.add("-Dpulsar.function.log.file=" + details.getName() + "-" + shardId);
+            final Function.Resources resources = details.getResources();
+            if (resources != null && resources.getRam() != 0) {
+                cmd.add("-Xmx" + resources.getRam());
+            }
+            cmd.add(JavaInstanceMain.class.getName());
+            addFlag(cmd, "--jar", originalCodeFileName);
+        } else if (details.getRuntime() == Function.FunctionDetails.Runtime.PYTHON) {
+            cmd.add("python");
+            cmd.add(instanceFile);
+            addFlag(cmd, "--py", originalCodeFileName);
+            addFlag(cmd, "--logging_directory", logDirectory);
+            addFlag(cmd, "--logging_file", details.getName());
+            // TODO:- Find a platform independent way of controlling memory for a python application
+        }
+        addFlag(cmd, "--instance_id", shardId);
+        addFlag(cmd, "--function_id", instanceConfig.getFunctionId());
+        addFlag(cmd, "--function_version", instanceConfig.getFunctionVersion());
+        // The compact JSON form of the function details is passed wrapped in single quotes.
+        final String detailsJson = JsonFormat.printer().omittingInsignificantWhitespace().print(details);
+        addFlag(cmd, "--function_details", "'" + detailsJson + "'");
+
+        addFlag(cmd, "--pulsar_serviceurl", pulsarServiceUrl);
+        if (authConfig != null) {
+            final String authPlugin = authConfig.getClientAuthenticationPlugin();
+            final String authParams = authConfig.getClientAuthenticationParameters();
+            if (isNotBlank(authPlugin) && isNotBlank(authParams)) {
+                addFlag(cmd, "--client_auth_plugin", authPlugin);
+                addFlag(cmd, "--client_auth_params", authParams);
+            }
+            addFlag(cmd, "--use_tls", Boolean.toString(authConfig.isUseTls()));
+            addFlag(cmd, "--tls_allow_insecure", Boolean.toString(authConfig.isTlsAllowInsecureConnection()));
+            addFlag(cmd, "--hostname_verification_enabled",
+                    Boolean.toString(authConfig.isTlsHostnameVerificationEnable()));
+            final String trustCertsPath = authConfig.getTlsTrustCertsFilePath();
+            if (isNotBlank(trustCertsPath)) {
+                addFlag(cmd, "--tls_trust_cert_path", trustCertsPath);
+            }
+        }
+        addFlag(cmd, "--max_buffered_tuples", String.valueOf(instanceConfig.getMaxBufferedTuples()));
+        addFlag(cmd, "--port", String.valueOf(grpcPort));
+
+        // state storage configs
+        if (javaRuntime && stateStorageServiceUrl != null) {
+            addFlag(cmd, "--state_storage_serviceurl", stateStorageServiceUrl);
+        }
+        addFlag(cmd, "--expected_healthcheck_interval", String.valueOf(expectedHealthCheckInterval));
+        return cmd;
+    }
+
+    /** Appends {@code flag} and then {@code value} to {@code cmd}. */
+    private static void addFlag(List<String> cmd, String flag, String value) {
+        cmd.add(flag);
+        cmd.add(value);
+    }
+}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
index 6e8a393..05cb87f 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
@@ -99,7 +99,7 @@ class ThreadRuntime implements Runtime {
     }
 
     @Override
-    public CompletableFuture<FunctionStatus> getFunctionStatus() {
+    public CompletableFuture<FunctionStatus> getFunctionStatus(int instanceId) {
         CompletableFuture<FunctionStatus> statsFuture = new CompletableFuture<>();
         if (!isAlive()) {
             FunctionStatus.Builder functionStatusBuilder = FunctionStatus.newBuilder();
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
index 84d21af..dfbbb64 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
@@ -52,7 +52,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
     }
 
     @VisibleForTesting
-    ThreadRuntimeFactory(String threadGroupName, PulsarClient pulsarClient, String storageServiceUrl) {
+    public ThreadRuntimeFactory(String threadGroupName, PulsarClient pulsarClient, String storageServiceUrl) {
         this.fnCache = new FunctionCacheManagerImpl();
         this.threadGroup = new ThreadGroup(threadGroupName);
         this.pulsarClient = pulsarClient;
@@ -82,6 +82,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
     
     @Override
     public ThreadRuntime createContainer(InstanceConfig instanceConfig, String jarFile,
+                                         String originalCodeFileName,
                                          Long expectedHealthCheckInterval) {
         return new ThreadRuntime(
             instanceConfig,
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
similarity index 74%
copy from pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
copy to pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
index 2345783..274d973 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
@@ -19,15 +19,7 @@
 
 package org.apache.pulsar.functions.runtime;
 
-import static org.testng.Assert.assertEquals;
-
-import com.google.gson.Gson;
 import com.google.protobuf.util.JsonFormat;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.ConsumerSpec;
@@ -36,14 +28,20 @@ import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.Test;
 
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.testng.Assert.assertEquals;
+
 /**
- * Unit test of {@link ThreadRuntime}.
+ * Unit test of {@link KubernetesRuntime}.
  */
-public class ProcessRuntimeTest {
+public class KubernetesRuntimeTest {
 
-    private static final String TEST_TENANT = "test-function-tenant";
-    private static final String TEST_NAMESPACE = "test-function-namespace";
-    private static final String TEST_NAME = "test-function-container";
+    private static final String TEST_TENANT = "tenant";
+    private static final String TEST_NAMESPACE = "namespace";
+    private static final String TEST_NAME = "container";
     private static final Map<String, String> topicsToSerDeClassName = new HashMap<>();
     private static final Map<String, ConsumerSpec> topicsToSchema = new HashMap<>();
     static {
@@ -52,23 +50,25 @@ public class ProcessRuntimeTest {
                 ConsumerSpec.newBuilder().setSerdeClassName("").setIsRegexPattern(false).build());
     }
 
-    private final ProcessRuntimeFactory factory;
+    private final KubernetesRuntimeFactory factory;
     private final String userJarFile;
     private final String javaInstanceJarFile;
     private final String pythonInstanceFile;
     private final String pulsarServiceUrl;
+    private final String pulsarAdminUrl;
     private final String stateStorageServiceUrl;
     private final String logDirectory;
 
-    public ProcessRuntimeTest() {
+    public KubernetesRuntimeTest() throws Exception {
         this.userJarFile = "/Users/user/UserJar.jar";
-        this.javaInstanceJarFile = "/Users/user/JavaInstance.jar";
-        this.pythonInstanceFile = "/Users/user/PythonInstance.py";
+        this.javaInstanceJarFile = "/pulsar/instances/java-instance.jar";
+        this.pythonInstanceFile = "/pulsar/instances/python-instance/python_instance_main.py";
         this.pulsarServiceUrl = "pulsar://localhost:6670";
+        this.pulsarAdminUrl = "http://localhost:8080";
         this.stateStorageServiceUrl = "bk://localhost:4181";
-        this.logDirectory = "Users/user/logs";
-        this.factory = new ProcessRuntimeFactory(
-            pulsarServiceUrl, stateStorageServiceUrl, null, javaInstanceJarFile, pythonInstanceFile, logDirectory);
+        this.logDirectory = "logs/functions";
+        this.factory = new KubernetesRuntimeFactory(null, null, null, null,
+            false, pulsarServiceUrl, pulsarAdminUrl, stateStorageServiceUrl, null);
     }
 
     @AfterMethod
@@ -114,23 +114,23 @@ public class ProcessRuntimeTest {
     public void testJavaConstructor() throws Exception {
         InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA);
 
-        ProcessRuntime container = factory.createContainer(config, userJarFile, 30l);
+        KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
         List<String> args = container.getProcessArgs();
         assertEquals(args.size(), 28);
         String expectedArgs = "java -cp " + javaInstanceJarFile
                 + " -Dpulsar.functions.java.instance.jar=" + javaInstanceJarFile
-                + " -Dlog4j.configurationFile=java_instance_log4j2.yml "
-                + "-Dpulsar.function.log.dir=" + logDirectory + "/functions/" + FunctionDetailsUtils.getFullyQualifiedName(config.getFunctionDetails())
-                + " -Dpulsar.function.log.file=" + config.getFunctionDetails().getName() + "-" + config.getInstanceId()
+                + " -Dlog4j.configurationFile=conf/log4j2.yaml "
+                + "-Dpulsar.function.log.dir=" + logDirectory + "/" + FunctionDetailsUtils.getFullyQualifiedName(config.getFunctionDetails())
+                + " -Dpulsar.function.log.file=" + config.getFunctionDetails().getName() + "-$SHARD_ID"
                 + " org.apache.pulsar.functions.runtime.JavaInstanceMain"
                 + " --jar " + userJarFile + " --instance_id "
-                + config.getInstanceId() + " --function_id " + config.getFunctionId()
+                + "$SHARD_ID" + " --function_id " + config.getFunctionId()
                 + " --function_version " + config.getFunctionVersion()
-                + " --function_details " + JsonFormat.printer().print(config.getFunctionDetails())
-                + " --pulsar_serviceurl " + pulsarServiceUrl
+                + " --function_details '" + JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
+                + "' --pulsar_serviceurl " + pulsarServiceUrl
                 + " --max_buffered_tuples 1024 --port " + args.get(23)
                 + " --state_storage_serviceurl " + stateStorageServiceUrl
-                + " --expected_healthcheck_interval 30";
+                + " --expected_healthcheck_interval -1";
         assertEquals(String.join(" ", args), expectedArgs);
     }
 
@@ -138,18 +138,18 @@ public class ProcessRuntimeTest {
     public void testPythonConstructor() throws Exception {
         InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.PYTHON);
 
-        ProcessRuntime container = factory.createContainer(config, userJarFile, 30l);
+        KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
         List<String> args = container.getProcessArgs();
         assertEquals(args.size(), 24);
         String expectedArgs = "python " + pythonInstanceFile
                 + " --py " + userJarFile + " --logging_directory "
-                + logDirectory + "/functions" + " --logging_file " + config.getFunctionDetails().getName() + " --instance_id "
-                + config.getInstanceId() + " --function_id " + config.getFunctionId()
+                + logDirectory + " --logging_file " + config.getFunctionDetails().getName() + " --instance_id "
+                + "$SHARD_ID" + " --function_id " + config.getFunctionId()
                 + " --function_version " + config.getFunctionVersion()
-                + " --function_details " + JsonFormat.printer().print(config.getFunctionDetails())
-                + " --pulsar_serviceurl " + pulsarServiceUrl
+                + " --function_details '" + JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
+                + "' --pulsar_serviceurl " + pulsarServiceUrl
                 + " --max_buffered_tuples 1024 --port " + args.get(21)
-                + " --expected_healthcheck_interval 30";
+                + " --expected_healthcheck_interval -1";
         assertEquals(String.join(" ", args), expectedArgs);
     }
 
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
index 2345783..b09b9af 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
@@ -114,7 +114,7 @@ public class ProcessRuntimeTest {
     public void testJavaConstructor() throws Exception {
         InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA);
 
-        ProcessRuntime container = factory.createContainer(config, userJarFile, 30l);
+        ProcessRuntime container = factory.createContainer(config, userJarFile, null, 30l);
         List<String> args = container.getProcessArgs();
         assertEquals(args.size(), 28);
         String expectedArgs = "java -cp " + javaInstanceJarFile
@@ -126,8 +126,8 @@ public class ProcessRuntimeTest {
                 + " --jar " + userJarFile + " --instance_id "
                 + config.getInstanceId() + " --function_id " + config.getFunctionId()
                 + " --function_version " + config.getFunctionVersion()
-                + " --function_details " + JsonFormat.printer().print(config.getFunctionDetails())
-                + " --pulsar_serviceurl " + pulsarServiceUrl
+                + " --function_details '" + JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
+                + "' --pulsar_serviceurl " + pulsarServiceUrl
                 + " --max_buffered_tuples 1024 --port " + args.get(23)
                 + " --state_storage_serviceurl " + stateStorageServiceUrl
                 + " --expected_healthcheck_interval 30";
@@ -138,7 +138,7 @@ public class ProcessRuntimeTest {
     public void testPythonConstructor() throws Exception {
         InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.PYTHON);
 
-        ProcessRuntime container = factory.createContainer(config, userJarFile, 30l);
+        ProcessRuntime container = factory.createContainer(config, userJarFile, null, 30l);
         List<String> args = container.getProcessArgs();
         assertEquals(args.size(), 24);
         String expectedArgs = "python " + pythonInstanceFile
@@ -146,8 +146,8 @@ public class ProcessRuntimeTest {
                 + logDirectory + "/functions" + " --logging_file " + config.getFunctionDetails().getName() + " --instance_id "
                 + config.getInstanceId() + " --function_id " + config.getFunctionId()
                 + " --function_version " + config.getFunctionVersion()
-                + " --function_details " + JsonFormat.printer().print(config.getFunctionDetails())
-                + " --pulsar_serviceurl " + pulsarServiceUrl
+                + " --function_details '" + JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
+                + "' --pulsar_serviceurl " + pulsarServiceUrl
                 + " --max_buffered_tuples 1024 --port " + args.get(21)
                 + " --expected_healthcheck_interval 30";
         assertEquals(String.join(" ", args), expectedArgs);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index a3355b0..52af689 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -138,26 +138,31 @@ public class FunctionActioner implements AutoCloseable {
         FunctionDetails.Builder functionDetails = FunctionDetails.newBuilder(functionMetaData.getFunctionDetails());
         log.info("{}/{}/{}-{} Starting function ...", functionDetails.getTenant(), functionDetails.getNamespace(),
                 functionDetails.getName(), instanceId);
-        File pkgFile = null;
+        String packageFile;
 
         String pkgLocation = functionMetaData.getPackageLocation().getPackagePath();
         boolean isPkgUrlProvided = isFunctionPackageUrlSupported(pkgLocation);
 
         if (isPkgUrlProvided && pkgLocation.startsWith(FILE)) {
             URL url = new URL(pkgLocation);
-            pkgFile = new File(url.toURI());
+            File pkgFile = new File(url.toURI());
+            packageFile = pkgFile.getAbsolutePath();
         } else if (isFunctionCodeBuiltin(functionDetails)) {
-            pkgFile = getBuiltinArchive(functionDetails);
+            File pkgFile = getBuiltinArchive(functionDetails);
+            packageFile = pkgFile.getAbsolutePath();
+        } else if (runtimeFactory.externallyManaged()) {
+            packageFile = pkgLocation;
         } else {
             File pkgDir = new File(
                     workerConfig.getDownloadDirectory(),
                     getDownloadPackagePath(functionMetaData, instanceId));
             pkgDir.mkdirs();
 
-            pkgFile = new File(
+            File pkgFile = new File(
                     pkgDir,
                     new File(FunctionDetailsUtils.getDownloadFileName(functionMetaData.getFunctionDetails(), functionMetaData.getPackageLocation())).getName());
             downloadFile(pkgFile, isPkgUrlProvided, functionMetaData, instanceId);
+            packageFile = pkgFile.getAbsolutePath();
         }
 
         InstanceConfig instanceConfig = new InstanceConfig();
@@ -172,7 +177,8 @@ public class FunctionActioner implements AutoCloseable {
         log.info("{}/{}/{}-{} start process with instance config {}", functionDetails.getTenant(), functionDetails.getNamespace(),
                 functionDetails.getName(), instanceId, instanceConfig);
 
-        RuntimeSpawner runtimeSpawner = new RuntimeSpawner(instanceConfig, pkgFile.getAbsolutePath(),
+        RuntimeSpawner runtimeSpawner = new RuntimeSpawner(instanceConfig, packageFile,
+                functionMetaData.getPackageLocation().getOriginalFileName(),
                 runtimeFactory, workerConfig.getInstanceLivenessCheckFreqMs());
 
         functionRuntimeInfo.setRuntimeSpawner(runtimeSpawner);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index e634130..992e5db 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
 import java.util.HashMap;
@@ -32,31 +31,26 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.stream.Collectors;
 
 import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.client.Client;
-import javax.ws.rs.client.ClientBuilder;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.UriBuilder;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.proto.Function.Assignment;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
-import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
-import org.apache.pulsar.functions.runtime.Runtime;
-import org.apache.pulsar.functions.runtime.RuntimeFactory;
-import org.apache.pulsar.functions.runtime.RuntimeSpawner;
-import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
+import org.apache.pulsar.functions.runtime.*;
 
 import com.google.common.annotations.VisibleForTesting;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.runtime.Runtime;
 
 /**
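- * This class managers all aspects of functions assignments and running of function assignments for this worker
+ * This class manages all aspects of functions assignments and running of function assignments for this worker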
@@ -85,6 +79,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
 
     private FunctionActioner functionActioner;
 
+    @Getter
     private RuntimeFactory runtimeFactory;
 
     private MembershipManager membershipManager;
@@ -123,8 +118,19 @@ public class FunctionRuntimeManager implements AutoCloseable{
                     workerConfig.getProcessContainerFactory().getJavaInstanceJarLocation(),
                     workerConfig.getProcessContainerFactory().getPythonInstanceLocation(),
                     workerConfig.getProcessContainerFactory().getLogDirectory());
+        } else if (workerConfig.getKubernetesContainerFactory() != null){
+            this.runtimeFactory = new KubernetesRuntimeFactory(
+                    workerConfig.getKubernetesContainerFactory().getK8Uri(),
+                    workerConfig.getKubernetesContainerFactory().getJobNamespace(),
+                    workerConfig.getKubernetesContainerFactory().getPulsarDockerImageName(),
+                    workerConfig.getKubernetesContainerFactory().getPulsarRootDir(),
+                    workerConfig.getKubernetesContainerFactory().getSubmittingInsidePod(),
+                    StringUtils.isEmpty(workerConfig.getKubernetesContainerFactory().getPulsarServiceUrl()) ? workerConfig.getPulsarServiceUrl() : workerConfig.getKubernetesContainerFactory().getPulsarServiceUrl(),
+                    StringUtils.isEmpty(workerConfig.getKubernetesContainerFactory().getPulsarAdminUrl()) ? workerConfig.getPulsarWebServiceUrl() : workerConfig.getKubernetesContainerFactory().getPulsarAdminUrl(),
+                    workerConfig.getStateStorageServiceUrl(),
+                    authConfig);
         } else {
-            throw new RuntimeException("Either Thread or Process Container Factory need to be set");
+            throw new RuntimeException("Either Thread, Process or Kubernetes Container Factory need to be set");
         }
 
         this.actionQueue = new LinkedBlockingQueue<>();
@@ -236,7 +242,12 @@ public class FunctionRuntimeManager implements AutoCloseable{
      */
     public InstanceCommunication.FunctionStatus getFunctionInstanceStatus(String tenant, String namespace,
             String functionName, int instanceId, URI uri) {
-        Assignment assignment = this.findAssignment(tenant, namespace, functionName, instanceId);
+        Assignment assignment;
+        if (runtimeFactory.externallyManaged()) {
+            assignment = this.findAssignment(tenant, namespace, functionName, -1);
+        } else {
+            assignment = this.findAssignment(tenant, namespace, functionName, instanceId);
+        }
         final String assignedWorkerId = assignment.getWorkerId();
         final String workerId = this.workerConfig.getWorkerId();
         
@@ -257,7 +268,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
             if (runtimeSpawner != null) {
                 try {
                     InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus
-                            .newBuilder(functionRuntimeInfo.getRuntimeSpawner().getFunctionStatus().get());
+                            .newBuilder(functionRuntimeInfo.getRuntimeSpawner().getFunctionStatus(instanceId).get());
                     functionStatusBuilder.setWorkerId(assignedWorkerId);
                     functionStatus = functionStatusBuilder.build();
                 } catch (InterruptedException | ExecutionException e) {
@@ -302,6 +313,10 @@ public class FunctionRuntimeManager implements AutoCloseable{
 
     public Response stopFunctionInstance(String tenant, String namespace, String functionName, int instanceId,
             boolean restart, URI uri) throws Exception {
+        if (runtimeFactory.externallyManaged()) {
+            return Response.status(Status.NOT_IMPLEMENTED).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData("Externally managed schedulers can't do per instance stop")).build();
+        }
         Assignment assignment = this.findAssignment(tenant, namespace, functionName, instanceId);
         final String fullFunctionName = String.format("%s/%s/%s/%s", tenant, namespace, functionName, instanceId);
         if (assignment == null) {
@@ -343,7 +358,8 @@ public class FunctionRuntimeManager implements AutoCloseable{
             return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(fullFunctionName + " has not been assigned yet")).build();
         }
-        for (Assignment assignment : assignments) {
+        if (runtimeFactory.externallyManaged()) {
+            Assignment assignment = assignments.iterator().next();
             final String assignedWorkerId = assignment.getWorkerId();
             final String workerId = this.workerConfig.getWorkerId();
             String fullyQualifiedInstanceId = Utils.getFullyQualifiedInstanceId(assignment.getInstance());
@@ -361,14 +377,43 @@ public class FunctionRuntimeManager implements AutoCloseable{
                     if (log.isDebugEnabled()) {
                         log.debug("[{}] has not been assigned yet", fullyQualifiedInstanceId);
                     }
-                    continue;
+                    return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+                            .entity(new ErrorData(fullFunctionName + " has not been assigned yet")).build();
                 }
                 if (restart) {
-                    this.functionAdmin.functions().restartFunction(tenant, namespace, functionName,
-                            assignment.getInstance().getInstanceId());
+                    this.functionAdmin.functions().restartFunction(tenant, namespace, functionName);
                 } else {
-                    this.functionAdmin.functions().stopFunction(tenant, namespace, functionName,
-                            assignment.getInstance().getInstanceId());
+                    this.functionAdmin.functions().stopFunction(tenant, namespace, functionName);
+                }
+            }
+        } else {
+            for (Assignment assignment : assignments) {
+                final String assignedWorkerId = assignment.getWorkerId();
+                final String workerId = this.workerConfig.getWorkerId();
+                String fullyQualifiedInstanceId = Utils.getFullyQualifiedInstanceId(assignment.getInstance());
+                if (assignedWorkerId.equals(workerId)) {
+                    stopFunction(fullyQualifiedInstanceId, restart);
+                } else {
+                    List<WorkerInfo> workerInfoList = this.membershipManager.getCurrentMembership();
+                    WorkerInfo workerInfo = null;
+                    for (WorkerInfo entry : workerInfoList) {
+                        if (assignment.getWorkerId().equals(entry.getWorkerId())) {
+                            workerInfo = entry;
+                        }
+                    }
+                    if (workerInfo == null) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("[{}] has not been assigned yet", fullyQualifiedInstanceId);
+                        }
+                        continue;
+                    }
+                    if (restart) {
+                        this.functionAdmin.functions().restartFunction(tenant, namespace, functionName,
+                                assignment.getInstance().getInstanceId());
+                    } else {
+                        this.functionAdmin.functions().stopFunction(tenant, namespace, functionName,
+                                assignment.getInstance().getInstanceId());
+                    }
                 }
             }
         }
@@ -379,7 +424,11 @@ public class FunctionRuntimeManager implements AutoCloseable{
      * It stops all functions instances owned by current worker
      * @throws Exception
      */
-    public void stopAllOwnedFunctions() throws Exception {
+    public void stopAllOwnedFunctions() {
+        if (runtimeFactory.externallyManaged()) {
+            log.warn("Will not stop any functions since they are externally managed");
+            return;
+        }
         final String workerId = this.workerConfig.getWorkerId();
         Map<String, Assignment> assignments = workerIdToAssignments.get(workerId);
         if (assignments != null) {
@@ -401,7 +450,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
             this.functionActioner.stopFunction(functionRuntimeInfo);
             try {
                 if(restart) {
-                    this.functionActioner.startFunction(functionRuntimeInfo);    
+                    this.functionActioner.startFunction(functionRuntimeInfo);
                 }
             } catch (Exception ex) {
                 log.info("{} Error re-starting function", fullyQualifiedInstanceId, ex);
@@ -429,24 +478,38 @@ public class FunctionRuntimeManager implements AutoCloseable{
             return functionStatusListBuilder.build();
         }
 
-        for (Assignment assignment : assignments) {
+        if (runtimeFactory.externallyManaged()) {
+            Assignment assignment = assignments.iterator().next();
             boolean isOwner = this.workerConfig.getWorkerId().equals(assignment.getWorkerId());
-            InstanceCommunication.FunctionStatus functionStatus = isOwner
-                    ? (getFunctionInstanceStatus(tenant, namespace, functionName,
-                            assignment.getInstance().getInstanceId(), null))
-                    : this.functionAdmin.functions().getFunctionStatus(
-                            assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(),
-                            assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(),
-                            assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(),
-                            assignment.getInstance().getInstanceId());
-            functionStatusListBuilder.addFunctionStatusList(functionStatus);
+            if (isOwner) {
+                int parallelism = assignment.getInstance().getFunctionMetaData().getFunctionDetails().getParallelism();
+                for (int i = 0; i < parallelism; ++i) {
+                    InstanceCommunication.FunctionStatus functionStatus = getFunctionInstanceStatus(tenant, namespace,
+                            functionName, i, null);
+                    functionStatusListBuilder.addFunctionStatusList(functionStatus);
+                }
+            } else {
+                return this.functionAdmin.functions().getFunctionStatus(tenant, namespace, functionName);
+            }
+        } else {
+            for (Assignment assignment : assignments) {
+                boolean isOwner = this.workerConfig.getWorkerId().equals(assignment.getWorkerId());
+                InstanceCommunication.FunctionStatus functionStatus = isOwner
+                        ? (getFunctionInstanceStatus(tenant, namespace, functionName,
+                        assignment.getInstance().getInstanceId(), null))
+                        : this.functionAdmin.functions().getFunctionStatus(
+                        assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(),
+                        assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(),
+                        assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(),
+                        assignment.getInstance().getInstanceId());
+                functionStatusListBuilder.addFunctionStatusList(functionStatus);
+            }
         }
         return functionStatusListBuilder.build();
     }
 
     /**
      * Process an assignment update from the assignment topic
-     * @param messageId the message id of the update assignment
      * @param newAssignment the assignment
      */
     public synchronized void processAssignment(Assignment newAssignment) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
index b18fd12..03b9aea 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
@@ -221,7 +221,7 @@ public class MembershipManager implements AutoCloseable, ConsumerEventListener {
                     .map(assignment -> assignment.getInstance())
                     .collect(Collectors.toSet());
 
-            Set<Function.Instance> instances = new HashSet<>(SchedulerManager.computeInstances(functionMetaData));
+            Set<Function.Instance> instances = new HashSet<>(SchedulerManager.computeInstances(functionMetaData, functionRuntimeManager.getRuntimeFactory().externallyManaged()));
 
             for (Function.Instance instance : instances) {
                 if (!assignedInstances.contains(instance)) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index db7785a..608b2fc 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -126,7 +126,7 @@ public class SchedulerManager implements AutoCloseable {
                 .stream().map(workerInfo -> workerInfo.getWorkerId()).collect(Collectors.toList());
 
         List<FunctionMetaData> allFunctions = this.functionMetaDataManager.getAllFunctionMetaData();
-        Map<String, Function.Instance> allInstances = computeAllInstances(allFunctions);
+        Map<String, Function.Instance> allInstances = computeAllInstances(allFunctions, functionRuntimeManager.getRuntimeFactory().externallyManaged());
         Map<String, Map<String, Assignment>> workerIdToAssignments = this.functionRuntimeManager
                 .getCurrentAssignments();
         //delete assignments of functions and instances that don't exist anymore
@@ -207,23 +207,32 @@ public class SchedulerManager implements AutoCloseable {
         }
     }
 
-    public static Map<String, Function.Instance> computeAllInstances(List<FunctionMetaData> allFunctions) {
+    public static Map<String, Function.Instance> computeAllInstances(List<FunctionMetaData> allFunctions,
+                                                                     boolean externallyManagedRuntime) {
         Map<String, Function.Instance> functionInstances = new HashMap<>();
         for (FunctionMetaData functionMetaData : allFunctions) {
-            for (Function.Instance instance : computeInstances(functionMetaData)) {
+            for (Function.Instance instance : computeInstances(functionMetaData, externallyManagedRuntime)) {
                 functionInstances.put(Utils.getFullyQualifiedInstanceId(instance), instance);
             }
         }
         return functionInstances;
     }
 
-    public static List<Function.Instance> computeInstances(FunctionMetaData functionMetaData) {
+    public static List<Function.Instance> computeInstances(FunctionMetaData functionMetaData,
+                                                           boolean externallyManagedRuntime) {
         List<Function.Instance> functionInstances = new LinkedList<>();
-        int instances = functionMetaData.getFunctionDetails().getParallelism();
-        for (int i = 0; i < instances; i++) {
+        if (!externallyManagedRuntime) {
+            int instances = functionMetaData.getFunctionDetails().getParallelism();
+            for (int i = 0; i < instances; i++) {
+                functionInstances.add(Function.Instance.newBuilder()
+                        .setFunctionMetaData(functionMetaData)
+                        .setInstanceId(i)
+                        .build());
+            }
+        } else {
             functionInstances.add(Function.Instance.newBuilder()
                     .setFunctionMetaData(functionMetaData)
-                    .setInstanceId(i)
+                    .setInstanceId(-1)
                     .build());
         }
         return functionInstances;
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index a2524c6..81e9fd2 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -125,6 +125,22 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
     }
     private ProcessContainerFactory processContainerFactory;
 
+    @Data
+    @Setter
+    @Getter
+    @EqualsAndHashCode
+    @ToString
+    public static class KubernetesContainerFactory {
+        private String k8Uri;
+        private String jobNamespace;
+        private String pulsarDockerImageName;
+        private String pulsarRootDir;
+        private Boolean submittingInsidePod;
+        private String pulsarServiceUrl;
+        private String pulsarAdminUrl;
+    }
+    private KubernetesContainerFactory kubernetesContainerFactory;
+
     public String getFunctionMetadataTopic() {
         return String.format("persistent://%s/%s", pulsarFunctionsNamespace, functionMetadataTopicName);
     }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index aca3b4a..b92ae27 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -369,6 +369,13 @@ public class FunctionsImpl {
             return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
         }
+        FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, functionName);
+        int instanceIdInt = Integer.parseInt(instanceId);
+        if (instanceIdInt < 0 || instanceIdInt >= functionMetaData.getFunctionDetails().getParallelism()) {
+            log.error("instanceId in getFunctionStatus out of bounds @ /{}/{}/{}", tenant, namespace, functionName);
+            return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(String.format("Invalid InstanceId"))).build();
+        }
 
         FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
         FunctionStatus functionStatus = null;
@@ -751,7 +758,7 @@ public class FunctionsImpl {
         try {
             log.info("Uploading function package to {}", path);
 
-            Utils.uploadToBookeeper(worker().getDlogNamespace(), uploadedInputStream, Codec.encode(path));
+            Utils.uploadToBookeeper(worker().getDlogNamespace(), uploadedInputStream, path);
         } catch (IOException e) {
             log.error("Error uploading file {}", path, e);
             return Response.serverError().type(MediaType.APPLICATION_JSON).entity(new ErrorData(e.getMessage()))
@@ -778,7 +785,7 @@ public class FunctionsImpl {
                         throw new IllegalArgumentException("invalid file url path: " + path);
                     }
                 } else {
-                    Utils.downloadFromBookkeeper(worker().getDlogNamespace(), output, Codec.encode(path));
+                    Utils.downloadFromBookkeeper(worker().getDlogNamespace(), output, path);
                 }
             }
         }).build();
@@ -799,7 +806,6 @@ public class FunctionsImpl {
         validateGetFunctionRequestParams(tenant, namespace, functionName);
         if (instanceId == null) {
             throw new IllegalArgumentException("Function Instance Id is not provided");
-
         }
     }
 
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
index 05e3422..bf4b4aa 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
@@ -103,7 +103,7 @@ public class FunctionActionerTest {
 
         RuntimeFactory factory = mock(RuntimeFactory.class);
         Runtime runtime = mock(Runtime.class);
-        doReturn(runtime).when(factory).createContainer(any(), any(), any());
+        doReturn(runtime).when(factory).createContainer(any(), any(), any(), any());
         doNothing().when(runtime).start();
         Namespace dlogNamespace = mock(Namespace.class);
         final String exceptionMsg = "dl namespace not-found";
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index 00e4b6e..649cd3e 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -54,6 +54,7 @@ import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.Assignment;
 import org.apache.pulsar.functions.proto.Request;
+import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
 import org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler;
 import org.mockito.Mockito;
 import org.mockito.invocation.Invocation;
@@ -153,6 +154,9 @@ public class SchedulerManagerTest {
         functionMetaDataList.add(function1);
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy");
+        doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
+
         // set assignments
         Function.Assignment assignment1 = Function.Assignment.newBuilder()
                 .setWorkerId("worker-1")
@@ -240,6 +244,9 @@ public class SchedulerManagerTest {
         functionMetaDataList.add(function2);
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy");
+        doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
+
         // set assignments
         Function.Assignment assignment1 = Function.Assignment.newBuilder()
                 .setWorkerId("worker-1")
@@ -297,6 +304,9 @@ public class SchedulerManagerTest {
         functionMetaDataList.add(function1);
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy");
+        doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
+
         // set assignments
         Function.Assignment assignment1 = Function.Assignment.newBuilder()
                 .setWorkerId("worker-1")
@@ -359,6 +369,9 @@ public class SchedulerManagerTest {
         functionMetaDataList.add(function2);
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy");
+        doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
+
         // set assignments
         Function.Assignment assignment1 = Function.Assignment.newBuilder()
                 .setWorkerId("worker-1")
@@ -464,6 +477,9 @@ public class SchedulerManagerTest {
         functionMetaDataList.add(function2);
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy");
+        doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
+
         // set assignments
         Function.Assignment assignment1 = Function.Assignment.newBuilder()
                 .setWorkerId("worker-1")
@@ -586,6 +602,9 @@ public class SchedulerManagerTest {
         functionMetaDataList.add(function2);
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy");
+        doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
+
         Map<String, Map<String, Function.Assignment>> currentAssignments = new HashMap<>();
         Map<String, Function.Assignment> assignmentEntry1 = new HashMap<>();
 
@@ -637,6 +656,9 @@ public class SchedulerManagerTest {
         functionMetaDataList.add(function2);
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy");
+        doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
+
         // set assignments
         Function.Assignment assignment1 = Function.Assignment.newBuilder()
                 .setWorkerId("worker-1")


Mime
View raw message