From commits-return-7503-archive-asf-public=cust-asf.ponee.io@pulsar.incubator.apache.org Tue May 1 18:35:17 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id D7CBD180645 for ; Tue, 1 May 2018 18:35:16 +0200 (CEST) Received: (qmail 71478 invoked by uid 500); 1 May 2018 16:35:14 -0000 Mailing-List: contact commits-help@pulsar.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@pulsar.incubator.apache.org Delivered-To: mailing list commits@pulsar.incubator.apache.org Received: (qmail 71469 invoked by uid 99); 1 May 2018 16:35:14 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 01 May 2018 16:35:14 +0000 From: GitBox To: commits@pulsar.apache.org Subject: [GitHub] merlimat closed pull request #1697: adding function worker initialized check Message-ID: <152519251432.19784.2800128820516211758.gitbox@gitbox.apache.org> Date: Tue, 01 May 2018 16:35:14 -0000 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit merlimat closed pull request #1697: adding function worker initialized check URL: https://github.com/apache/incubator-pulsar/pull/1697 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java index a0f747c6a7..96671ec3e3 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java @@ -153,7 +153,7 @@ public PulsarAdminException getApiException(Throwable e) { if (e.getCause() instanceof java.net.ConnectException) { return new ConnectException(e.getCause()); } else { - return new HttpErrorException(e); + return new PulsarAdminException((ServerErrorException) e); } } else if (e instanceof WebApplicationException) { // Handle 5xx exceptions diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java index 9d5b8eec5f..66da4978b8 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java @@ -42,8 +42,6 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; -import java.nio.file.Path; -import java.nio.file.Paths; import java.nio.file.StandardCopyOption; import java.util.List; diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java index e33dcbaa43..dfe5834e27 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java @@ -45,6 +45,7 @@ private Namespace dlogNamespace; private MembershipManager membershipManager; private SchedulerManager schedulerManager; + private boolean isInitialized = false; public WorkerService(WorkerConfig workerConfig) { this.workerConfig = workerConfig; @@ -117,6 +118,9 @@ public void start(URI dlogUri) throws InterruptedException { // Start function runtime manager this.functionRuntimeManager.start(); + // indicate function worker service is done intializing + this.isInitialized = true; + } catch (Exception e) { log.error("Error Starting up in worker", e); throw new RuntimeException(e); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index e95f620d9c..9c935d8ab1 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -82,6 +82,17 @@ private WorkerService worker() { } } + private boolean isWorkerServiceAvailable() { + WorkerService workerService = workerServiceSupplier.get(); + if (workerService == null) { + return false; + } + if (!workerService.isInitialized()) { + return false; + } + return true; + } + @POST @Path("/{tenant}/{namespace}/{functionName}") @Consumes(MediaType.MULTIPART_FORM_DATA) @@ -91,6 +102,11 @@ public Response registerFunction(final @PathParam("tenant") String tenant, final @FormDataParam("data") InputStream uploadedInputStream, final @FormDataParam("data") FormDataContentDisposition fileDetail, final @FormDataParam("functionDetails") String functionDetailsJson) { + + if (!isWorkerServiceAvailable()) { + return getUnavailableResponse(); + } + FunctionDetails functionDetails; // validate parameters try { @@ -141,6 +157,10 @@ public Response updateFunction(final @PathParam("tenant") String tenant, final @FormDataParam("data") FormDataContentDisposition fileDetail, final @FormDataParam("functionDetails") String functionDetailsJson) { + if (!isWorkerServiceAvailable()) { + return getUnavailableResponse(); + } + FunctionDetails functionDetails; // validate parameters try { @@ -187,6 +207,10 @@ public Response deregisterFunction(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) { + if (!isWorkerServiceAvailable()) { + return getUnavailableResponse(); + } + // validate parameters try { validateDeregisterRequestParams(tenant, namespace, functionName); @@ -244,6 +268,10 @@ public Response getFunctionInfo(final @PathParam("tenant") String tenant, final @PathParam("functionName") String functionName) throws IOException { + if (!isWorkerServiceAvailable()) { + return getUnavailableResponse(); + } + // validate parameters try { validateGetFunctionRequestParams(tenant, namespace, functionName); @@ -276,6 +304,10 @@ public Response getFunctionInstanceStatus(final @PathParam("tenant") String tena final @PathParam("functionName") String functionName, final @PathParam("instanceId") String instanceId) throws IOException { + if (!isWorkerServiceAvailable()) { + return getUnavailableResponse(); + } + // validate parameters try { validateGetFunctionInstanceRequestParams(tenant, namespace, functionName, instanceId); @@ -319,6 +351,10 @@ public Response getFunctionStatus(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) throws IOException { + if (!isWorkerServiceAvailable()) { + return getUnavailableResponse(); + } + // validate parameters try { validateGetFunctionRequestParams(tenant, namespace, functionName); @@ -360,6 +396,10 @@ public Response getFunctionStatus(final @PathParam("tenant") String tenant, public Response listFunctions(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace) { + if (!isWorkerServiceAvailable()) { + return getUnavailableResponse(); + } + // validate parameters try { validateListFunctionRequestParams(tenant, namespace); @@ -429,6 +469,11 @@ private Response updateRequest(FunctionMetaData functionMetaData, @GET @Path("/cluster") public Response getCluster() { + + if (!isWorkerServiceAvailable()) { + return getUnavailableResponse(); + } + MembershipManager membershipManager = worker().getMembershipManager(); List members = membershipManager.getCurrentMembership(); return Response.status(Status.OK).entity(new Gson().toJson(members)).build(); @@ -437,6 +482,11 @@ public Response getCluster() { @GET @Path("/assignments") public Response getAssignments() { + + if (!isWorkerServiceAvailable()) { + return getUnavailableResponse(); + } + FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager(); Map> assignments = functionRuntimeManager.getCurrentAssignments(); Map> ret = new HashMap<>(); @@ -456,6 +506,11 @@ public Response triggerFunction(final @PathParam("tenant") String tenant, final @FormDataParam("data") String input, final @FormDataParam("dataStream") InputStream uploadedInputStream, final @FormDataParam("topic") String topic) { + + if (!isWorkerServiceAvailable()) { + return getUnavailableResponse(); + } + FunctionDetails functionDetails; // validate parameters try { @@ -718,4 +773,12 @@ private void validateTriggerRequestParams(String tenant, } } + private Response getUnavailableResponse() { + return Response.status(Status.SERVICE_UNAVAILABLE) + .type(MediaType.APPLICATION_JSON) + .entity(new ErrorData("Function worker service is not done initializing. " + + "Please try again in a little while.")) + .build(); + } + } diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java index 68be7641b1..b26b4f5992 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java @@ -118,6 +118,7 @@ public void setup() { this.mockedWorkerService = mock(WorkerService.class); when(mockedWorkerService.getFunctionMetaDataManager()).thenReturn(mockedManager); when(mockedWorkerService.getDlogNamespace()).thenReturn(mockedNamespace); + when(mockedWorkerService.isInitialized()).thenReturn(true); // worker config WorkerConfig workerConfig = new WorkerConfig() ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services