From commits-return-18079-archive-asf-public=cust-asf.ponee.io@pulsar.apache.org Sun Nov 25 20:42:04 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 1A638180677 for ; Sun, 25 Nov 2018 20:42:03 +0100 (CET) Received: (qmail 48973 invoked by uid 500); 25 Nov 2018 19:42:03 -0000 Mailing-List: contact commits-help@pulsar.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@pulsar.apache.org Delivered-To: mailing list commits@pulsar.apache.org Received: (qmail 48963 invoked by uid 99); 25 Nov 2018 19:42:03 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 25 Nov 2018 19:42:03 +0000 From: GitBox To: commits@pulsar.apache.org Subject: [GitHub] sijie closed pull request #3029: [state] make setting and opening state table more robust Message-ID: <154317492271.31549.3067886557604960501.gitbox@gitbox.apache.org> Date: Sun, 25 Nov 2018 19:42:02 -0000 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit sijie closed pull request #3029: [state] make setting and opening state table more robust URL: https://github.com/apache/pulsar/pull/3029 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index 70ed588e7e..91363046bd 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -19,10 +19,12 @@ package org.apache.pulsar.functions.instance; +import com.google.common.base.Stopwatch; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import io.netty.buffer.ByteBuf; import io.prometheus.client.CollectorRegistry; +import java.util.concurrent.TimeUnit; import lombok.AccessLevel; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -32,8 +34,15 @@ import org.apache.bookkeeper.clients.StorageClientBuilder; import org.apache.bookkeeper.clients.admin.StorageAdminClient; import org.apache.bookkeeper.clients.config.StorageClientSettings; +import org.apache.bookkeeper.clients.exceptions.ClientException; +import org.apache.bookkeeper.clients.exceptions.InternalServerException; import org.apache.bookkeeper.clients.exceptions.NamespaceNotFoundException; import org.apache.bookkeeper.clients.exceptions.StreamNotFoundException; +import org.apache.bookkeeper.common.util.Backoff; +import org.apache.bookkeeper.common.util.Backoff.Jitter; +import org.apache.bookkeeper.common.util.Backoff.Jitter.Type; +import org.apache.bookkeeper.common.util.Backoff.Policy; +import org.apache.bookkeeper.common.util.Retries; import org.apache.bookkeeper.stream.proto.NamespaceConfiguration; import org.apache.bookkeeper.stream.proto.StorageType; import org.apache.bookkeeper.stream.proto.StreamConfiguration; @@ -314,7 +323,8 @@ private void createStateTable(String tableNs, String tableName, StorageClientSet .setMinNumRanges(4) .setStorageType(StorageType.TABLE) .build(); - while (true) { + Stopwatch elapsedWatch = Stopwatch.createStarted(); + while (elapsedWatch.elapsed(TimeUnit.MINUTES) < 1) { try { result(storageAdminClient.getStream(tableNs, tableName)); return; @@ -335,6 +345,10 @@ private void createStateTable(String tableNs, String tableName, StorageClientSet // there might be two client conflicting at creating table, so let's retrieve it to make // sure the table is created. } + } catch (ClientException ce) { + log.warn("Encountered issue on fetching state stable metadata, re-attempting in 100 milliseconds", + ce.getMessage()); + TimeUnit.MILLISECONDS.sleep(100); } } } @@ -354,6 +368,13 @@ private void setupStateTable() throws Exception { StorageClientSettings settings = StorageClientSettings.newBuilder() .serviceUri(stateStorageServiceUrl) .clientName("function-" + tableNs + "/" + tableName) + // configure a maximum 2 minutes jitter backoff for accessing table service + .backoffPolicy(Jitter.of( + Type.EXPONENTIAL, + 100, + 2000, + 60 + )) .build(); // we defer creation of the state table until a java instance is running here. @@ -364,7 +385,19 @@ private void setupStateTable() throws Exception { .withSettings(settings) .withNamespace(tableNs) .build(); - this.stateTable = result(storageClient.openTable(tableName)); + // NOTE: this is a workaround until we bump bk version to 4.9.0 + // table might just be created above, so it might not be ready for serving traffic + Stopwatch openSw = Stopwatch.createStarted(); + while (openSw.elapsed(TimeUnit.MINUTES) < 1) { + try { + this.stateTable = result(storageClient.openTable(tableName)); + break; + } catch (InternalServerException ise) { + log.warn("Encountered internal server on opening table '{}', re-attempt in 100 milliseconds : {}", + tableName, ise.getMessage()); + TimeUnit.MILLISECONDS.sleep(100); + } + } } private void processResult(Record srcRecord, ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services