pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] sijie closed pull request #3029: [state] make setting and opening state table more robust
Date Sun, 25 Nov 2018 19:42:02 GMT
sijie closed pull request #3029: [state] make setting and opening state table more robust
URL: https://github.com/apache/pulsar/pull/3029
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 70ed588e7e..91363046bd 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -19,10 +19,12 @@
 
 package org.apache.pulsar.functions.instance;
 
+import com.google.common.base.Stopwatch;
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 import io.netty.buffer.ByteBuf;
 import io.prometheus.client.CollectorRegistry;
+import java.util.concurrent.TimeUnit;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -32,8 +34,15 @@
 import org.apache.bookkeeper.clients.StorageClientBuilder;
 import org.apache.bookkeeper.clients.admin.StorageAdminClient;
 import org.apache.bookkeeper.clients.config.StorageClientSettings;
+import org.apache.bookkeeper.clients.exceptions.ClientException;
+import org.apache.bookkeeper.clients.exceptions.InternalServerException;
 import org.apache.bookkeeper.clients.exceptions.NamespaceNotFoundException;
 import org.apache.bookkeeper.clients.exceptions.StreamNotFoundException;
+import org.apache.bookkeeper.common.util.Backoff;
+import org.apache.bookkeeper.common.util.Backoff.Jitter;
+import org.apache.bookkeeper.common.util.Backoff.Jitter.Type;
+import org.apache.bookkeeper.common.util.Backoff.Policy;
+import org.apache.bookkeeper.common.util.Retries;
 import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
 import org.apache.bookkeeper.stream.proto.StorageType;
 import org.apache.bookkeeper.stream.proto.StreamConfiguration;
@@ -314,7 +323,8 @@ private void createStateTable(String tableNs, String tableName, StorageClientSet
                 .setMinNumRanges(4)
                 .setStorageType(StorageType.TABLE)
                 .build();
-            while (true) {
+            Stopwatch elapsedWatch = Stopwatch.createStarted();
+            while (elapsedWatch.elapsed(TimeUnit.MINUTES) < 1) {
                 try {
                     result(storageAdminClient.getStream(tableNs, tableName));
                     return;
@@ -335,6 +345,10 @@ private void createStateTable(String tableNs, String tableName, StorageClientSet
                         // there might be two client conflicting at creating table, so let's retrieve it to make
                         // sure the table is created.
                     }
+                } catch (ClientException ce) {
+                    log.warn("Encountered issue on fetching state stable metadata, re-attempting in 100 milliseconds",
+                        ce.getMessage());
+                    TimeUnit.MILLISECONDS.sleep(100);
                 }
             }
         }
@@ -354,6 +368,13 @@ private void setupStateTable() throws Exception {
         StorageClientSettings settings = StorageClientSettings.newBuilder()
                 .serviceUri(stateStorageServiceUrl)
                 .clientName("function-" + tableNs + "/" + tableName)
+                // configure a maximum 2 minutes jitter backoff for accessing table service
+                .backoffPolicy(Jitter.of(
+                    Type.EXPONENTIAL,
+                    100,
+                    2000,
+                    60
+                ))
                 .build();
 
         // we defer creation of the state table until a java instance is running here.
@@ -364,7 +385,19 @@ private void setupStateTable() throws Exception {
                 .withSettings(settings)
                 .withNamespace(tableNs)
                 .build();
-        this.stateTable = result(storageClient.openTable(tableName));
+        // NOTE: this is a workaround until we bump bk version to 4.9.0
+        // table might just be created above, so it might not be ready for serving traffic
+        Stopwatch openSw = Stopwatch.createStarted();
+        while (openSw.elapsed(TimeUnit.MINUTES) < 1) {
+            try {
+                this.stateTable = result(storageClient.openTable(tableName));
+                break;
+            } catch (InternalServerException ise) {
+                log.warn("Encountered internal server on opening table '{}', re-attempt in 100 milliseconds : {}",
+                    tableName, ise.getMessage());
+                TimeUnit.MILLISECONDS.sleep(100);
+            }
+        }
     }
 
     private void processResult(Record srcRecord,


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message