pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] merlimat closed pull request #1697: adding function worker initialized check
Date Tue, 01 May 2018 16:35:14 GMT
merlimat closed pull request #1697: adding function worker initialized check
URL: https://github.com/apache/incubator-pulsar/pull/1697
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
index a0f747c6a7..96671ec3e3 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
@@ -153,7 +153,7 @@ public PulsarAdminException getApiException(Throwable e) {
             if (e.getCause() instanceof java.net.ConnectException) {
                 return new ConnectException(e.getCause());
             } else {
-                return new HttpErrorException(e);
+                return new PulsarAdminException((ServerErrorException) e);
             }
         } else if (e instanceof WebApplicationException) {
             // Handle 5xx exceptions
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index 9d5b8eec5f..66da4978b8 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -42,8 +42,6 @@
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.nio.file.StandardCopyOption;
 import java.util.List;
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index e33dcbaa43..dfe5834e27 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -45,6 +45,7 @@
     private Namespace dlogNamespace;
     private MembershipManager membershipManager;
     private SchedulerManager schedulerManager;
+    private boolean isInitialized = false;
 
     public WorkerService(WorkerConfig workerConfig) {
         this.workerConfig = workerConfig;
@@ -117,6 +118,9 @@ public void start(URI dlogUri) throws InterruptedException {
             // Start function runtime manager
             this.functionRuntimeManager.start();
 
+            // indicate function worker service is done intializing
+            this.isInitialized = true;
+
         } catch (Exception e) {
             log.error("Error Starting up in worker", e);
             throw new RuntimeException(e);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index e95f620d9c..9c935d8ab1 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -82,6 +82,17 @@ private WorkerService worker() {
         }
     }
 
+    private boolean isWorkerServiceAvailable() {
+        WorkerService workerService = workerServiceSupplier.get();
+        if (workerService == null) {
+            return false;
+        }
+        if (!workerService.isInitialized()) {
+            return false;
+        }
+        return true;
+    }
+
     @POST
     @Path("/{tenant}/{namespace}/{functionName}")
     @Consumes(MediaType.MULTIPART_FORM_DATA)
@@ -91,6 +102,11 @@ public Response registerFunction(final @PathParam("tenant") String tenant,
                                      final @FormDataParam("data") InputStream uploadedInputStream,
                                      final @FormDataParam("data") FormDataContentDisposition
fileDetail,
                                      final @FormDataParam("functionDetails") String functionDetailsJson)
{
+
+        if (!isWorkerServiceAvailable()) {
+            return getUnavailableResponse();
+        }
+
         FunctionDetails functionDetails;
         // validate parameters
         try {
@@ -141,6 +157,10 @@ public Response updateFunction(final @PathParam("tenant") String tenant,
                                    final @FormDataParam("data") FormDataContentDisposition
fileDetail,
                                    final @FormDataParam("functionDetails") String functionDetailsJson)
{
 
+        if (!isWorkerServiceAvailable()) {
+            return getUnavailableResponse();
+        }
+
         FunctionDetails functionDetails;
         // validate parameters
         try {
@@ -187,6 +207,10 @@ public Response deregisterFunction(final @PathParam("tenant") String
tenant,
                                        final @PathParam("namespace") String namespace,
                                        final @PathParam("functionName") String functionName)
{
 
+        if (!isWorkerServiceAvailable()) {
+            return getUnavailableResponse();
+        }
+
         // validate parameters
         try {
             validateDeregisterRequestParams(tenant, namespace, functionName);
@@ -244,6 +268,10 @@ public Response getFunctionInfo(final @PathParam("tenant") String tenant,
                                     final @PathParam("functionName") String functionName)
             throws IOException {
 
+        if (!isWorkerServiceAvailable()) {
+            return getUnavailableResponse();
+        }
+
         // validate parameters
         try {
             validateGetFunctionRequestParams(tenant, namespace, functionName);
@@ -276,6 +304,10 @@ public Response getFunctionInstanceStatus(final @PathParam("tenant")
String tena
                                               final @PathParam("functionName") String functionName,
                                               final @PathParam("instanceId") String instanceId)
throws IOException {
 
+        if (!isWorkerServiceAvailable()) {
+            return getUnavailableResponse();
+        }
+
         // validate parameters
         try {
             validateGetFunctionInstanceRequestParams(tenant, namespace, functionName, instanceId);
@@ -319,6 +351,10 @@ public Response getFunctionStatus(final @PathParam("tenant") String tenant,
                                       final @PathParam("namespace") String namespace,
                                       final @PathParam("functionName") String functionName)
throws IOException {
 
+        if (!isWorkerServiceAvailable()) {
+            return getUnavailableResponse();
+        }
+
         // validate parameters
         try {
             validateGetFunctionRequestParams(tenant, namespace, functionName);
@@ -360,6 +396,10 @@ public Response getFunctionStatus(final @PathParam("tenant") String tenant,
     public Response listFunctions(final @PathParam("tenant") String tenant,
                                   final @PathParam("namespace") String namespace) {
 
+        if (!isWorkerServiceAvailable()) {
+            return getUnavailableResponse();
+        }
+
         // validate parameters
         try {
             validateListFunctionRequestParams(tenant, namespace);
@@ -429,6 +469,11 @@ private Response updateRequest(FunctionMetaData functionMetaData,
     @GET
     @Path("/cluster")
     public Response getCluster() {
+
+        if (!isWorkerServiceAvailable()) {
+            return getUnavailableResponse();
+        }
+
         MembershipManager membershipManager = worker().getMembershipManager();
         List<MembershipManager.WorkerInfo> members = membershipManager.getCurrentMembership();
         return Response.status(Status.OK).entity(new Gson().toJson(members)).build();
@@ -437,6 +482,11 @@ public Response getCluster() {
     @GET
     @Path("/assignments")
     public Response getAssignments() {
+
+        if (!isWorkerServiceAvailable()) {
+            return getUnavailableResponse();
+        }
+
         FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
         Map<String, Map<String, Function.Assignment>> assignments = functionRuntimeManager.getCurrentAssignments();
         Map<String, Collection<String>> ret = new HashMap<>();
@@ -456,6 +506,11 @@ public Response triggerFunction(final @PathParam("tenant") String tenant,
                                     final @FormDataParam("data") String input,
                                     final @FormDataParam("dataStream") InputStream uploadedInputStream,
                                     final @FormDataParam("topic") String topic) {
+
+        if (!isWorkerServiceAvailable()) {
+            return getUnavailableResponse();
+        }
+
         FunctionDetails functionDetails;
         // validate parameters
         try {
@@ -718,4 +773,12 @@ private void validateTriggerRequestParams(String tenant,
         }
     }
 
+    private Response getUnavailableResponse() {
+        return Response.status(Status.SERVICE_UNAVAILABLE)
+                .type(MediaType.APPLICATION_JSON)
+                .entity(new ErrorData("Function worker service is not done initializing.
"
+                        + "Please try again in a little while."))
+                .build();
+    }
+
 }
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index 68be7641b1..b26b4f5992 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -118,6 +118,7 @@ public void setup() {
         this.mockedWorkerService = mock(WorkerService.class);
         when(mockedWorkerService.getFunctionMetaDataManager()).thenReturn(mockedManager);
         when(mockedWorkerService.getDlogNamespace()).thenReturn(mockedNamespace);
+        when(mockedWorkerService.isInitialized()).thenReturn(true);
 
         // worker config
         WorkerConfig workerConfig = new WorkerConfig()


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message