pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] sijie closed pull request #2748: Fix getstatus logic in kubernetes runtime
Date Tue, 09 Oct 2018 05:06:03 GMT
sijie closed pull request #2748: Fix getstatus logic in kubernetes runtime
URL: https://github.com/apache/pulsar/pull/2748
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index cff3c34651..132eaf3cdf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -179,7 +179,7 @@ public Response getFunctionStatus(final @PathParam("tenant") String tenant,
                                       final @PathParam("namespace") String namespace,
                                       final @PathParam("functionName") String functionName)
throws IOException {
         return functions.getFunctionStatus(
-            tenant, namespace, functionName);
+            tenant, namespace, functionName, uri.getRequestUri());
     }
 
     @GET
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
index 907cf86c9b..36db99fd3d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
@@ -366,7 +366,7 @@ public void testPulsarSinkStats() throws Exception {
         FunctionRuntimeManager functionRuntimeManager = functionsWorkerService.getFunctionRuntimeManager();
         functionRuntimeManager.updateRates();
         FunctionStatusList functionStats = functionRuntimeManager.getAllFunctionStatus(tenant,
namespacePortion,
-                functionName);
+                functionName, null);
 
         int numInstances = functionStats.getFunctionStatusListCount();
         assertEquals(numInstances, 1);
diff --git a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
index 65d1b2f966..d56a41deef 100644
--- a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
+++ b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
@@ -55,6 +55,7 @@ message FunctionStatus {
 }
 
 message FunctionStatusList {
+    string error = 2;
     repeated FunctionStatus functionStatusList = 1;
 }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 1b8a590baf..47d317fc94 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -306,8 +306,12 @@ public synchronized void removeAssignments(Collection<Assignment>
assignments) {
                 return functionStatusBuilder.build();
             }
 
-            URI redirect = UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
-            throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
+            if (uri == null) {
+                throw new WebApplicationException(Response.serverError().status(Status.INTERNAL_SERVER_ERROR).build());
+            } else {
+                URI redirect = UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
+                throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
+            }
         }
 
         return functionStatus;
@@ -346,8 +350,12 @@ public Response stopFunctionInstance(String tenant, String namespace,
String fun
                         .entity(new ErrorData(fullFunctionName + " has not been assigned
yet")).build();
             }
 
-            URI redirect = UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
-            throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
+            if (uri == null) {
+                throw new WebApplicationException(Response.serverError().status(Status.INTERNAL_SERVER_ERROR).build());
+            } else {
+                URI redirect = UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
+                throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
+            }
         }
     }
 
@@ -471,7 +479,7 @@ private void stopFunction(String fullyQualifiedInstanceId, boolean restart)
thro
      * @throws PulsarAdminException 
      */
     public InstanceCommunication.FunctionStatusList getAllFunctionStatus(
-            String tenant, String namespace, String functionName) throws PulsarAdminException
{
+            String tenant, String namespace, String functionName, URI uri) throws PulsarAdminException
{
 
         Collection<Assignment> assignments = this.findFunctionAssignments(tenant, namespace,
functionName);
 
@@ -491,7 +499,28 @@ private void stopFunction(String fullyQualifiedInstanceId, boolean restart)
thro
                     functionStatusListBuilder.addFunctionStatusList(functionStatus);
                 }
             } else {
-                return this.functionAdmin.functions().getFunctionStatus(tenant, namespace,
functionName);
+                // find the hostname/port of the worker who is the owner
+
+                List<WorkerInfo> workerInfoList = this.membershipManager.getCurrentMembership();
+                WorkerInfo workerInfo = null;
+                for (WorkerInfo entry: workerInfoList) {
+                    if (assignment.getWorkerId().equals(entry.getWorkerId())) {
+                        workerInfo = entry;
+                    }
+                }
+                if (workerInfo == null) {
+                    InstanceCommunication.FunctionStatusList.Builder functionStatusBuilder
+                            = InstanceCommunication.FunctionStatusList.newBuilder();
+                    functionStatusBuilder.setError("Function not yet scheduled");
+                    return functionStatusBuilder.build();
+                }
+
+                if (uri == null) {
+                    throw new WebApplicationException(Response.serverError().status(Status.INTERNAL_SERVER_ERROR).build());
+                } else {
+                    URI redirect = UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
+                    throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
+                }
             }
         } else {
             for (Assignment assignment : assignments) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 3124731874..6ab5f0d8ad 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -508,13 +508,15 @@ public Response stopFunctionInstances(final String tenant, final String
namespac
         FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
         try {
             return functionRuntimeManager.stopFunctionInstances(tenant, namespace, functionName,
restart);
+        } catch (WebApplicationException we) {
+            throw we;
         } catch (Exception e) {
             log.error("Failed to restart function: {}/{}/{}", tenant, namespace, functionName,
e);
             return Response.status(Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.getMessage()).build();
         }
     }
 
-    public Response getFunctionStatus(final String tenant, final String namespace, final
String functionName)
+    public Response getFunctionStatus(final String tenant, final String namespace, final
String functionName, URI uri)
             throws IOException {
 
         if (!isWorkerServiceAvailable()) {
@@ -540,7 +542,9 @@ public Response getFunctionStatus(final String tenant, final String namespace,
f
         FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
         InstanceCommunication.FunctionStatusList functionStatusList = null;
         try {
-            functionStatusList = functionRuntimeManager.getAllFunctionStatus(tenant, namespace,
functionName);
+            functionStatusList = functionRuntimeManager.getAllFunctionStatus(tenant, namespace,
functionName, uri);
+        } catch (WebApplicationException we) {
+            throw we;
         } catch (Exception e) {
             log.error("Got Exception Getting Status", e);
             FunctionStatus.Builder functionStatusBuilder = FunctionStatus.newBuilder();
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index 1e44a6072b..46c7974b82 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -115,7 +115,7 @@ public Response getFunctionStatus(final @PathParam("tenant") String tenant,
                                       final @PathParam("namespace") String namespace,
                                       final @PathParam("functionName") String functionName)
throws IOException {
         return functions.getFunctionStatus(
-            tenant, namespace, functionName);
+            tenant, namespace, functionName, uri.getRequestUri());
     }
 
     @GET


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message