flink-issues mailing list archives

From tillrohrmann <...@git.apache.org>
Subject [GitHub] flink pull request #5339: [FLINK-8493] [flip6] Integrate queryable state wit...
Date Tue, 06 Feb 2018 08:50:37 GMT
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5339#discussion_r166221453
  
    --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java ---
    @@ -205,32 +201,32 @@ private void executeActionAsync(
     			return cachedFuture;
     		}
     
    -		LOG.debug("Retrieving location for state={} of job={} from the job manager.", jobId, queryableStateName);
    -
    -		final CompletableFuture<KvStateLocation> location = new CompletableFuture<>();
    -		lookupCache.put(cacheKey, location);
    -		return proxy.getJobManagerFuture().thenComposeAsync(
    -				jobManagerGateway -> {
    -					final Object msg = new KvStateMessage.LookupKvStateLocation(jobId, queryableStateName);
    -					jobManagerGateway.ask(msg, FiniteDuration.apply(1000L, TimeUnit.MILLISECONDS))
    -							.mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class))
    -							.onComplete(new OnComplete<KvStateLocation>() {
    -
    -								@Override
    -								public void onComplete(Throwable failure, KvStateLocation loc) throws Throwable {
    -									if (failure != null) {
    -										if (failure instanceof FlinkJobNotFoundException) {
    -											// if the jobId was wrong, remove the entry from the cache.
    -											lookupCache.remove(cacheKey);
    -										}
    -										location.completeExceptionally(failure);
    -									} else {
    -										location.complete(loc);
    -									}
    -								}
    -							}, Executors.directExecutionContext());
    -					return location;
    -				}, queryExecutor);
    +		final KvStateLocationOracle kvStateLocationOracle = proxy.getKvStateLocationOracle(jobId);
    +
    +		if (kvStateLocationOracle != null) {
    +			LOG.debug("Retrieving location for state={} of job={} from the key-value state location oracle.", jobId, queryableStateName);
    +			final CompletableFuture<KvStateLocation> location = new CompletableFuture<>();
    +			lookupCache.put(cacheKey, location);
    +
    +			kvStateLocationOracle
    +				.requestKvStateLocation(jobId, queryableStateName)
    +				.whenComplete(
    +					(KvStateLocation kvStateLocation, Throwable throwable) -> {
    +						if (throwable != null) {
    +							if (ExceptionUtils.stripCompletionException(throwable) instanceof FlinkJobNotFoundException) {
    +								// if the jobId was wrong, remove the entry from the cache.
    +								lookupCache.remove(cacheKey);
    +							}
    +							location.completeExceptionally(throwable);
    +						} else {
    +							location.complete(kvStateLocation);
    +						}
    +					});
    +
    +			return location;
    +		} else {
    +			return FutureUtils.completedExceptionally(new UnknownJobManagerException());
    --- End diff --
    
    I'll rename it to `UnknownLocationException`. Since this exception is marked as internal, the rename should not be a problem.
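
For readers following the diff, the lookup pattern in the added lines can be sketched outside Flink roughly as below. This is a minimal sketch under stated assumptions, not the PR's code: `LocationLookupSketch`, `JobNotFoundException`, the `String` location, and the `Function`-based oracle are hypothetical stand-ins for `KvStateClientProxyHandler`, `FlinkJobNotFoundException`, `KvStateLocation`, and `KvStateLocationOracle`, and the `IllegalStateException` stands in for the exception being renamed.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical sketch of "cache the lookup future, evict it if the job is unknown". */
class LocationLookupSketch {

    /** Hypothetical stand-in for FlinkJobNotFoundException. */
    static class JobNotFoundException extends Exception {
        JobNotFoundException(String message) {
            super(message);
        }
    }

    private final Map<String, CompletableFuture<String>> lookupCache = new ConcurrentHashMap<>();

    /** Hypothetical stand-in for the location oracle; null means "no oracle known". */
    private final Function<String, CompletableFuture<String>> oracle;

    LocationLookupSketch(Function<String, CompletableFuture<String>> oracle) {
        this.oracle = oracle;
    }

    /** Unwraps CompletionException layers that dependent stages add around the real cause. */
    static Throwable stripCompletionException(Throwable t) {
        while (t instanceof CompletionException && t.getCause() != null) {
            t = t.getCause();
        }
        return t;
    }

    CompletableFuture<String> getLocation(String cacheKey) {
        CompletableFuture<String> cached = lookupCache.get(cacheKey);
        if (cached != null) {
            return cached;
        }

        if (oracle == null) {
            // Assumption: a generic exception stands in for the renamed one.
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException("No location oracle for " + cacheKey));
            return failed;
        }

        CompletableFuture<String> location = new CompletableFuture<>();
        lookupCache.put(cacheKey, location);

        oracle.apply(cacheKey).whenComplete((loc, throwable) -> {
            if (throwable != null) {
                if (stripCompletionException(throwable) instanceof JobNotFoundException) {
                    // The job id was wrong, so do not keep the failed future cached.
                    lookupCache.remove(cacheKey);
                }
                location.completeExceptionally(throwable);
            } else {
                location.complete(loc);
            }
        });

        return location;
    }

    boolean isCached(String cacheKey) {
        return lookupCache.containsKey(cacheKey);
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new JobNotFoundException("unknown job"));

        // thenApply creates a dependent stage, so the cause arrives wrapped in CompletionException.
        LocationLookupSketch lookup = new LocationLookupSketch(key -> failed.thenApply(s -> s));
        CompletableFuture<String> result = lookup.getLocation("job/state");

        System.out.println("failed: " + result.isCompletedExceptionally());
        System.out.println("still cached: " + lookup.isCached("job/state"));
    }
}
```

The unwrapping step matters because a future derived through a dependent stage reports its failure wrapped in `CompletionException`, so a plain `instanceof` check on the raw throwable would miss the not-found case and leave the failed entry in the cache.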

