From issues-return-148649-archive-asf-public=cust-asf.ponee.io@flink.apache.org Fri Jan 19 19:41:19 2018 Return-Path: X-Original-To: archive-asf-public@eu.ponee.io Delivered-To: archive-asf-public@eu.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by mx-eu-01.ponee.io (Postfix) with ESMTP id D1AD7180718 for ; Fri, 19 Jan 2018 19:41:19 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id C1823160C1B; Fri, 19 Jan 2018 18:41:19 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 1E388160C36 for ; Fri, 19 Jan 2018 19:41:18 +0100 (CET) Received: (qmail 64868 invoked by uid 500); 19 Jan 2018 18:41:18 -0000 Mailing-List: contact issues-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list issues@flink.apache.org Received: (qmail 64849 invoked by uid 99); 19 Jan 2018 18:41:18 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 19 Jan 2018 18:41:18 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id CCBAD1A0CA9 for ; Fri, 19 Jan 2018 18:41:17 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -3.23 X-Spam-Level: X-Spam-Status: No, score=-3.23 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, T_RP_MATCHES_RCVD=-0.01] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id yt-PuKj-bzdZ for ; Fri, 19 Jan 2018 18:41:16 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with SMTP id D01DC5F6C2 for ; Fri, 19 Jan 2018 18:41:15 +0000 (UTC) Received: (qmail 63918 invoked by uid 99); 19 Jan 2018 18:41:15 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 19 Jan 2018 18:41:15 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1D865E78B1; Fri, 19 Jan 2018 18:41:15 +0000 (UTC) From: tillrohrmann To: issues@flink.incubator.apache.org Reply-To: issues@flink.incubator.apache.org References: In-Reply-To: Subject: [GitHub] flink pull request #5312: [FLINK-8344][WIP] Add support for HA to RestCluste... Content-Type: text/plain Message-Id: <20180119184115.1D865E78B1@git1-us-west.apache.org> Date: Fri, 19 Jan 2018 18:41:15 +0000 (UTC) Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5312#discussion_r162696312 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -376,4 +430,99 @@ public GetClusterStatusResponse getClusterStatus() { public int getMaxSlots() { return 0; } + + //------------------------------------------------------------------------- + // RestClient Helper + //------------------------------------------------------------------------- + + private , U extends MessageParameters, P extends ResponseBody> CompletableFuture

+ sendRequest(M messageHeaders, U messageParameters) throws IOException, LeaderNotAvailableException { + return sendRequest(messageHeaders, messageParameters, EmptyRequestBody.getInstance()); + } + + private , R extends RequestBody, P extends ResponseBody> CompletableFuture

+ sendRequest(M messageHeaders, R request) throws IOException, LeaderNotAvailableException { + return sendRequest(messageHeaders, EmptyMessageParameters.getInstance(), request); + } + + private , P extends ResponseBody> CompletableFuture

+ sendRequest(M messageHeaders) throws IOException, LeaderNotAvailableException { + return sendRequest(messageHeaders, EmptyMessageParameters.getInstance(), EmptyRequestBody.getInstance()); + } + + private , U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture

+ sendRequest(M messageHeaders, U messageParameters, R request) throws IOException, LeaderNotAvailableException { + final URL restServerBaseUrl = restServerLeaderHolder.getLeaderAddress(); + return restClient.sendRequest(restServerBaseUrl.getHost(), restServerBaseUrl.getPort(), messageHeaders, messageParameters, request); + } + + private , U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture

+ sendRetryableRequest(M messageHeaders, U messageParameters, R request, Predicate retryPredicate) { + return retry(() -> { + final URL restServerBaseUrl = restServerLeaderHolder.getLeaderAddress(); + return restClient.sendRequest(restServerBaseUrl.getHost(), restServerBaseUrl.getPort(), messageHeaders, messageParameters, request); + }, retryPredicate); + } + + private CompletableFuture retry( + CheckedSupplier> operation, + Predicate retryPredicate) { + return FutureUtils.retryWithDelay( + CheckedSupplier.unchecked(operation), + MAX_RETRIES, + RETRY_DELAY, + retryPredicate, + new ScheduledExecutorServiceAdapter(retryExecutorService)); + } + + private static Predicate isTimeoutException() { + return (throwable) -> + ExceptionUtils.findThrowable(throwable, java.net.ConnectException.class).isPresent() || + ExceptionUtils.findThrowable(throwable, java.net.SocketTimeoutException.class).isPresent() || + ExceptionUtils.findThrowable(throwable, ConnectTimeoutException.class).isPresent() || + ExceptionUtils.findThrowable(throwable, IOException.class).isPresent(); + } + + private static Predicate isHttpStatusUnsuccessfulException() { + return (throwable) -> ExceptionUtils.findThrowable(throwable, RestClientException.class) + .map(restClientException -> { + final int code = restClientException.getHttpResponseStatus().code(); + return code < 200 || code > 299; + }) + .orElse(false); + } + + private abstract class RestClusterClientLeaderRetrievalListener implements LeaderRetrievalListener { + @Override + public final void handleError(final Exception exception) { + log.error("Exception in LeaderRetrievalListener", exception); + shutdown(); --- End diff -- Maybe we could store the latest created future in `waitForResource` which we complete exceptionally here. ---