From commits-return-7504-archive-asf-public=cust-asf.ponee.io@pulsar.incubator.apache.org Tue May 1 18:35:17 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id F33B5180675 for ; Tue, 1 May 2018 18:35:16 +0200 (CEST) Received: (qmail 71494 invoked by uid 500); 1 May 2018 16:35:16 -0000 Mailing-List: contact commits-help@pulsar.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@pulsar.incubator.apache.org Delivered-To: mailing list commits@pulsar.incubator.apache.org Received: (qmail 71483 invoked by uid 99); 1 May 2018 16:35:15 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 01 May 2018 16:35:15 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id A6DFE80612; Tue, 1 May 2018 16:35:14 +0000 (UTC) Date: Tue, 01 May 2018 16:35:14 +0000 To: "commits@pulsar.apache.org" Subject: [incubator-pulsar] branch master updated: adding function worker initialized check (#1697) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <152519251457.19800.14205260942467219545@gitbox.apache.org> From: mmerli@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: incubator-pulsar X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: 2dd158e9514b89ec545169f4fd8fa863f9c099de X-Git-Newrev: 1c4b1c4b9927ccb3d3f45feff97c5c058f46fe1c X-Git-Rev: 1c4b1c4b9927ccb3d3f45feff97c5c058f46fe1c X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 1c4b1c4 adding function worker initialized check (#1697) 1c4b1c4 is described below commit 1c4b1c4b9927ccb3d3f45feff97c5c058f46fe1c Author: Boyang Jerry Peng AuthorDate: Tue May 1 09:35:12 2018 -0700 adding function worker initialized check (#1697) * adding function worker intialized check * removing newline * refactoring duplicate code * fixing unit tests --- .../pulsar/client/admin/internal/BaseResource.java | 2 +- .../client/admin/internal/FunctionsImpl.java | 2 - .../pulsar/functions/worker/WorkerService.java | 4 ++ .../functions/worker/rest/api/FunctionsImpl.java | 63 ++++++++++++++++++++++ .../rest/api/v2/FunctionApiV2ResourceTest.java | 1 + 5 files changed, 69 insertions(+), 3 deletions(-) diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java index a0f747c..96671ec 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java @@ -153,7 +153,7 @@ public abstract class BaseResource { if (e.getCause() instanceof java.net.ConnectException) { return new ConnectException(e.getCause()); } else { - return new HttpErrorException(e); + return new PulsarAdminException((ServerErrorException) e); } } else if (e instanceof WebApplicationException) { // Handle 5xx exceptions diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java index 9d5b8ee..66da497 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java @@ -42,8 +42,6 @@ import javax.ws.rs.core.Response; import java.io.File; import java.io.IOException; import java.io.InputStream; -import java.nio.file.Path; -import java.nio.file.Paths; import java.nio.file.StandardCopyOption; import java.util.List; diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java index e33dcba..dfe5834 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java @@ -45,6 +45,7 @@ public class WorkerService { private Namespace dlogNamespace; private MembershipManager membershipManager; private SchedulerManager schedulerManager; + private boolean isInitialized = false; public WorkerService(WorkerConfig workerConfig) { this.workerConfig = workerConfig; @@ -117,6 +118,9 @@ public class WorkerService { // Start function runtime manager this.functionRuntimeManager.start(); + // indicate function worker service is done intializing + this.isInitialized = true; + } catch (Exception e) { log.error("Error Starting up in worker", e); throw new RuntimeException(e); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index 49c652f..5261750 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -82,6 +82,17 @@ public class FunctionsImpl { } } + private boolean isWorkerServiceAvailable() { + WorkerService workerService = workerServiceSupplier.get(); + if (workerService == null) { + return false; + } + if (!workerService.isInitialized()) { + return false; + } + return true; + } + @POST @Path("/{tenant}/{namespace}/{functionName}") @Consumes(MediaType.MULTIPART_FORM_DATA) @@ -91,6 +102,11 @@ public class FunctionsImpl { final @FormDataParam("data") InputStream uploadedInputStream, final @FormDataParam("data") FormDataContentDisposition fileDetail, final @FormDataParam("functionDetails") String functionDetailsJson) { + + if (!isWorkerServiceAvailable()) { + return getUnavailableResponse(); + } + FunctionDetails functionDetails; // validate parameters try { @@ -141,6 +157,10 @@ public class FunctionsImpl { final @FormDataParam("data") FormDataContentDisposition fileDetail, final @FormDataParam("functionDetails") String functionDetailsJson) { + if (!isWorkerServiceAvailable()) { + return getUnavailableResponse(); + } + FunctionDetails functionDetails; // validate parameters try { @@ -187,6 +207,10 @@ public class FunctionsImpl { final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) { + if (!isWorkerServiceAvailable()) { + return getUnavailableResponse(); + } + // validate parameters try { validateDeregisterRequestParams(tenant, namespace, functionName); @@ -244,6 +268,10 @@ public class FunctionsImpl { final @PathParam("functionName") String functionName) throws IOException { + if (!isWorkerServiceAvailable()) { + return getUnavailableResponse(); + } + // validate parameters try { validateGetFunctionRequestParams(tenant, namespace, functionName); @@ -276,6 +304,10 @@ public class FunctionsImpl { final @PathParam("functionName") String functionName, final @PathParam("instanceId") String instanceId) throws IOException { + if (!isWorkerServiceAvailable()) { + return getUnavailableResponse(); + } + // validate parameters try { validateGetFunctionInstanceRequestParams(tenant, namespace, functionName, instanceId); @@ -319,6 +351,10 @@ public class FunctionsImpl { final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) throws IOException { + if (!isWorkerServiceAvailable()) { + return getUnavailableResponse(); + } + // validate parameters try { validateGetFunctionRequestParams(tenant, namespace, functionName); @@ -360,6 +396,10 @@ public class FunctionsImpl { public Response listFunctions(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace) { + if (!isWorkerServiceAvailable()) { + return getUnavailableResponse(); + } + // validate parameters try { validateListFunctionRequestParams(tenant, namespace); @@ -429,6 +469,11 @@ public class FunctionsImpl { @GET @Path("/cluster") public Response getCluster() { + + if (!isWorkerServiceAvailable()) { + return getUnavailableResponse(); + } + MembershipManager membershipManager = worker().getMembershipManager(); List members = membershipManager.getCurrentMembership(); return Response.status(Status.OK).entity(new Gson().toJson(members)).build(); @@ -437,6 +482,11 @@ public class FunctionsImpl { @GET @Path("/assignments") public Response getAssignments() { + + if (!isWorkerServiceAvailable()) { + return getUnavailableResponse(); + } + FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager(); Map> assignments = functionRuntimeManager.getCurrentAssignments(); Map> ret = new HashMap<>(); @@ -456,6 +506,11 @@ public class FunctionsImpl { final @FormDataParam("data") String input, final @FormDataParam("dataStream") InputStream uploadedInputStream, final @FormDataParam("topic") String topic) { + + if (!isWorkerServiceAvailable()) { + return getUnavailableResponse(); + } + FunctionDetails functionDetails; // validate parameters try { @@ -718,4 +773,12 @@ public class FunctionsImpl { } } + private Response getUnavailableResponse() { + return Response.status(Status.SERVICE_UNAVAILABLE) + .type(MediaType.APPLICATION_JSON) + .entity(new ErrorData("Function worker service is not done initializing. " + + "Please try again in a little while.")) + .build(); + } + } diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java index 31c928d..cddc152 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java @@ -119,6 +119,7 @@ public class FunctionApiV2ResourceTest { this.mockedWorkerService = mock(WorkerService.class); when(mockedWorkerService.getFunctionMetaDataManager()).thenReturn(mockedManager); when(mockedWorkerService.getDlogNamespace()).thenReturn(mockedNamespace); + when(mockedWorkerService.isInitialized()).thenReturn(true); // worker config WorkerConfig workerConfig = new WorkerConfig() -- To stop receiving notification emails like this one, please contact mmerli@apache.org.