pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] sijie closed pull request #2516: Fix: get function status with auth enable
Date Thu, 13 Sep 2018 18:29:39 GMT
sijie closed pull request #2516: Fix: get function status with auth enable
URL: https://github.com/apache/incubator-pulsar/pull/2516
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, the diff is reproduced
below (GitHub would not otherwise display it after the merge):

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index def5452dbb..62d12ec9f0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -160,7 +160,7 @@ public Response getFunctionInstanceStatus(final @PathParam("tenant") String
tena
                                               final @PathParam("functionName") String functionName,
                                               final @PathParam("instanceId") String instanceId)
throws IOException {
         return functions.getFunctionInstanceStatus(
-            tenant, namespace, functionName, instanceId);
+            tenant, namespace, functionName, instanceId, uri.getRequestUri());
     }
 
     @GET
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index 7dc7050ac5..1d315d78f0 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -110,9 +110,8 @@ public FunctionStatusList getFunctionStatus(
         }
     }
 
-    @Override
-    public FunctionStatus getFunctionStatus(String tenant, String namespace, String function,
int id)
-            throws PulsarAdminException {
+    public FunctionStatus getFunctionStatus(
+            String tenant, String namespace, String function, int id) throws PulsarAdminException
{
         try {
             Response response = request(
                     functions.path(tenant).path(namespace).path(function).path(Integer.toString(id)).path("status"))
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 93828de40d..ee6eeecd50 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -26,6 +26,7 @@
 import lombok.extern.slf4j.Slf4j;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Reader;
@@ -248,7 +249,7 @@ public synchronized void removeAssignments(Collection<Assignment>
assignments) {
      * @return the function status
      */
     public InstanceCommunication.FunctionStatus getFunctionInstanceStatus(String tenant,
String namespace,
-                                                                          String functionName,
int instanceId) {
+            String functionName, int instanceId, URI uri) {
         Assignment assignment = this.findAssignment(tenant, namespace, functionName, instanceId);
         final String assignedWorkerId = assignment.getWorkerId();
         final String workerId = this.workerConfig.getWorkerId();
@@ -306,23 +307,8 @@ public synchronized void removeAssignments(Collection<Assignment>
assignments) {
                 return functionStatusBuilder.build();
             }
 
-            Client client = ClientBuilder.newClient();
-
-            // TODO: implement authentication/authorization
-            String jsonResponse = client.target(String.format("http://%s:%d/admin/functions/%s/%s/%s/%d/status",
-                    workerInfo.getWorkerHostname(), workerInfo.getPort(), tenant, namespace,
functionName, instanceId))
-                    .request(MediaType.TEXT_PLAIN)
-                    .get(String.class);
-
-            InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus.newBuilder();
-            try {
-                org.apache.pulsar.functions.utils.Utils.mergeJson(jsonResponse, functionStatusBuilder);
-            } catch (IOException e) {
-                log.warn("Got invalid function status response from {}", workerInfo, e);
-                throw new RuntimeException(e);
-            }
-            functionStatusBuilder.setWorkerId(assignedWorkerId);
-            functionStatus = functionStatusBuilder.build();
+            URI redirect = UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
+            throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
         }
 
         return functionStatus;
@@ -426,9 +412,10 @@ private void stopFunction(String fullyQualifiedInstanceId, boolean restart)
thro
      * @param namespace the namespace the function belongs to
      * @param functionName the function name
      * @return a list of function statuses
+     * @throws PulsarAdminException 
      */
     public InstanceCommunication.FunctionStatusList getAllFunctionStatus(
-            String tenant, String namespace, String functionName) {
+            String tenant, String namespace, String functionName) throws PulsarAdminException
{
 
         Collection<Assignment> assignments = this.findFunctionAssignments(tenant, namespace,
functionName);
 
@@ -438,13 +425,15 @@ private void stopFunction(String fullyQualifiedInstanceId, boolean restart)
thro
         }
 
         for (Assignment assignment : assignments) {
-
-            InstanceCommunication.FunctionStatus functionStatus = this.getFunctionInstanceStatus(
-                    assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(),
-                    assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(),
-                    assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(),
-                    assignment.getInstance().getInstanceId());
-
+            boolean isOwner = this.workerConfig.getWorkerId().equals(assignment.getWorkerId());
+            InstanceCommunication.FunctionStatus functionStatus = isOwner
+                    ? (getFunctionInstanceStatus(tenant, namespace, functionName,
+                            assignment.getInstance().getInstanceId(), null))
+                    : this.functionAdmin.functions().getFunctionStatus(
+                            assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(),
+                            assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(),
+                            assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(),
+                            assignment.getInstance().getInstanceId());
             functionStatusListBuilder.addFunctionStatusList(functionStatus);
         }
         return functionStatusListBuilder.build();
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 136bab0af7..df82c0d25c 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -332,7 +332,7 @@ public Response getFunctionInfo(final String tenant, final String namespace,
fin
     }
 
     public Response getFunctionInstanceStatus(final String tenant, final String namespace,
final String functionName,
-            final String instanceId) throws IOException {
+            final String instanceId, URI uri) throws IOException {
 
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
@@ -358,7 +358,9 @@ public Response getFunctionInstanceStatus(final String tenant, final String
name
         FunctionStatus functionStatus = null;
         try {
             functionStatus = functionRuntimeManager.getFunctionInstanceStatus(tenant, namespace,
functionName,
-                    Integer.parseInt(instanceId));
+                    Integer.parseInt(instanceId), uri);
+        } catch (WebApplicationException we) {
+            throw we;
         } catch (Exception e) {
             log.error("{}/{}/{} Got Exception Getting Status", tenant, namespace, functionName,
e);
             return Response.status(Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.getMessage()).build();
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index 95fe687804..6c6fb18613 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -104,7 +104,7 @@ public Response getFunctionInstanceStatus(final @PathParam("tenant") String
tena
                                               final @PathParam("functionName") String functionName,
                                               final @PathParam("instanceId") String instanceId)
throws IOException {
         return functions.getFunctionInstanceStatus(
-            tenant, namespace, functionName, instanceId);
+            tenant, namespace, functionName, instanceId, uri.getRequestUri());
     }
 
     @GET


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message