flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8063) Client blocks indefinitely when querying a non-existing state
Date Wed, 15 Nov 2017 16:53:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16253781#comment-16253781
] 

ASF GitHub Bot commented on FLINK-8063:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5021#discussion_r151184658
  
    --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
---
    @@ -1372,84 +1492,60 @@ public String fold(String accumulator, Tuple2<Integer, Long>
value) throws Excep
     
     	/////				General Utility Methods				//////
     
    -	private static <K, S extends State, V> CompletableFuture<S> getKvStateWithRetries(
    +	private static <K, S extends State, V> CompletableFuture<S> getKvState(
     			final QueryableStateClient client,
     			final JobID jobId,
     			final String queryName,
     			final K key,
     			final TypeInformation<K> keyTypeInfo,
     			final StateDescriptor<S, V> stateDescriptor,
    -			final Time retryDelay,
     			final boolean failForUnknownKeyOrNamespace,
    -			final ScheduledExecutor executor) {
    -		return retryWithDelay(
    -				() -> client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo,
VoidNamespaceTypeInfo.INSTANCE, stateDescriptor),
    -				NO_OF_RETRIES,
    -				retryDelay,
    -				executor,
    -				failForUnknownKeyOrNamespace);
    -	}
    -
    -	private static <T> CompletableFuture<T> retryWithDelay(
    -			final Supplier<CompletableFuture<T>> operation,
    -			final int retries,
    -			final Time retryDelay,
    -			final ScheduledExecutor scheduledExecutor,
    -			final boolean failIfUnknownKeyOrNamespace) {
    -
    -		final CompletableFuture<T> resultFuture = new CompletableFuture<>();
    -
    -		retryWithDelay(
    -				resultFuture,
    -				operation,
    -				retries,
    -				retryDelay,
    -				scheduledExecutor,
    -				failIfUnknownKeyOrNamespace);
    +			final ScheduledExecutor executor) throws InterruptedException {
     
    +		final CompletableFuture<S> resultFuture = new CompletableFuture<>();
    +		getKvStateIgnoringCertainExceptions(
    +				resultFuture, client, jobId, queryName, key, keyTypeInfo,
    +				stateDescriptor, failForUnknownKeyOrNamespace, executor);
     		return resultFuture;
     	}
     
    -	public static <T> void retryWithDelay(
    -			final CompletableFuture<T> resultFuture,
    -			final Supplier<CompletableFuture<T>> operation,
    -			final int retries,
    -			final Time retryDelay,
    -			final ScheduledExecutor scheduledExecutor,
    -			final boolean failIfUnknownKeyOrNamespace) {
    +	private static <K, S extends State, V> void getKvStateIgnoringCertainExceptions(
    +			final CompletableFuture<S> resultFuture,
    +			final QueryableStateClient client,
    +			final JobID jobId,
    +			final String queryName,
    +			final K key,
    +			final TypeInformation<K> keyTypeInfo,
    +			final StateDescriptor<S, V> stateDescriptor,
    +			final boolean failForUnknownKeyOrNamespace,
    +			final ScheduledExecutor executor) throws InterruptedException {
     
     		if (!resultFuture.isDone()) {
    -			final CompletableFuture<T> operationResultFuture = operation.get();
    -			operationResultFuture.whenCompleteAsync(
    -					(t, throwable) -> {
    -						if (throwable != null) {
    -							if (throwable.getCause() instanceof CancellationException) {
    -								resultFuture.completeExceptionally(new FutureUtils.RetryException("Operation
future was cancelled.", throwable.getCause()));
    -							} else if (throwable.getCause() instanceof AssertionError ||
    -									(failIfUnknownKeyOrNamespace && throwable.getCause() instanceof UnknownKeyOrNamespaceException))
{
    -								resultFuture.completeExceptionally(throwable.getCause());
    -							} else {
    -								if (retries > 0) {
    -									final ScheduledFuture<?> scheduledFuture = scheduledExecutor.schedule(
    -											() -> retryWithDelay(resultFuture, operation, retries - 1, retryDelay,
scheduledExecutor, failIfUnknownKeyOrNamespace),
    -											retryDelay.toMilliseconds(),
    -											TimeUnit.MILLISECONDS);
    -
    -									resultFuture.whenComplete(
    -											(innerT, innerThrowable) -> scheduledFuture.cancel(false));
    -								} else {
    -									resultFuture.completeExceptionally(new FutureUtils.RetryException("Could not
complete the operation. Number of retries " +
    -											"has been exhausted.", throwable));
    -								}
    -							}
    -						} else {
    -							resultFuture.complete(t);
    +			Thread.sleep(100L);
    +			CompletableFuture<S> expected = client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE,
keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor);
    +			expected.whenCompleteAsync((result, throwable) -> {
    +				if (throwable != null) {
    +					if (
    +							throwable.getCause() instanceof CancellationException ||
    +							throwable.getCause() instanceof AssertionError ||
    +							(failForUnknownKeyOrNamespace && throwable.getCause() instanceof UnknownKeyOrNamespaceException)
    +					) {
    +						resultFuture.completeExceptionally(throwable.getCause());
    +					} else {
    --- End diff --
    
    this branch may never exit and should use the `deadline` as an exit condition.


> Client blocks indefinitely when querying a non-existing state
> -------------------------------------------------------------
>
>                 Key: FLINK-8063
>                 URL: https://issues.apache.org/jira/browse/FLINK-8063
>             Project: Flink
>          Issue Type: Improvement
>          Components: Queryable State
>    Affects Versions: 1.4.0
>            Reporter: Chesnay Schepler
>            Assignee: Kostas Kloudas
>            Priority: Critical
>             Fix For: 1.4.0
>
>
> When querying for a non-existing state (as in, no state was registered under queryableStateName)
the client blocks indefinitely.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message