flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8344) Add support for HA to RestClusterClient
Date Tue, 23 Jan 2018 07:27:01 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16335446#comment-16335446 ]

ASF GitHub Bot commented on FLINK-8344:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5312#discussion_r163160938
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
---
    @@ -376,4 +430,99 @@ public GetClusterStatusResponse getClusterStatus() {
     	public int getMaxSlots() {
     		return 0;
     	}
    +
    +	//-------------------------------------------------------------------------
    +	// RestClient Helper
    +	//-------------------------------------------------------------------------
    +
    +	private <M extends MessageHeaders<EmptyRequestBody, P, U>, U extends MessageParameters,
P extends ResponseBody> CompletableFuture<P>
    +			sendRequest(M messageHeaders, U messageParameters) throws IOException, LeaderNotAvailableException
{
    +		return sendRequest(messageHeaders, messageParameters, EmptyRequestBody.getInstance());
    +	}
    +
    +	private <M extends MessageHeaders<R, P, EmptyMessageParameters>, R extends
RequestBody, P extends ResponseBody> CompletableFuture<P>
    +			sendRequest(M messageHeaders, R request) throws IOException, LeaderNotAvailableException
{
    +		return sendRequest(messageHeaders, EmptyMessageParameters.getInstance(), request);
    +	}
    +
    +	private <M extends MessageHeaders<EmptyRequestBody, P, EmptyMessageParameters>,
P extends ResponseBody> CompletableFuture<P>
    +			sendRequest(M messageHeaders) throws IOException, LeaderNotAvailableException {
    +		return sendRequest(messageHeaders, EmptyMessageParameters.getInstance(), EmptyRequestBody.getInstance());
    +	}
    +
    +	private <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R
extends RequestBody, P extends ResponseBody> CompletableFuture<P>
    +			sendRequest(M messageHeaders, U messageParameters, R request) throws IOException,
LeaderNotAvailableException {
    +		final URL restServerBaseUrl = restServerLeaderHolder.getLeaderAddress();
    +		return restClient.sendRequest(restServerBaseUrl.getHost(), restServerBaseUrl.getPort(),
messageHeaders, messageParameters, request);
    +	}
    +
    +	private <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R
extends RequestBody, P extends ResponseBody> CompletableFuture<P>
    +			sendRetryableRequest(M messageHeaders, U messageParameters, R request, Predicate<Throwable>
retryPredicate) {
    +		return retry(() -> {
    +			final URL restServerBaseUrl = restServerLeaderHolder.getLeaderAddress();
    +			return restClient.sendRequest(restServerBaseUrl.getHost(), restServerBaseUrl.getPort(),
messageHeaders, messageParameters, request);
    +		}, retryPredicate);
    +	}
    +
    +	private <C> CompletableFuture<C> retry(
    +			CheckedSupplier<CompletableFuture<C>> operation,
    +			Predicate<Throwable> retryPredicate) {
    +		return FutureUtils.retryWithDelay(
    +			CheckedSupplier.unchecked(operation),
    +			MAX_RETRIES,
    +			RETRY_DELAY,
    +			retryPredicate,
    +			new ScheduledExecutorServiceAdapter(retryExecutorService));
    +	}
    +
    +	private static Predicate<Throwable> isTimeoutException() {
    +		return (throwable) ->
    +			ExceptionUtils.findThrowable(throwable, java.net.ConnectException.class).isPresent()
||
    +				ExceptionUtils.findThrowable(throwable, java.net.SocketTimeoutException.class).isPresent()
||
    +				ExceptionUtils.findThrowable(throwable, ConnectTimeoutException.class).isPresent()
||
    +				ExceptionUtils.findThrowable(throwable, IOException.class).isPresent();
    +	}
    +
    +	private static Predicate<Throwable> isHttpStatusUnsuccessfulException() {
    +		return (throwable) -> ExceptionUtils.findThrowable(throwable, RestClientException.class)
    +				.map(restClientException -> {
    +					final int code = restClientException.getHttpResponseStatus().code();
    +					return code < 200 || code > 299;
    +				})
    +				.orElse(false);
    +	}
    +
    +	private abstract class RestClusterClientLeaderRetrievalListener implements LeaderRetrievalListener
{
    +		@Override
    +		public final void handleError(final Exception exception) {
    +			log.error("Exception in LeaderRetrievalListener", exception);
    +			shutdown();
    --- End diff --
    
    Using `LeaderRetriever` now


> Add support for HA to RestClusterClient
> ---------------------------------------
>
>                 Key: FLINK-8344
>                 URL: https://issues.apache.org/jira/browse/FLINK-8344
>             Project: Flink
>          Issue Type: Improvement
>          Components: Client
>    Affects Versions: 1.5.0
>            Reporter: Till Rohrmann
>            Assignee: Gary Yao
>            Priority: Major
>              Labels: flip-6
>             Fix For: 1.5.0
>
>
> The {{RestClusterClient}} must be able to deal with changing JobMasters in case of HA.
> We have to add functionality to reconnect to a newly elected leader in case of HA.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message