pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sanjee...@apache.org
Subject [pulsar] branch master updated: Enable stats to be recovered by Kubernetes runtime (#3363)
Date Mon, 14 Jan 2019 17:55:07 GMT
This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 63a0491  Enable stats to be recovered by Kubernetes runtime (#3363)
63a0491 is described below

commit 63a0491ec86a4123062eb2835d6f385bf79e2552
Author: Sanjeev Kulkarni <sanjeevrk@gmail.com>
AuthorDate: Mon Jan 14 09:55:02 2019 -0800

    Enable stats to be recovered by Kubernetes runtime (#3363)
---
 .../pulsar/functions/runtime/JavaInstanceMain.java |  2 +-
 .../functions/runtime/KubernetesRuntime.java       | 26 +++++++++++++++++--
 .../pulsar/functions/runtime/ProcessRuntime.java   |  2 +-
 .../apache/pulsar/functions/runtime/Runtime.java   |  2 +-
 .../pulsar/functions/runtime/ThreadRuntime.java    |  2 +-
 .../functions/worker/FunctionRuntimeManager.java   |  2 +-
 .../org/apache/pulsar/functions/worker/Utils.java  | 17 +++---------
 .../functions/worker/rest/api/WorkerImpl.java      | 30 ++++++++++++++++------
 8 files changed, 55 insertions(+), 28 deletions(-)

diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index 3a0a404..43467ae 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -317,7 +317,7 @@ public class JavaInstanceMain implements AutoCloseable {
             Runtime runtime = runtimeSpawner.getRuntime();
             if (runtime != null) {
                 try {
-                    InstanceCommunication.MetricsData metrics = runtime.getMetrics().get();
+                    InstanceCommunication.MetricsData metrics = runtime.getMetrics(instanceId).get();
                     responseObserver.onNext(metrics);
                     responseObserver.onCompleted();
                 } catch (InterruptedException | ExecutionException e) {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index a3a006a..aba473b 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -295,9 +295,31 @@ class KubernetesRuntime implements Runtime {
     }
 
     @Override
-    public CompletableFuture<InstanceCommunication.MetricsData> getMetrics() {
+    public CompletableFuture<InstanceCommunication.MetricsData> getMetrics(int instanceId)
{
         CompletableFuture<InstanceCommunication.MetricsData> retval = new CompletableFuture<>();
-        retval.completeExceptionally(new RuntimeException("Kubernetes Runtime doesnt support
getting metrics via rest"));
+        if (instanceId < 0 || instanceId >= stub.length) {
+            if (stub == null) {
+                retval.completeExceptionally(new RuntimeException("Invalid InstanceId"));
+                return retval;
+            }
+        }
+        if (stub == null) {
+            retval.completeExceptionally(new RuntimeException("Not alive"));
+            return retval;
+        }
+        ListenableFuture<InstanceCommunication.MetricsData> response = stub[instanceId].withDeadlineAfter(GRPC_TIMEOUT_SECS,
TimeUnit.SECONDS).getMetrics(Empty.newBuilder().build());
+        Futures.addCallback(response, new FutureCallback<InstanceCommunication.MetricsData>()
{
+            @Override
+            public void onFailure(Throwable throwable) {
+                InstanceCommunication.MetricsData.Builder builder = InstanceCommunication.MetricsData.newBuilder();
+                retval.complete(builder.build());
+            }
+
+            @Override
+            public void onSuccess(InstanceCommunication.MetricsData t) {
+                retval.complete(t);
+            }
+        });
         return retval;
     }
 
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 14e68cc..87017a6 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -288,7 +288,7 @@ class ProcessRuntime implements Runtime {
     }
 
     @Override
-    public CompletableFuture<InstanceCommunication.MetricsData> getMetrics() {
+    public CompletableFuture<InstanceCommunication.MetricsData> getMetrics(int instanceId)
{
         CompletableFuture<InstanceCommunication.MetricsData> retval = new CompletableFuture<>();
         if (stub == null) {
             retval.completeExceptionally(new RuntimeException("Not alive"));
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
index fafdca7..19a5fc9 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
@@ -45,7 +45,7 @@ public interface Runtime {
     
     CompletableFuture<Void> resetMetrics();
     
-    CompletableFuture<InstanceCommunication.MetricsData> getMetrics();
+    CompletableFuture<InstanceCommunication.MetricsData> getMetrics(int instanceId);
 
     String getPrometheusMetrics() throws IOException;
 }
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
index 2bd4644..ad1002c 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
@@ -155,7 +155,7 @@ class ThreadRuntime implements Runtime {
     
     
     @Override
-    public CompletableFuture<InstanceCommunication.MetricsData> getMetrics() {
+    public CompletableFuture<InstanceCommunication.MetricsData> getMetrics(int instanceId)
{
         return CompletableFuture.completedFuture(javaInstanceRunnable.getMetrics());
     }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index fdd62d7..a5a6aa2 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -487,7 +487,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
                     org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()));
             RuntimeSpawner runtimeSpawner = functionRuntimeInfo.getRuntimeSpawner();
             if (runtimeSpawner != null) {
-                return Utils.getFunctionInstanceStats(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()),
functionRuntimeInfo).getMetrics();
+                return Utils.getFunctionInstanceStats(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()),
functionRuntimeInfo, instanceId).getMetrics();
             }
             return new FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData();
         } else {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
index 4240506..3c1fa4c 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
@@ -185,7 +185,9 @@ public final class Utils {
         }
     }
 
-    public static FunctionStats.FunctionInstanceStats getFunctionInstanceStats(String fullyQualifiedInstanceName,
FunctionRuntimeInfo functionRuntimeInfo) {
+    public static FunctionStats.FunctionInstanceStats getFunctionInstanceStats(String fullyQualifiedInstanceName,
+                                                                               FunctionRuntimeInfo
functionRuntimeInfo,
+                                                                               int instanceId)
{
         RuntimeSpawner functionRuntimeSpawner = functionRuntimeInfo.getRuntimeSpawner();
 
         FunctionStats.FunctionInstanceStats functionInstanceStats = new FunctionStats.FunctionInstanceStats();
@@ -194,8 +196,7 @@ public final class Utils {
             if (functionRuntime != null) {
                 try {
 
-                    InstanceCommunication.MetricsData metricsData = functionRuntime.getMetrics().get();
-                    int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId();
+                    InstanceCommunication.MetricsData metricsData = functionRuntime.getMetrics(instanceId).get();
                     functionInstanceStats.setInstanceId(instanceId);
 
                     FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData functionInstanceStatsData
@@ -229,14 +230,4 @@ public final class Utils {
         }
         return functionInstanceStats;
     }
-
-    public static FunctionStats getFunctionStats(Map<String, FunctionRuntimeInfo> functionRuntimes)
{
-        FunctionStats functionStats = new FunctionStats();
-        for (Map.Entry<String, FunctionRuntimeInfo> entry : functionRuntimes.entrySet())
{
-            String fullyQualifiedInstanceName = entry.getKey();
-            FunctionRuntimeInfo functionRuntimeInfo = entry.getValue();
-            functionStats.addInstance(Utils.getFunctionInstanceStats(fullyQualifiedInstanceName,
functionRuntimeInfo));
-        }
-        return functionStats;
-    }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
index 6351272..79f4df2 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
@@ -158,14 +158,28 @@ public class WorkerImpl {
             String fullyQualifiedInstanceName = entry.getKey();
             FunctionRuntimeInfo functionRuntimeInfo = entry.getValue();
 
-            FunctionStats.FunctionInstanceStats functionInstanceStats =
-                    Utils.getFunctionInstanceStats(fullyQualifiedInstanceName, functionRuntimeInfo);
-
-            WorkerFunctionInstanceStats workerFunctionInstanceStats = new WorkerFunctionInstanceStats();
-            workerFunctionInstanceStats.setName(fullyQualifiedInstanceName);
-            workerFunctionInstanceStats.setMetrics(functionInstanceStats.getMetrics());
-
-            metricsList.add(workerFunctionInstanceStats);
+            if (workerService.getFunctionRuntimeManager().getRuntimeFactory().externallyManaged())
{
+                Function.FunctionDetails functionDetails = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails();
+                int parallelism = functionDetails.getParallelism();
+                for (int i = 0; i < parallelism; ++i) {
+                    FunctionStats.FunctionInstanceStats functionInstanceStats =
+                            Utils.getFunctionInstanceStats(fullyQualifiedInstanceName, functionRuntimeInfo,
i);
+                    WorkerFunctionInstanceStats workerFunctionInstanceStats = new WorkerFunctionInstanceStats();
+                    workerFunctionInstanceStats.setName(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(
+                            functionDetails.getTenant(), functionDetails.getNamespace(),
functionDetails.getName(), i
+                    ));
+                    workerFunctionInstanceStats.setMetrics(functionInstanceStats.getMetrics());
+                    metricsList.add(workerFunctionInstanceStats);
+                }
+            } else {
+                FunctionStats.FunctionInstanceStats functionInstanceStats =
+                        Utils.getFunctionInstanceStats(fullyQualifiedInstanceName, functionRuntimeInfo,
+                                functionRuntimeInfo.getFunctionInstance().getInstanceId());
+                WorkerFunctionInstanceStats workerFunctionInstanceStats = new WorkerFunctionInstanceStats();
+                workerFunctionInstanceStats.setName(fullyQualifiedInstanceName);
+                workerFunctionInstanceStats.setMetrics(functionInstanceStats.getMetrics());
+                metricsList.add(workerFunctionInstanceStats);
+            }
         }
         return metricsList;
     }


Mime
View raw message