flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [2/2] flink git commit: [FLINK-5482] [queryable state] Re-issue location lookup upon failure
Date Fri, 20 Jan 2017 11:27:59 GMT
[FLINK-5482] [queryable state] Re-issue location lookup upon failure

Any failing lookup (e.g. because the job had not been started yet) previously
remained in the lookup cache, so future queries never retried the lookup
and failed as well. This commit changes the lookup caching code so that
futures which have completed with a failure are replaced in the cache by new
lookups; successfully completed and still-pending futures are kept.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1db81021
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1db81021
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1db81021

Branch: refs/heads/release-1.2
Commit: 1db8102184a30a3df5448189cbc0a99938b906ab
Parents: da10a2e
Author: Nico Kruber <nico@data-artisans.com>
Authored: Thu Jan 12 16:48:27 2017 +0100
Committer: Ufuk Celebi <uce@apache.org>
Committed: Fri Jan 20 12:27:47 2017 +0100

----------------------------------------------------------------------
 .../runtime/query/QueryableStateClient.java     | 20 +++++-
 .../flink/test/query/QueryableStateITCase.java  | 73 ++++++++++++++++++++
 2 files changed, 92 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1db81021/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
index 98c3580..7ba3199 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
@@ -341,7 +341,25 @@ public class QueryableStateClient {
 					return previous;
 				}
 			} else {
-				return cachedFuture;
+				// do not retain futures which failed as they will remain in
+				// the cache even if the error cause is not present any more
+				// and a new lookup may succeed
+				if (cachedFuture.isCompleted() &&
+					cachedFuture.value().get().isFailure()) {
+					// issue a new lookup
+					Future<KvStateLocation> lookupFuture = lookupService
+						.getKvStateLookupInfo(jobId, queryableStateName);
+
+					// replace the existing one if it has not been replaced yet
+					// otherwise return the one in the cache
+					if (lookupCache.replace(cacheKey, cachedFuture, lookupFuture)) {
+						return lookupFuture;
+					} else {
+						return lookupCache.get(cacheKey);
+					}
+				} else {
+					return cachedFuture;
+				}
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/1db81021/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
index eccd8e0..88e4f9a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
@@ -640,6 +640,79 @@ public class QueryableStateITCase extends TestLogger {
 	}
 
 	/**
+	 * Similar tests as {@link #testValueState()} but before submitting the
+	 * job, we already issue one request which fails.
+	 */
+	@Test
+	public void testQueryNonStartedJobState() throws Exception {
+		// Config
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+		final int numElements = 1024;
+
+		final QueryableStateClient client = new QueryableStateClient(cluster.configuration());
+
+		JobID jobId = null;
+		try {
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+			env.setParallelism(NUM_SLOTS);
+			// Very important, because cluster is shared between tests and we
+			// don't explicitly check that all slots are available before
+			// submitting.
+			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000));
+
+			DataStream<Tuple2<Integer, Long>> source = env
+				.addSource(new TestAscendingValueSource(numElements));
+
+			// Value state
+			ValueStateDescriptor<Tuple2<Integer, Long>> valueState = new ValueStateDescriptor<>(
+				"any",
+				source.getType(),
+				null);
+
+			QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
+				source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+					@Override
+					public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
+						return value.f0;
+					}
+				}).asQueryableState("hakuna", valueState);
+
+			// Submit the job graph
+			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+			jobId = jobGraph.getJobID();
+
+			// Now query
+			long expected = numElements;
+
+			// query once
+			client.getKvState(jobId, queryableState.getQueryableStateName(), 0,
+				KvStateRequestSerializer.serializeKeyAndNamespace(
+					0,
+					queryableState.getKeySerializer(),
+					VoidNamespace.INSTANCE,
+					VoidNamespaceSerializer.INSTANCE));
+
+			cluster.submitJobDetached(jobGraph);
+
+			executeValueQuery(deadline, client, jobId, queryableState,
+				expected);
+		} finally {
+			// Free cluster resources
+			if (jobId != null) {
+				Future<CancellationSuccess> cancellation = cluster
+					.getLeaderGateway(deadline.timeLeft())
+					.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+					.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
+
+				Await.ready(cancellation, deadline.timeLeft());
+			}
+
+			client.shutDown();
+		}
+	}
+
+	/**
 	 * Retry a query for state for keys between 0 and {@link #NUM_SLOTS} until
 	 * <tt>expected</tt> equals the value of the result tuple's second field.
 	 */


Mime
View raw message