From commits-return-14352-archive-asf-public=cust-asf.ponee.io@pulsar.incubator.apache.org Thu Sep 13 20:29:41 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 0127018067A for ; Thu, 13 Sep 2018 20:29:40 +0200 (CEST) Received: (qmail 83763 invoked by uid 500); 13 Sep 2018 18:29:40 -0000 Mailing-List: contact commits-help@pulsar.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@pulsar.incubator.apache.org Delivered-To: mailing list commits@pulsar.incubator.apache.org Received: (qmail 83754 invoked by uid 99); 13 Sep 2018 18:29:40 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 13 Sep 2018 18:29:40 +0000 From: GitBox To: commits@pulsar.apache.org Subject: [GitHub] sijie closed pull request #2516: Fix: get function status with auth enable Message-ID: <153686337953.16650.5799642279265157381.gitbox@gitbox.apache.org> Date: Thu, 13 Sep 2018 18:29:39 -0000 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit sijie closed pull request #2516: Fix: get function status with auth enable URL: https://github.com/apache/incubator-pulsar/pull/2516 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java index def5452dbb..62d12ec9f0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java @@ -160,7 +160,7 @@ public Response getFunctionInstanceStatus(final @PathParam("tenant") String tena final @PathParam("functionName") String functionName, final @PathParam("instanceId") String instanceId) throws IOException { return functions.getFunctionInstanceStatus( - tenant, namespace, functionName, instanceId); + tenant, namespace, functionName, instanceId, uri.getRequestUri()); } @GET diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java index 7dc7050ac5..1d315d78f0 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java @@ -110,9 +110,8 @@ public FunctionStatusList getFunctionStatus( } } - @Override - public FunctionStatus getFunctionStatus(String tenant, String namespace, String function, int id) - throws PulsarAdminException { + public FunctionStatus getFunctionStatus( + String tenant, String namespace, String function, int id) throws PulsarAdminException { try { Response response = request( functions.path(tenant).path(namespace).path(function).path(Integer.toString(id)).path("status")) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index 93828de40d..ee6eeecd50 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -26,6 +26,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.distributedlog.api.namespace.Namespace; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Reader; @@ -248,7 +249,7 @@ public synchronized void removeAssignments(Collection assignments) { * @return the function status */ public InstanceCommunication.FunctionStatus getFunctionInstanceStatus(String tenant, String namespace, - String functionName, int instanceId) { + String functionName, int instanceId, URI uri) { Assignment assignment = this.findAssignment(tenant, namespace, functionName, instanceId); final String assignedWorkerId = assignment.getWorkerId(); final String workerId = this.workerConfig.getWorkerId(); @@ -306,23 +307,8 @@ public synchronized void removeAssignments(Collection assignments) { return functionStatusBuilder.build(); } - Client client = ClientBuilder.newClient(); - - // TODO: implement authentication/authorization - String jsonResponse = client.target(String.format("http://%s:%d/admin/functions/%s/%s/%s/%d/status", - workerInfo.getWorkerHostname(), workerInfo.getPort(), tenant, namespace, functionName, instanceId)) - .request(MediaType.TEXT_PLAIN) - .get(String.class); - - InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus.newBuilder(); - try { - org.apache.pulsar.functions.utils.Utils.mergeJson(jsonResponse, functionStatusBuilder); - } catch (IOException e) { - log.warn("Got invalid function status response from {}", workerInfo, e); - throw new RuntimeException(e); - } - functionStatusBuilder.setWorkerId(assignedWorkerId); - functionStatus = functionStatusBuilder.build(); + URI redirect = UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build(); + throw new WebApplicationException(Response.temporaryRedirect(redirect).build()); } return functionStatus; @@ -426,9 +412,10 @@ private void stopFunction(String fullyQualifiedInstanceId, boolean restart) thro * @param namespace the namespace the function belongs to * @param functionName the function name * @return a list of function statuses + * @throws PulsarAdminException */ public InstanceCommunication.FunctionStatusList getAllFunctionStatus( - String tenant, String namespace, String functionName) { + String tenant, String namespace, String functionName) throws PulsarAdminException { Collection assignments = this.findFunctionAssignments(tenant, namespace, functionName); @@ -438,13 +425,15 @@ private void stopFunction(String fullyQualifiedInstanceId, boolean restart) thro } for (Assignment assignment : assignments) { - - InstanceCommunication.FunctionStatus functionStatus = this.getFunctionInstanceStatus( - assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(), - assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(), - assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(), - assignment.getInstance().getInstanceId()); - + boolean isOwner = this.workerConfig.getWorkerId().equals(assignment.getWorkerId()); + InstanceCommunication.FunctionStatus functionStatus = isOwner + ? (getFunctionInstanceStatus(tenant, namespace, functionName, + assignment.getInstance().getInstanceId(), null)) + : this.functionAdmin.functions().getFunctionStatus( + assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(), + assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(), + assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(), + assignment.getInstance().getInstanceId()); functionStatusListBuilder.addFunctionStatusList(functionStatus); } return functionStatusListBuilder.build(); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index 136bab0af7..df82c0d25c 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -332,7 +332,7 @@ public Response getFunctionInfo(final String tenant, final String namespace, fin } public Response getFunctionInstanceStatus(final String tenant, final String namespace, final String functionName, - final String instanceId) throws IOException { + final String instanceId, URI uri) throws IOException { if (!isWorkerServiceAvailable()) { return getUnavailableResponse(); @@ -358,7 +358,9 @@ public Response getFunctionInstanceStatus(final String tenant, final String name FunctionStatus functionStatus = null; try { functionStatus = functionRuntimeManager.getFunctionInstanceStatus(tenant, namespace, functionName, - Integer.parseInt(instanceId)); + Integer.parseInt(instanceId), uri); + } catch (WebApplicationException we) { + throw we; } catch (Exception e) { log.error("{}/{}/{} Got Exception Getting Status", tenant, namespace, functionName, e); return Response.status(Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.getMessage()).build(); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java index 95fe687804..6c6fb18613 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java @@ -104,7 +104,7 @@ public Response getFunctionInstanceStatus(final @PathParam("tenant") String tena final @PathParam("functionName") String functionName, final @PathParam("instanceId") String instanceId) throws IOException { return functions.getFunctionInstanceStatus( - tenant, namespace, functionName, instanceId); + tenant, namespace, functionName, instanceId, uri.getRequestUri()); } @GET ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services