flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [26/27] flink git commit: [FLINK-3755] Fix variety of test problems caused by Keyed-State Refactoring
Date Wed, 31 Aug 2016 17:28:44 GMT
[FLINK-3755] Fix variety of test problems caused by Keyed-State Refactoring


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f44b57cc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f44b57cc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f44b57cc

Branch: refs/heads/master
Commit: f44b57ccf8f088f2ad4c1f10f479ed62be17eb8b
Parents: 6d43061
Author: Stefan Richter <s.richter@data-artisans.com>
Authored: Mon Aug 29 16:10:15 2016 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Wed Aug 31 19:10:02 2016 +0200

----------------------------------------------------------------------
 .../state/RocksDBAsyncSnapshotTest.java         | 58 ++++++++++++--------
 .../flink/cep/operator/CEPOperatorTest.java     |  6 ++
 .../flink/runtime/executiongraph/Execution.java |  4 +-
 .../checkpoint/CheckpointCoordinatorTest.java   | 18 ------
 .../runtime/state/StateBackendTestBase.java     | 20 +++----
 .../streaming/runtime/tasks/StreamTask.java     |  8 ++-
 .../api/graph/StreamGraphGeneratorTest.java     |  5 +-
 .../KeyedOneInputStreamOperatorTestHarness.java |  7 +++
 .../api/scala/StreamingOperatorsITCase.scala    |  1 +
 .../WindowCheckpointingITCase.java              | 14 +++--
 .../test/streaming/runtime/IterateITCase.java   |  1 +
 .../translation/CustomPartitioningTest.scala    |  5 +-
 .../DeltaIterationTranslationTest.scala         |  1 +
 13 files changed, 83 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f44b57cc/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index 624905c..d5b9b46 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -69,6 +69,8 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 
@@ -173,10 +175,10 @@ public class RocksDBAsyncSnapshotTest {
 			}
 		}
 
-		testHarness.processElement(new StreamRecord<>("Wohoo", 0));
-
 		task.triggerCheckpoint(42, 17);
 
+		testHarness.processElement(new StreamRecord<>("Wohoo", 0));
+
 		// now we allow the checkpoint
 		delayCheckpointLatch.trigger();
 
@@ -184,7 +186,13 @@ public class RocksDBAsyncSnapshotTest {
 		ensureCheckpointLatch.await();
 
 		testHarness.endInput();
+
+		ExecutorService threadPool = task.getAsyncOperationsThreadPool();
+		threadPool.shutdown();
+		Assert.assertTrue(threadPool.awaitTermination(60_000, TimeUnit.MILLISECONDS));
+
 		testHarness.waitForTaskCompletion();
+		task.checkTimerException();
 	}
 
 	/**
@@ -199,9 +207,6 @@ public class RocksDBAsyncSnapshotTest {
 
 		final OneInputStreamTask<String, String> task = new OneInputStreamTask<>();
 
-		//ensure that the async threads complete before invoke method of the tasks returns.
-		task.setThreadPoolTerminationTimeout(Long.MAX_VALUE);
-
 		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(task,
BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
 
 		testHarness.configureForKeyedStream(new KeySelector<String, String>() {
@@ -232,6 +237,9 @@ public class RocksDBAsyncSnapshotTest {
 				new MockInputSplitProvider(),
 				testHarness.bufferSize);
 
+		BlockingStreamMemoryStateBackend.waitFirstWriteLatch = new OneShotLatch();
+		BlockingStreamMemoryStateBackend.unblockCancelLatch = new OneShotLatch();
+
 		testHarness.invoke(mockEnv);
 
 		// wait for the task to be running
@@ -241,36 +249,40 @@ public class RocksDBAsyncSnapshotTest {
 				while (!field.getBoolean(task)) {
 					Thread.sleep(10);
 				}
-
 			}
 		}
 
-		testHarness.processElement(new StreamRecord<>("Wohoo", 0));
-
 		task.triggerCheckpoint(42, 17);
-
+		testHarness.processElement(new StreamRecord<>("Wohoo", 0));
 		BlockingStreamMemoryStateBackend.waitFirstWriteLatch.await();
 		task.cancel();
-
 		BlockingStreamMemoryStateBackend.unblockCancelLatch.trigger();
-
 		testHarness.endInput();
 		try {
+
+			ExecutorService threadPool = task.getAsyncOperationsThreadPool();
+			threadPool.shutdown();
+			Assert.assertTrue(threadPool.awaitTermination(60_000, TimeUnit.MILLISECONDS));
 			testHarness.waitForTaskCompletion();
+			task.checkTimerException();
+
 			Assert.fail("Operation completed. Cancel failed.");
 		} catch (Exception expected) {
-			// we expect the exception from canceling snapshots
-			Throwable cause = expected.getCause();
-			if(cause instanceof AsynchronousException) {
-				AsynchronousException asynchronousException = (AsynchronousException) cause;
-				cause = asynchronousException.getCause();
-				Assert.assertTrue("Unexpected Exception: " + cause,
-						cause instanceof CancellationException //future canceled
-						|| cause instanceof InterruptedException); //thread interrupted
+			AsynchronousException asynchronousException = null;
 
+			if (expected instanceof AsynchronousException) {
+				asynchronousException = (AsynchronousException) expected;
+			} else if (expected.getCause() instanceof AsynchronousException) {
+				asynchronousException = (AsynchronousException) expected.getCause();
 			} else {
-				Assert.fail();
+				Assert.fail("Unexpected exception: " + expected);
 			}
+
+			// we expect the exception from canceling snapshots
+			Throwable innerCause = asynchronousException.getCause();
+			Assert.assertTrue("Unexpected inner cause: " + innerCause,
+					innerCause instanceof CancellationException //future canceled
+							|| innerCause instanceof InterruptedException); //thread interrupted
 		}
 	}
 
@@ -301,11 +313,11 @@ public class RocksDBAsyncSnapshotTest {
 	 */
 	static class BlockingStreamMemoryStateBackend extends MemoryStateBackend {
 
-		public static OneShotLatch waitFirstWriteLatch = new OneShotLatch();
+		public static volatile OneShotLatch waitFirstWriteLatch = null;
 
-		public static OneShotLatch unblockCancelLatch = new OneShotLatch();
+		public static volatile OneShotLatch unblockCancelLatch = null;
 
-		volatile boolean closed = false;
+		private volatile boolean closed = false;
 
 		@Override
 		public CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier)
throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/f44b57cc/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 52a02d1..1fd8de8 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -136,6 +136,7 @@ public class CEPOperatorTest extends TestLogger {
 
 		// simulate snapshot/restore with some elements in internal sorting queue
 		StreamStateHandle snapshot = harness.snapshot(0, 0);
+		harness.close();
 
 		harness = new OneInputStreamOperatorTestHarness<>(
 				new CEPPatternOperator<>(
@@ -157,6 +158,7 @@ public class CEPOperatorTest extends TestLogger {
 
 		// simulate snapshot/restore with empty element queue but NFA state
 		StreamStateHandle snapshot2 = harness.snapshot(1, 1);
+		harness.close();
 
 		harness = new OneInputStreamOperatorTestHarness<>(
 				new CEPPatternOperator<>(
@@ -227,6 +229,7 @@ public class CEPOperatorTest extends TestLogger {
 
 		// simulate snapshot/restore with some elements in internal sorting queue
 		StreamStateHandle snapshot = harness.snapshot(0, 0);
+		harness.close();
 
 		harness = new KeyedOneInputStreamOperatorTestHarness<>(
 				new KeyedCEPPatternOperator<>(
@@ -252,6 +255,7 @@ public class CEPOperatorTest extends TestLogger {
 
 		// simulate snapshot/restore with empty element queue but NFA state
 		StreamStateHandle snapshot2 = harness.snapshot(1, 1);
+		harness.close();
 
 		harness = new KeyedOneInputStreamOperatorTestHarness<>(
 				new KeyedCEPPatternOperator<>(
@@ -334,6 +338,7 @@ public class CEPOperatorTest extends TestLogger {
 
 		// simulate snapshot/restore with some elements in internal sorting queue
 		StreamStateHandle snapshot = harness.snapshot(0, 0);
+		harness.close();
 
 		harness = new KeyedOneInputStreamOperatorTestHarness<>(
 				new KeyedCEPPatternOperator<>(
@@ -364,6 +369,7 @@ public class CEPOperatorTest extends TestLogger {
 
 		// simulate snapshot/restore with empty element queue but NFA state
 		StreamStateHandle snapshot2 = harness.snapshot(1, 1);
+		harness.close();
 
 		harness = new KeyedOneInputStreamOperatorTestHarness<>(
 				new KeyedCEPPatternOperator<>(

http://git-wip-us.apache.org/repos/asf/flink/blob/f44b57cc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 1981f5b..efddecc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -387,8 +387,8 @@ public class Execution {
 			final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(
 				attemptId,
 				slot,
-					chainedStateHandle,
-					keyGroupsStateHandles,
+				chainedStateHandle,
+				keyGroupsStateHandles,
 				attemptNumber);
 
 			// register this execution at the execution graph, to receive call backs

http://git-wip-us.apache.org/repos/asf/flink/blob/f44b57cc/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 4972c51..bc61742 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -1669,7 +1669,6 @@ public class CheckpointCoordinatorTest {
 				200000,
 				0L,
 				1, // max one checkpoint at a time => should not affect savepoints
-				42,
 				new ExecutionVertex[] { vertex1 },
 				new ExecutionVertex[] { vertex1 },
 				new ExecutionVertex[] { vertex1 },
@@ -1721,7 +1720,6 @@ public class CheckpointCoordinatorTest {
 				200000,
 				100000000L, // very long min delay => should not affect savepoints
 				1,
-				42,
 				new ExecutionVertex[] { vertex1 },
 				new ExecutionVertex[] { vertex1 },
 				new ExecutionVertex[] { vertex1 },
@@ -1742,22 +1740,6 @@ public class CheckpointCoordinatorTest {
 	//  Utilities
 	// ------------------------------------------------------------------------
 
-	private static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID) {
-		return mockExecutionVertex(attemptID, ExecutionState.RUNNING);
-	}
-
-	private static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID, 
-														ExecutionState state, ExecutionState ... successiveStates) {
-		final Execution exec = mock(Execution.class);
-		when(exec.getAttemptId()).thenReturn(attemptID);
-		when(exec.getState()).thenReturn(state, successiveStates);
-
-		ExecutionVertex vertex = mock(ExecutionVertex.class);
-		when(vertex.getJobvertexId()).thenReturn(new JobVertexID());
-		when(vertex.getCurrentExecutionAttempt()).thenReturn(exec);
-
-		return vertex;
-	}
 /**
 	 * Tests that the checkpointed partitioned and non-partitioned state is assigned properly
to
 	 * the {@link Execution} upon recovery.

http://git-wip-us.apache.org/repos/asf/flink/blob/f44b57cc/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 5984aca..33ec182 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -1148,20 +1148,20 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend>
{
 	
 	@Test
 	public void testEmptyStateCheckpointing() {
+
 		try {
-			DummyEnvironment env = new DummyEnvironment("test", 1, 0);
-			backend.initializeForJob(env, "test_op", IntSerializer.INSTANCE);
+			CheckpointStreamFactory streamFactory = createStreamFactory();
+			KeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+
+			ListStateDescriptor<String> kvId = new ListStateDescriptor<>("id", String.class);
 
-			HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshot = backend
-					.snapshotPartitionedState(682375462379L, 1);
-			
+			// draw a snapshot
+			KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(682375462379L, 1, streamFactory));
 			assertNull(snapshot);
-			backend.dispose();
+			backend.close();
 
-			// Make sure we can restore from empty state
-			backend.initializeForJob(env, "test_op", IntSerializer.INSTANCE);
-			backend.injectKeyValueStateSnapshots((HashMap) snapshot);
-			backend.dispose();
+			backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot);
+			backend.close();
 		}
 		catch (Exception e) {
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/f44b57cc/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 701281b..bedc8fa 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -614,7 +614,6 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 
 	private boolean performCheckpoint(final long checkpointId, final long timestamp) throws
Exception {
 		LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName());
-		
 		synchronized (lock) {
 			if (isRunning) {
 
@@ -677,7 +676,6 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 				synchronized (cancelables) {
 					cancelables.add(asyncCheckpointRunnable);
 				}
-
 				asyncOperationsThreadPool.submit(asyncCheckpointRunnable);
 				return true;
 			} else {
@@ -685,7 +683,11 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 			}
 		}
 	}
-	
+
+	public ExecutorService getAsyncOperationsThreadPool() {
+		return asyncOperationsThreadPool;
+	}
+
 	@Override
 	public void notifyCheckpointComplete(long checkpointId) throws Exception {
 		synchronized (lock) {

http://git-wip-us.apache.org/repos/asf/flink/blob/f44b57cc/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index 874274f..c93a439 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.streaming.api.datastream.ConnectedStreams;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -371,8 +372,8 @@ public class StreamGraphGeneratorTest {
 		StreamNode keyedResult3Node = graph.getStreamNode(keyedResult3.getId());
 		StreamNode keyedResult4Node = graph.getStreamNode(keyedResult4.getId());
 
-		assertEquals(globalParallelism, keyedResult1Node.getMaxParallelism());
-		assertEquals(mapParallelism, keyedResult2Node.getMaxParallelism());
+		assertEquals(KeyGroupRangeAssignment.DEFAULT_MAX_PARALLELISM, keyedResult1Node.getMaxParallelism());
+		assertEquals(KeyGroupRangeAssignment.DEFAULT_MAX_PARALLELISM, keyedResult2Node.getMaxParallelism());
 		assertEquals(maxParallelism, keyedResult3Node.getMaxParallelism());
 		assertEquals(maxParallelism, keyedResult4Node.getMaxParallelism());
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/f44b57cc/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
index 03f50f9..7e86da0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
@@ -113,6 +113,10 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
 					final int numberOfKeyGroups = (Integer) invocationOnMock.getArguments()[1];
 					final KeyGroupRange keyGroupRange = (KeyGroupRange) invocationOnMock.getArguments()[2];
 
+					if(keyedStateBackend != null) {
+						keyedStateBackend.close();
+					}
+
 					if (restoredKeyedState == null) {
 						keyedStateBackend = stateBackend.createKeyedStateBackend(
 								mockTask.getEnvironment(),
@@ -195,5 +199,8 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
 	 */
 	public void close() throws Exception {
 		super.close();
+		if(keyedStateBackend != null) {
+			keyedStateBackend.close();
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f44b57cc/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
index d353468..c57c29c 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
@@ -69,6 +69,7 @@ class StreamingOperatorsITCase extends ScalaStreamingMultipleProgramsTestBase
{
     val env = StreamExecutionEnvironment.getExecutionEnvironment
 
     env.setParallelism(2)
+    env.getConfig.setMaxParallelism(2);
 
     val sourceStream = env.addSource(new SourceFunction[(Int, Int)] {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f44b57cc/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
index 2d634de..2e6ce78 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
@@ -337,7 +337,6 @@ public class WindowCheckpointingITCase extends TestLogger {
 			// we loop longer than we have elements, to permit delayed checkpoints
 			// to still cause a failure
 			while (running) {
-
 				if (!failedBefore) {
 					// delay a bit, if we have not failed before
 					Thread.sleep(1);
@@ -350,17 +349,15 @@ public class WindowCheckpointingITCase extends TestLogger {
 				}
 
 				if (numElementsEmitted < numElementsToEmit &&
-						(failedBefore || numElementsEmitted <= failureAfterNumElements))
-				{
+						(failedBefore || numElementsEmitted <= failureAfterNumElements)) {
 					// the function failed before, or we are in the elements before the failure
 					synchronized (ctx.getCheckpointLock()) {
 						int next = numElementsEmitted++;
 						ctx.collect(new Tuple2<Long, IntType>((long) next, new IntType(next)));
 					}
-				}
-				else {
+				} else {
 					// if our work is done, delay a bit to prevent busy waiting
-					Thread.sleep(1);
+					Thread.sleep(10);
 				}
 			}
 		}
@@ -409,6 +406,7 @@ public class WindowCheckpointingITCase extends TestLogger {
 		public void open(Configuration parameters) throws Exception {
 			// this sink can only work with DOP 1
 			assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+			checkSuccess();
 		}
 
 		@Override
@@ -423,6 +421,10 @@ public class WindowCheckpointingITCase extends TestLogger {
 
 			// check if we have seen all we expect
 			aggCount += value.f1.value;
+			checkSuccess();
+		}
+
+		private void checkSuccess() throws SuccessException {
 			if (aggCount >= elementCountExpected * countPerElementExpected) {
 				// we are done. validate
 				assertEquals(elementCountExpected, counts.size());

http://git-wip-us.apache.org/repos/asf/flink/blob/f44b57cc/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
index 1fbebd0..e49f832 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
@@ -524,6 +524,7 @@ public class IterateITCase extends StreamingMultipleProgramsTestBase {
 			try {
 				StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 				env.setParallelism(DEFAULT_PARALLELISM - 1);
+				env.getConfig().setMaxParallelism(env.getParallelism());
 
 				KeySelector<Integer, Integer> key = new KeySelector<Integer, Integer>() {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f44b57cc/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala
index 7ebf378..2ef5f01 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala
@@ -39,6 +39,7 @@ class CustomPartitioningTest extends CompilerTestBase {
       
       val env = ExecutionEnvironment.getExecutionEnvironment
       env.setParallelism(parallelism)
+      env.getConfig.setMaxParallelism(parallelism);
 
       val data = env.fromElements( (0,0) ).rebalance()
       
@@ -108,7 +109,8 @@ class CustomPartitioningTest extends CompilerTestBase {
       
       val env = ExecutionEnvironment.getExecutionEnvironment
       env.setParallelism(parallelism)
-      
+      env.getConfig.setMaxParallelism(parallelism);
+
       val data = env.fromElements(new Pojo()).rebalance()
       
       data
@@ -179,6 +181,7 @@ class CustomPartitioningTest extends CompilerTestBase {
       
       val env = ExecutionEnvironment.getExecutionEnvironment
       env.setParallelism(parallelism)
+      env.getConfig.setMaxParallelism(parallelism);
       
       val data = env.fromElements(new Pojo()).rebalance()
       

http://git-wip-us.apache.org/repos/asf/flink/blob/f44b57cc/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala
index 3121d68..05294b9 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala
@@ -52,6 +52,7 @@ class DeltaIterationTranslationTest {
 
       val env = ExecutionEnvironment.getExecutionEnvironment
       env.setParallelism(DEFAULT_PARALLELISM)
+      env.getConfig.setMaxParallelism(DEFAULT_PARALLELISM);
 
       val initialSolutionSet = env.fromElements((3.44, 5L, "abc"))
       val initialWorkSet = env.fromElements((1.23, "abc"))


Mime
View raw message