From: uce@apache.org
To: commits@flink.apache.org
Reply-To: dev@flink.apache.org
Date: Fri, 20 Jan 2017 11:27:59 -0000
Message-Id: <647ae48f0f8f4448834c5d1282b96860@git.apache.org>
In-Reply-To: <2dafc27462b9422c9f0eaf51c583ab93@git.apache.org>
References: <2dafc27462b9422c9f0eaf51c583ab93@git.apache.org>
Subject: [2/2] flink git commit: [FLINK-5482] [queryable state] Re-issue location lookup upon failure

[FLINK-5482] [queryable state] Re-issue location lookup upon failure

A failing lookup, e.g. when the job has not been started yet, previously
remained in the lookup cache, so subsequent queries never retried the lookup
and failed as well. This commit changes the lookup caching code so that
futures which completed with a failure are removed from the cache and
replaced by new lookups.
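The fix boils down to an atomic replace of a cached future that has already completed with a failure. The following is a minimal, self-contained sketch of the same pattern in plain Java (CompletableFuture and ConcurrentHashMap instead of Flink's Scala futures and internal classes; the LocationCache name and the lookup function are illustrative, not part of Flink's API):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Illustrative sketch only, not Flink code: a cache that re-issues a lookup
// when the cached future has already completed exceptionally.
final class LocationCache<K, V> {

	private final ConcurrentMap<K, CompletableFuture<V>> cache = new ConcurrentHashMap<>();
	private final Function<K, CompletableFuture<V>> lookup;

	LocationCache(Function<K, CompletableFuture<V>> lookup) {
		this.lookup = lookup;
	}

	CompletableFuture<V> get(K key) {
		CompletableFuture<V> cached = cache.get(key);
		if (cached == null) {
			// first lookup for this key
			CompletableFuture<V> fresh = lookup.apply(key);
			CompletableFuture<V> previous = cache.putIfAbsent(key, fresh);
			return previous != null ? previous : fresh;
		}
		// Do not retain futures that failed: the cause may be gone by now
		// (e.g. the job has been started in the meantime), so retry.
		if (cached.isCompletedExceptionally()) {
			CompletableFuture<V> fresh = lookup.apply(key);
			// Atomically swap out the failed future; if another thread won
			// the race, return whatever is in the cache now.
			if (cache.replace(key, cached, fresh)) {
				return fresh;
			}
			return cache.get(key);
		}
		return cached;
	}
}

The important property, mirrored in the patch below, is that the swap uses ConcurrentMap.replace(key, expected, update), so concurrent queries either install exactly one new lookup or observe the one installed by another thread.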
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1db81021
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1db81021
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1db81021

Branch: refs/heads/release-1.2
Commit: 1db8102184a30a3df5448189cbc0a99938b906ab
Parents: da10a2e
Author: Nico Kruber
Authored: Thu Jan 12 16:48:27 2017 +0100
Committer: Ufuk Celebi
Committed: Fri Jan 20 12:27:47 2017 +0100

----------------------------------------------------------------------
 .../runtime/query/QueryableStateClient.java    | 20 +++++-
 .../flink/test/query/QueryableStateITCase.java | 73 ++++++++++++++++++++
 2 files changed, 92 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1db81021/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
index 98c3580..7ba3199 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
@@ -341,7 +341,25 @@ public class QueryableStateClient {
 				return previous;
 			}
 		} else {
-			return cachedFuture;
+			// do not retain futures which failed as they will remain in
+			// the cache even if the error cause is not present any more
+			// and a new lookup may succeed
+			if (cachedFuture.isCompleted() &&
+					cachedFuture.value().get().isFailure()) {
+				// issue a new lookup
+				Future<KvStateLocation> lookupFuture = lookupService
+						.getKvStateLookupInfo(jobId, queryableStateName);
+
+				// replace the existing one if it has not been replaced yet
+				// otherwise return the one in the cache
+				if (lookupCache.replace(cacheKey, cachedFuture, lookupFuture)) {
+					return lookupFuture;
+				} else {
+					return lookupCache.get(cacheKey);
+				}
+			} else {
+				return cachedFuture;
+			}
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1db81021/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
index eccd8e0..88e4f9a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
@@ -640,6 +640,79 @@ public class QueryableStateITCase extends TestLogger {
 	}
 
 	/**
+	 * Similar tests as {@link #testValueState()} but before submitting the
+	 * job, we already issue one request which fails.
+	 */
+	@Test
+	public void testQueryNonStartedJobState() throws Exception {
+		// Config
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+		final int numElements = 1024;
+
+		final QueryableStateClient client = new QueryableStateClient(cluster.configuration());
+
+		JobID jobId = null;
+		try {
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+			env.setParallelism(NUM_SLOTS);
+			// Very important, because cluster is shared between tests and we
+			// don't explicitly check that all slots are available before
+			// submitting.
+			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000));
+
+			DataStream<Tuple2<Integer, Long>> source = env
+					.addSource(new TestAscendingValueSource(numElements));
+
+			// Value state
+			ValueStateDescriptor<Tuple2<Integer, Long>> valueState = new ValueStateDescriptor<>(
+					"any",
+					source.getType(),
+					null);
+
+			QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
+					source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+						@Override
+						public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
+							return value.f0;
+						}
+					}).asQueryableState("hakuna", valueState);
+
+			// Submit the job graph
+			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+			jobId = jobGraph.getJobID();
+
+			// Now query
+			long expected = numElements;
+
+			// query once
+			client.getKvState(jobId, queryableState.getQueryableStateName(), 0,
+					KvStateRequestSerializer.serializeKeyAndNamespace(
+							0,
+							queryableState.getKeySerializer(),
+							VoidNamespace.INSTANCE,
+							VoidNamespaceSerializer.INSTANCE));
+
+			cluster.submitJobDetached(jobGraph);
+
+			executeValueQuery(deadline, client, jobId, queryableState,
+					expected);
+		} finally {
+			// Free cluster resources
+			if (jobId != null) {
+				Future<CancellationSuccess> cancellation = cluster
+						.getLeaderGateway(deadline.timeLeft())
+						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+						.mapTo(ClassTag$.MODULE$.apply(CancellationSuccess.class));
+
+				Await.ready(cancellation, deadline.timeLeft());
+			}
+
+			client.shutDown();
+		}
+	}
+
+	/**
 	 * Retry a query for state for keys between 0 and {@link #NUM_SLOTS} until
 	 * expected equals the value of the result tuple's second field.
 	 */
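For reference, the helper described by the Javadoc above retries a query until the expected value is observed or a deadline expires; that is what lets the test issue one failing request before job submission and still pass afterwards. A rough sketch of such a retry loop in plain Java (the queryOnce callable, back-off interval, and class name are placeholders, not the actual executeValueQuery implementation):

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.Callable;

// Illustrative sketch only, not the actual test helper: retry a query until it
// yields the expected value or the deadline expires.
final class RetryUntilDeadline {

	static long queryWithRetry(Callable<Long> queryOnce, long expected, Duration timeout)
			throws Exception {
		Instant deadline = Instant.now().plus(timeout);
		Exception lastFailure = null;
		while (Instant.now().isBefore(deadline)) {
			try {
				long value = queryOnce.call();
				if (value == expected) {
					return value;
				}
			} catch (Exception e) {
				// Expected while the job is still starting up; remember and retry.
				lastFailure = e;
			}
			Thread.sleep(100); // brief back-off between attempts
		}
		throw new AssertionError("Query did not reach " + expected + " before the deadline", lastFailure);
	}
}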