flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [1/2] flink git commit: [FLINK-5482] [tests] Dedup code in QueryableStateITCase
Date Fri, 20 Jan 2017 11:27:58 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.2 9073c53f9 -> 1db810218


[FLINK-5482] [tests] Dedup code in QueryableStateITCase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/da10a2e9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/da10a2e9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/da10a2e9

Branch: refs/heads/release-1.2
Commit: da10a2e9fccb19f1bec626b9907de4e3a93be76d
Parents: 9073c53
Author: Nico Kruber <nico@data-artisans.com>
Authored: Thu Jan 12 16:41:30 2017 +0100
Committer: Ufuk Celebi <uce@apache.org>
Committed: Fri Jan 20 12:27:43 2017 +0100

----------------------------------------------------------------------
 .../flink/test/query/QueryableStateITCase.java  | 152 ++++++-------------
 1 file changed, 50 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/da10a2e9/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
index a5ed6ad..eccd8e0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
@@ -622,40 +622,8 @@ public class QueryableStateITCase extends TestLogger {
 			// Now query
 			long expected = numElements;
 
-			FiniteDuration retryDelay = new FiniteDuration(1, TimeUnit.SECONDS);
-			for (int key = 0; key < NUM_SLOTS; key++) {
-				final byte[] serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace(
-						key,
-						queryableState.getKeySerializer(),
-						VoidNamespace.INSTANCE,
-						VoidNamespaceSerializer.INSTANCE);
-
-				boolean success = false;
-				while (deadline.hasTimeLeft() && !success) {
-					Future<byte[]> future = getKvStateWithRetries(client,
-							jobId,
-							queryableState.getQueryableStateName(),
-							key,
-							serializedKey,
-							retryDelay);
-
-					byte[] serializedValue = Await.result(future, deadline.timeLeft());
-
-					Tuple2<Integer, Long> value = KvStateRequestSerializer.deserializeValue(
-							serializedValue,
-							queryableState.getValueSerializer());
-
-					assertEquals("Key mismatch", key, value.f0.intValue());
-					if (expected == value.f1) {
-						success = true;
-					} else {
-						// Retry
-						Thread.sleep(50);
-					}
-				}
-
-				assertTrue("Did not succeed query", success);
-			}
+			executeValueQuery(deadline, client, jobId, queryableState,
+				expected);
 		} finally {
 			// Free cluster resources
 			if (jobId != null) {
@@ -672,6 +640,50 @@ public class QueryableStateITCase extends TestLogger {
 	}
 
 	/**
+	 * Retry a query for state for keys between 0 and {@link #NUM_SLOTS} until
+	 * <tt>expected</tt> equals the value of the result tuple's second field.
+	 */
+	private void executeValueQuery(final Deadline deadline,
+		final QueryableStateClient client, final JobID jobId,
+		final QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState,
+		final long expected) throws Exception {
+		FiniteDuration retryDelay = new FiniteDuration(1, TimeUnit.SECONDS);
+		for (int key = 0; key < NUM_SLOTS; key++) {
+			final byte[] serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace(
+				key,
+				queryableState.getKeySerializer(),
+				VoidNamespace.INSTANCE,
+				VoidNamespaceSerializer.INSTANCE);
+
+			boolean success = false;
+			while (deadline.hasTimeLeft() && !success) {
+				Future<byte[]> future = getKvStateWithRetries(client,
+					jobId,
+					queryableState.getQueryableStateName(),
+					key,
+					serializedKey,
+					retryDelay);
+
+				byte[] serializedValue = Await.result(future, deadline.timeLeft());
+
+				Tuple2<Integer, Long> value = KvStateRequestSerializer.deserializeValue(
+					serializedValue,
+					queryableState.getValueSerializer());
+
+				assertEquals("Key mismatch", key, value.f0.intValue());
+				if (expected == value.f1) {
+					success = true;
+				} else {
+					// Retry
+					Thread.sleep(50);
+				}
+			}
+
+			assertTrue("Did not succeed query", success);
+		}
+	}
+
+	/**
 	 * Tests simple value state queryable state instance. Each source emits
 	 * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then
 	 * queried. The tests succeeds after each subtask index is queried with
@@ -718,40 +730,8 @@ public class QueryableStateITCase extends TestLogger {
 			// Now query
 			long expected = numElements;
 
-			FiniteDuration retryDelay = new FiniteDuration(1, TimeUnit.SECONDS);
-			for (int key = 0; key < NUM_SLOTS; key++) {
-				final byte[] serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace(
-						key,
-						queryableState.getKeySerializer(),
-						VoidNamespace.INSTANCE,
-						VoidNamespaceSerializer.INSTANCE);
-
-				boolean success = false;
-				while (deadline.hasTimeLeft() && !success) {
-					Future<byte[]> future = getKvStateWithRetries(client,
-							jobId,
-							queryableState.getQueryableStateName(),
-							key,
-							serializedKey,
-							retryDelay);
-
-					byte[] serializedValue = Await.result(future, deadline.timeLeft());
-
-					Tuple2<Integer, Long> value = KvStateRequestSerializer.deserializeValue(
-							serializedValue,
-							queryableState.getValueSerializer());
-
-					assertEquals("Key mismatch", key, value.f0.intValue());
-					if (expected == value.f1) {
-						success = true;
-					} else {
-						// Retry
-						Thread.sleep(50);
-					}
-				}
-
-				assertTrue("Did not succeed query", success);
-			}
+			executeValueQuery(deadline, client, jobId, queryableState,
+				expected);
 		} finally {
 			// Free cluster resources
 			if (jobId != null) {
@@ -1024,40 +1004,8 @@ public class QueryableStateITCase extends TestLogger {
 			// Now query
 			long expected = numElements * (numElements + 1) / 2;
 
-			FiniteDuration retryDelay = new FiniteDuration(1, TimeUnit.SECONDS);
-			for (int key = 0; key < NUM_SLOTS; key++) {
-				final byte[] serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace(
-						key,
-						queryableState.getKeySerializer(),
-						VoidNamespace.INSTANCE,
-						VoidNamespaceSerializer.INSTANCE);
-
-				boolean success = false;
-				while (deadline.hasTimeLeft() && !success) {
-					Future<byte[]> future = getKvStateWithRetries(client,
-							jobId,
-							queryableState.getQueryableStateName(),
-							key,
-							serializedKey,
-							retryDelay);
-
-					byte[] serializedValue = Await.result(future, deadline.timeLeft());
-
-					Tuple2<Integer, Long> value = KvStateRequestSerializer.deserializeValue(
-							serializedValue,
-							queryableState.getValueSerializer());
-
-					assertEquals("Key mismatch", key, value.f0.intValue());
-					if (expected == value.f1) {
-						success = true;
-					} else {
-						// Retry
-						Thread.sleep(50);
-					}
-				}
-
-				assertTrue("Did not succeed query", success);
-			}
+			executeValueQuery(deadline, client, jobId, queryableState,
+				expected);
 		} finally {
 			// Free cluster resources
 			if (jobId != null) {


Mime
View raw message