flink-commits mailing list archives

From g...@apache.org
Subject [20/22] flink git commit: [FLINK-6731] [tests] Activate strict checkstyle for flink-tests
Date Wed, 12 Jul 2017 23:44:20 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java
index 4a1d181..7dd1144 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.test.checkpointing.utils;
 
-import static org.junit.Assert.assertEquals;
-
 import org.apache.flink.api.common.accumulators.IntCounter;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
@@ -53,9 +51,12 @@ import org.apache.flink.streaming.api.operators.Triggerable;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Collector;
+
 import org.junit.Ignore;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+
 /**
  * This verifies that we can restore a complete job from a Flink 1.2 savepoint.
  *
@@ -147,7 +148,6 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
 				new Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
 	}
 
-
 	@Test
 	public void testSavepointRestoreFromFlink12() throws Exception {
 
@@ -249,7 +249,7 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
 	private static class LegacyCheckpointedSource
 			implements SourceFunction<Tuple2<Long, Long>>, Checkpointed<String> {
 
-		public static String CHECKPOINTED_STRING = "Here be dragons!";
+		public static String checkpointedString = "Here be dragons!";
 
 		private static final long serialVersionUID = 1L;
 
@@ -286,12 +286,12 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
 
 		@Override
 		public void restoreState(String state) throws Exception {
-			assertEquals(CHECKPOINTED_STRING, state);
+			assertEquals(checkpointedString, state);
 		}
 
 		@Override
 		public String snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-			return CHECKPOINTED_STRING;
+			return checkpointedString;
 		}
 	}
 
@@ -322,7 +322,7 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
 
 		@Override
 		public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
-			assertEquals(LegacyCheckpointedSource.CHECKPOINTED_STRING, restoredState);
+			assertEquals(LegacyCheckpointedSource.checkpointedString, restoredState);
 			getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
 
 			// immediately trigger any set timers
@@ -350,12 +350,12 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
 		}
 	}
 
-	public static class LegacyCheckpointedFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
+	private static class LegacyCheckpointedFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
 			implements Checkpointed<Tuple2<String, Long>> {
 
 		private static final long serialVersionUID = 1L;
 
-		public static Tuple2<String, Long> CHECKPOINTED_TUPLE =
+		public static Tuple2<String, Long> checkpointedTuple =
 				new Tuple2<>("hello", 42L);
 
 		@Override
@@ -369,11 +369,11 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
 
 		@Override
 		public Tuple2<String, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-			return CHECKPOINTED_TUPLE;
+			return checkpointedTuple;
 		}
 	}
 
-	public static class CheckingRestoringFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
+	private static class CheckingRestoringFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
 			implements CheckpointedRestoring<Tuple2<String, Long>> {
 
 		private static final long serialVersionUID = 1L;
@@ -393,7 +393,7 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
 		public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
 			out.collect(value);
 
-			assertEquals(LegacyCheckpointedFlatMap.CHECKPOINTED_TUPLE, restoredState);
+			assertEquals(LegacyCheckpointedFlatMap.checkpointedTuple, restoredState);
 			getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
 
 		}
@@ -404,13 +404,13 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
 		}
 	}
 
-	public static class LegacyCheckpointedFlatMapWithKeyedState
+	private static class LegacyCheckpointedFlatMapWithKeyedState
 			extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
 			implements Checkpointed<Tuple2<String, Long>> {
 
 		private static final long serialVersionUID = 1L;
 
-		public static Tuple2<String, Long> CHECKPOINTED_TUPLE =
+		public static Tuple2<String, Long> checkpointedTuple =
 				new Tuple2<>("hello", 42L);
 
 		private final ValueStateDescriptor<Long> stateDescriptor =
@@ -431,11 +431,11 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
 
 		@Override
 		public Tuple2<String, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-			return CHECKPOINTED_TUPLE;
+			return checkpointedTuple;
 		}
 	}
 
-	public static class CheckingRestoringFlatMapWithKeyedState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
+	private static class CheckingRestoringFlatMapWithKeyedState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
 			implements CheckpointedRestoring<Tuple2<String, Long>> {
 
 		private static final long serialVersionUID = 1L;
@@ -464,7 +464,7 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
 			}
 
 			assertEquals(value.f1, state.value());
-			assertEquals(LegacyCheckpointedFlatMap.CHECKPOINTED_TUPLE, restoredState);
+			assertEquals(LegacyCheckpointedFlatMap.checkpointedTuple, restoredState);
 			getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
 		}
 
@@ -474,7 +474,7 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
 		}
 	}
 
-	public static class CheckingRestoringFlatMapWithKeyedStateInOperator extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
+	private static class CheckingRestoringFlatMapWithKeyedStateInOperator extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
 			implements CheckpointedRestoring<Tuple2<String, Long>> {
 
 		private static final long serialVersionUID = 1L;
@@ -503,7 +503,7 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
 			}
 
 			assertEquals(value.f1, state.value());
-			assertEquals(LegacyCheckpointedFlatMap.CHECKPOINTED_TUPLE, restoredState);
+			assertEquals(LegacyCheckpointedFlatMap.checkpointedTuple, restoredState);
 			getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
 		}
 
@@ -513,7 +513,7 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
 		}
 	}
 
-	public static class KeyedStateSettingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+	private static class KeyedStateSettingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -528,7 +528,7 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
 		}
 	}
 
-	public static class CheckingKeyedStateFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+	private static class CheckingKeyedStateFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -558,7 +558,7 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
 		}
 	}
 
-	public static class CheckpointedUdfOperator
+	private static class CheckpointedUdfOperator
 			extends AbstractUdfStreamOperator<Tuple2<Long, Long>, FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>>
 			implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> {
 		private static final long serialVersionUID = 1L;
@@ -591,7 +591,7 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
 		}
 	}
 
-	public static class CheckingRestoringUdfOperator
+	private static class CheckingRestoringUdfOperator
 			extends AbstractUdfStreamOperator<Tuple2<Long, Long>, FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>>
 			implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> {
 
@@ -635,7 +635,7 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
 		}
 	}
 
-	public static class TimelyStatefulOperator
+	private static class TimelyStatefulOperator
 			extends AbstractStreamOperator<Tuple2<Long, Long>>
 			implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>>, Triggerable<Long, Long> {
 		private static final long serialVersionUID = 1L;
@@ -687,7 +687,7 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
 		}
 	}
 
-	public static class CheckingTimelyStatefulOperator
+	private static class CheckingTimelyStatefulOperator
 			extends AbstractStreamOperator<Tuple2<Long, Long>>
 			implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>>, Triggerable<Long, Long> {
 		private static final long serialVersionUID = 1L;
@@ -751,7 +751,7 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
 		}
 	}
 
-	public static class AccumulatorCountingSink<T> extends RichSinkFunction<T> {
+	private static class AccumulatorCountingSink<T> extends RichSinkFunction<T> {
 		private static final long serialVersionUID = 1L;
 
 		public static final String NUM_ELEMENTS_ACCUMULATOR = AccumulatorCountingSink.class + "_NUM_ELEMENTS";

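The CHECKPOINTED_STRING and CHECKPOINTED_TUPLE renames above follow the checkstyle constant-naming rule: UPPER_SNAKE_CASE is reserved for static final fields, while mutable static fields take camelCase. A minimal sketch of the distinction (the class name is illustrative, not from the commit):

    public class NamingSketch {
        // a true constant: static final fields keep UPPER_SNAKE_CASE
        private static final String GREETING = "Here be dragons!";

        // mutable static state is not a constant, so camelCase applies
        public static String checkpointedString = "Here be dragons!";
    }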
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index 8546368..31b6bcc 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -41,17 +41,13 @@ import org.apache.flink.test.testdata.KMeansData;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.test.util.TestEnvironment;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Option;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
 import java.io.IOException;
@@ -59,9 +55,18 @@ import java.net.URL;
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
+/**
+ * Test job classloader.
+ */
 public class ClassLoaderITCase extends TestLogger {
 
 	private static final Logger LOG = LoggerFactory.getLogger(ClassLoaderITCase.class);
@@ -318,7 +323,7 @@ public class ClassLoaderITCase extends TestLogger {
 		// Trigger savepoint
 		String savepointPath = null;
 		for (int i = 0; i < 20; i++) {
-			LOG.info("Triggering savepoint (" + (i+1) + "/20).");
+			LOG.info("Triggering savepoint (" + (i + 1) + "/20).");
 			Future<Object> savepointFuture = jm.ask(new TriggerSavepoint(jobId, Option.<String>empty()), deadline.timeLeft());
 
 			Object savepointResponse = Await.result(savepointFuture, deadline.timeLeft());

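The import reshuffling in ClassLoaderITCase shows the grouping the strict checkstyle enforces: org.apache.flink imports first, then other third-party imports, then java/javax, then scala, with static imports last and a blank line between groups. A sketch of the resulting layout (the particular imports are chosen for illustration):

    import org.apache.flink.util.TestLogger;

    import org.junit.Test;
    import org.slf4j.Logger;

    import java.io.File;
    import java.util.Collections;

    import scala.Option;

    import static org.junit.Assert.assertEquals;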
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java
index 795ae41..51fad6b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java
@@ -27,7 +27,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 
-import java.lang.RuntimeException;
 import java.util.Collections;
 import java.util.List;
 
@@ -39,7 +38,7 @@ import java.util.List;
 public class CheckpointedStreamingProgram {
 
 	private static final int CHECKPOINT_INTERVALL = 100;
-	
+
 	public static void main(String[] args) throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
@@ -47,21 +46,20 @@ public class CheckpointedStreamingProgram {
 		env.enableCheckpointing(CHECKPOINT_INTERVALL);
 		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 100L));
 		env.disableOperatorChaining();
-		
+
 		DataStream<String> text = env.addSource(new SimpleStringGenerator());
 		text.map(new StatefulMapper()).addSink(new NoOpSink());
 		env.setParallelism(1);
 		env.execute("Checkpointed Streaming Program");
 	}
 
-
 	// with Checkpointing
-	public static class SimpleStringGenerator implements SourceFunction<String>, ListCheckpointed<Integer> {
+	private static class SimpleStringGenerator implements SourceFunction<String>, ListCheckpointed<Integer> {
 		public boolean running = true;
 
 		@Override
 		public void run(SourceContext<String> ctx) throws Exception {
-			while(running) {
+			while (running) {
 				Thread.sleep(1);
 				ctx.collect("someString");
 			}
@@ -83,7 +81,7 @@ public class CheckpointedStreamingProgram {
 		}
 	}
 
-	public static class StatefulMapper implements MapFunction<String, String>, ListCheckpointed<StatefulMapper>, CheckpointListener {
+	private static class StatefulMapper implements MapFunction<String, String>, ListCheckpointed<StatefulMapper>, CheckpointListener {
 
 		private String someState;
 		private boolean atLeastOneSnapshotComplete = false;
@@ -107,14 +105,14 @@ public class CheckpointedStreamingProgram {
 
 		@Override
 		public String map(String value) throws Exception {
-			if(!atLeastOneSnapshotComplete) {
+			if (!atLeastOneSnapshotComplete) {
 				// throttle consumption by the checkpoint interval until we have one snapshot.
 				Thread.sleep(CHECKPOINT_INTERVALL);
 			}
-			if(atLeastOneSnapshotComplete && !restored) {
+			if (atLeastOneSnapshotComplete && !restored) {
 				throw new RuntimeException("Intended failure, to trigger restore");
 			}
-			if(restored) {
+			if (restored) {
 				throw new SuccessException();
 				//throw new RuntimeException("All good");
 			}
@@ -130,13 +128,13 @@ public class CheckpointedStreamingProgram {
 	// --------------------------------------------------------------------------------------------
 
 	/**
-	 * We intentionally use a user specified failure exception
+	 * We intentionally use a user-specified failure exception.
 	 */
-	public static class SuccessException extends Exception {
+	private static class SuccessException extends Exception {
 
 	}
 
-	public static class NoOpSink implements SinkFunction<String>{
+	private static class NoOpSink implements SinkFunction<String> {
 		@Override
 		public void invoke(String value) throws Exception {
 		}

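Besides narrowing helper classes from public to private, this file picks up the keyword-whitespace rule: while and if need a space before the opening parenthesis. A compact sketch of both conventions, using an invented helper class rather than code from the commit:

    public class WhitespaceSketch {
        private static class SleepLoop {
            // volatile is an extra safety measure here, not part of the commit
            private volatile boolean running = true;

            void run() throws InterruptedException {
                while (running) {   // space between the keyword and '('
                    Thread.sleep(1);
                }
            }

            void cancel() {
                if (running) {      // the same rule applies to 'if'
                    running = false;
                }
            }
        }
    }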
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java
index 2693bc1..aa8e59e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java
@@ -46,6 +46,9 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 
+/**
+ * Test class used by the {@link org.apache.flink.test.classloading.ClassLoaderITCase}.
+ */
 public class CheckpointingCustomKvStateProgram {
 
 	public static void main(String[] args) throws Exception {
@@ -71,7 +74,7 @@ public class CheckpointingCustomKvStateProgram {
 						return new Tuple2<>(ThreadLocalRandom.current().nextInt(parallelism), value);
 					}
 				})
-				.keyBy(new KeySelector<Tuple2<Integer,Integer>, Integer>() {
+				.keyBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() {
 					private static final long serialVersionUID = 1L;
 
 					@Override
@@ -133,12 +136,11 @@ public class CheckpointingCustomKvStateProgram {
 			this.kvState = getRuntimeContext().getReducingState(stateDescriptor);
 		}
 
-
 		@Override
 		public void flatMap(Tuple2<Integer, Integer> value, Collector<Integer> out) throws Exception {
 			kvState.add(value.f1);
 
-			if(atLeastOneSnapshotComplete) {
+			if (atLeastOneSnapshotComplete) {
 				if (restored) {
 					throw new SuccessException();
 				} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java
index 2caa7cf..a5a2531 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java
@@ -18,10 +18,6 @@
 
 package org.apache.flink.test.classloading.jar;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
@@ -36,9 +32,16 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Test class used by the {@link org.apache.flink.test.classloading.ClassLoaderITCase}.
+ */
 @SuppressWarnings("serial")
 public class CustomInputSplitProgram {
-	
+
 	public static void main(String[] args) throws Exception {
 
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -58,8 +61,8 @@ public class CustomInputSplitProgram {
 		env.execute();
 	}
 	// --------------------------------------------------------------------------------------------
-	
-	public static final class CustomInputFormat implements InputFormat<Integer, CustomInputSplit>, ResultTypeQueryable<Integer> {
+
+	private static final class CustomInputFormat implements InputFormat<Integer, CustomInputSplit>, ResultTypeQueryable<Integer> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -113,7 +116,7 @@ public class CustomInputSplitProgram {
 		}
 	}
 
-	public static final class CustomInputSplit implements InputSplit {
+	private static final class CustomInputSplit implements InputSplit {
 
 		private static final long serialVersionUID = 1L;
 
@@ -129,7 +132,7 @@ public class CustomInputSplitProgram {
 		}
 	}
 
-	public static final class CustomSplitAssigner implements InputSplitAssigner {
+	private static final class CustomSplitAssigner implements InputSplitAssigner {
 
 		private final List<CustomInputSplit> remainingSplits;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java
index cbd553c..819ad29 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomKvStateProgram.java
@@ -64,7 +64,7 @@ public class CustomKvStateProgram {
 						return new Tuple2<>(ThreadLocalRandom.current().nextInt(parallelism), value);
 					}
 				})
-				.keyBy(new KeySelector<Tuple2<Integer,Integer>, Integer>() {
+				.keyBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() {
 					private static final long serialVersionUID = 1L;
 
 					@Override
@@ -112,7 +112,6 @@ public class CustomKvStateProgram {
 			this.kvState = getRuntimeContext().getReducingState(stateDescriptor);
 		}
 
-
 		@Override
 		public void flatMap(Tuple2<Integer, Integer> value, Collector<Integer> out) throws Exception {
 			kvState.add(value.f1);

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/KMeansForTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/KMeansForTest.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/KMeansForTest.java
index b8e6c85..72940c4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/KMeansForTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/KMeansForTest.java
@@ -33,7 +33,7 @@ import java.util.Collection;
 /**
  * This class belongs to the {@link org.apache.flink.test.classloading.ClassLoaderITCase} test.
  *
- * It tests dynamic class loading for:
+ * <p>It tests dynamic class loading for:
  * <ul>
  *     <li>Custom Functions</li>
  *     <li>Custom Data Types</li>
@@ -41,8 +41,7 @@ import java.util.Collection;
  *     <li>Custom Types in collect()</li>
  * </ul>
  *
- * <p>
- * It's removed by Maven from classpath, so other tests must not depend on it.
+ * <p>It's removed by Maven from classpath, so other tests must not depend on it.
  */
 @SuppressWarnings("serial")
 public class KMeansForTest {
@@ -124,7 +123,7 @@ public class KMeansForTest {
 		}
 
 		public double euclideanDistance(Point other) {
-			return Math.sqrt((x-other.x)*(x-other.x) + (y-other.y)*(y-other.y));
+			return Math.sqrt((x - other.x) * (x - other.x) + (y - other.y) * (y - other.y));
 		}
 
 		public void clear() {
@@ -147,7 +146,7 @@ public class KMeansForTest {
 		public Centroid() {}
 
 		public Centroid(int id, double x, double y) {
-			super(x,y);
+			super(x, y);
 			this.id = id;
 		}
 
@@ -166,7 +165,9 @@ public class KMeansForTest {
 	//     USER FUNCTIONS
 	// *************************************************************************
 
-	/** Converts a Tuple2<Double,Double> into a Point. */
+	/**
+	 * Converts a {@code Tuple2<Double, Double>} into a {@link Point}.
+	 */
 	public static final class TuplePointConverter extends RichMapFunction<String, Point> {
 
 		@Override
@@ -176,7 +177,9 @@ public class KMeansForTest {
 		}
 	}
 
-	/** Converts a Tuple3<Integer, Double,Double> into a Centroid. */
+	/**
+	 * Converts a {@code Tuple3<Integer, Double, Double>} into a {@link Centroid}.
+	 */
 	public static final class TupleCentroidConverter extends RichMapFunction<String, Centroid> {
 
 		@Override
@@ -186,7 +189,9 @@ public class KMeansForTest {
 		}
 	}
 
-	/** Determines the closest cluster center for a data point. */
+	/**
+	 * Determines the closest cluster center for a data point.
+	 */
 	public static final class SelectNearestCenter extends RichMapFunction<Point, Tuple2<Integer, Point>> {
 
 		private Collection<Centroid> centroids;
@@ -197,7 +202,7 @@ public class KMeansForTest {
 		public void open(Configuration parameters) throws Exception {
 			this.centroids = getRuntimeContext().getBroadcastVariable("centroids");
 			this.acc = new CustomAccumulator();
-			 getRuntimeContext().addAccumulator("myAcc", this.acc);
+			getRuntimeContext().addAccumulator("myAcc", this.acc);
 		}
 
 		@Override
@@ -224,7 +229,9 @@ public class KMeansForTest {
 		}
 	}
 
-	// Use this so that we can check whether POJOs and the POJO comparator also work
+	/**
+	 * Use this so that we can check whether POJOs and the POJO comparator also work.
+	 */
 	public static final class DummyTuple3IntPointLong {
 		public Integer field0;
 		public Point field1;
@@ -239,7 +246,9 @@ public class KMeansForTest {
 		}
 	}
 
-	/** Appends a count variable to the tuple. */
+	/**
+	 * Appends a count variable to the tuple.
+	 */
 	public static final class CountAppender extends RichMapFunction<Tuple2<Integer, Point>, DummyTuple3IntPointLong> {
 
 		@Override
@@ -248,7 +257,9 @@ public class KMeansForTest {
 		}
 	}
 
-	/** Sums and counts point coordinates. */
+	/**
+	 * Sums and counts point coordinates.
+	 */
 	public static final class CentroidAccumulator extends RichReduceFunction<DummyTuple3IntPointLong> {
 
 		@Override
@@ -257,7 +268,9 @@ public class KMeansForTest {
 		}
 	}
 
-	/** Computes new centroid from coordinate sum and count of points. */
+	/**
+	 * Computes new centroid from coordinate sum and count of points.
+	 */
 	public static final class CentroidAverager extends RichMapFunction<DummyTuple3IntPointLong, Centroid> {
 
 		@Override
@@ -266,7 +279,7 @@ public class KMeansForTest {
 		}
 	}
 
-	public static class CustomAccumulator implements SimpleAccumulator<Long> {
+	private static class CustomAccumulator implements SimpleAccumulator<Long> {
 
 		private long value;
 

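The KMeansForTest hunks demonstrate the Javadoc side of the rules: the first sentence must end with a period, continuation paragraphs open with a <p> tag, and generic types are wrapped in {@code ...} so the angle brackets are not parsed as HTML. The shape of a conforming comment (class body intentionally empty):

    /**
     * Converts a {@code Tuple2<Double, Double>} into a point.
     *
     * <p>This second paragraph starts with the p tag; the {@code ...}
     * wrapper keeps javadoc from reading the generics as HTML tags.
     */
    public final class TuplePointSketch {
    }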
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/LegacyCheckpointedStreamingProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/LegacyCheckpointedStreamingProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/LegacyCheckpointedStreamingProgram.java
index 210973f..1431d96 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/LegacyCheckpointedStreamingProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/LegacyCheckpointedStreamingProgram.java
@@ -50,9 +50,8 @@ public class LegacyCheckpointedStreamingProgram {
 		env.execute("Checkpointed Streaming Program");
 	}
 
-
 	// with Checkpointing
-	public static class SimpleStringGenerator implements SourceFunction<String>, Checkpointed<Integer> {
+	private static class SimpleStringGenerator implements SourceFunction<String>, Checkpointed<Integer> {
 
 		private static final long serialVersionUID = 3700033137820808611L;
 
@@ -60,7 +59,7 @@ public class LegacyCheckpointedStreamingProgram {
 
 		@Override
 		public void run(SourceContext<String> ctx) throws Exception {
-			while(running) {
+			while (running) {
 				Thread.sleep(1);
 				ctx.collect("someString");
 			}
@@ -82,7 +81,7 @@ public class LegacyCheckpointedStreamingProgram {
 		}
 	}
 
-	public static class StatefulMapper implements MapFunction<String, String>, Checkpointed<StatefulMapper>, CheckpointListener {
+	private static class StatefulMapper implements MapFunction<String, String>, Checkpointed<StatefulMapper>, CheckpointListener {
 
 		private static final long serialVersionUID = 2703630582894634440L;
 
@@ -104,14 +103,14 @@ public class LegacyCheckpointedStreamingProgram {
 
 		@Override
 		public String map(String value) throws Exception {
-			if(!atLeastOneSnapshotComplete) {
+			if (!atLeastOneSnapshotComplete) {
 				// throttle consumption by the checkpoint interval until we have one snapshot.
 				Thread.sleep(CHECKPOINT_INTERVALL);
 			}
-			if(atLeastOneSnapshotComplete && !restored) {
+			if (atLeastOneSnapshotComplete && !restored) {
 				throw new RuntimeException("Intended failure, to trigger restore");
 			}
-			if(restored) {
+			if (restored) {
 				throw new SuccessException();
 				//throw new RuntimeException("All good");
 			}
@@ -127,14 +126,14 @@ public class LegacyCheckpointedStreamingProgram {
 	// --------------------------------------------------------------------------------------------
 
 	/**
-	 * We intentionally use a user specified failure exception
+	 * We intentionally use a user-specified failure exception.
 	 */
-	public static class SuccessException extends Exception {
+	private static class SuccessException extends Exception {
 
 		private static final long serialVersionUID = 7073311460437532086L;
 	}
 
-	public static class NoOpSink implements SinkFunction<String> {
+	private static class NoOpSink implements SinkFunction<String> {
 		private static final long serialVersionUID = 2381410324190818620L;
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java
index 4905d43..69421a6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java
@@ -38,9 +38,12 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+/**
+ * Test class used by the {@link org.apache.flink.test.classloading.ClassLoaderITCase}.
+ */
 @SuppressWarnings("serial")
 public class StreamingCustomInputSplitProgram {
-	
+
 	public static void main(String[] args) throws Exception {
 				Configuration config = new Configuration();
 
@@ -62,8 +65,8 @@ public class StreamingCustomInputSplitProgram {
 		env.execute();
 	}
 	// --------------------------------------------------------------------------------------------
-	
-	public static final class CustomInputFormat implements InputFormat<Integer, CustomInputSplit>, ResultTypeQueryable<Integer> {
+
+	private static final class CustomInputFormat implements InputFormat<Integer, CustomInputSplit>, ResultTypeQueryable<Integer> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -119,7 +122,7 @@ public class StreamingCustomInputSplitProgram {
 		}
 	}
 
-	public static final class CustomInputSplit implements InputSplit {
+	private static final class CustomInputSplit implements InputSplit {
 
 		private static final long serialVersionUID = 1L;
 
@@ -135,7 +138,7 @@ public class StreamingCustomInputSplitProgram {
 		}
 	}
 
-	public static final class CustomSplitAssigner implements InputSplitAssigner, Serializable {
+	private static final class CustomSplitAssigner implements InputSplitAssigner, Serializable {
 
 		private final List<CustomInputSplit> remainingSplits;
 
@@ -156,7 +159,7 @@ public class StreamingCustomInputSplitProgram {
 		}
 	}
 
-	public static class NoOpSink implements SinkFunction<Tuple2<Integer, Double>> {
+	private static class NoOpSink implements SinkFunction<Tuple2<Integer, Double>> {
 		@Override
 		public void invoke(Tuple2<Integer, Double> value) throws Exception {
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
index 0fdc744..596e4dd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.test.classloading.jar;
 
-import java.util.StringTokenizer;
-
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -27,13 +25,18 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.test.testdata.WordCountData;
 import org.apache.flink.util.Collector;
 
+import java.util.StringTokenizer;
+
+/**
+ * Test class used by the {@link org.apache.flink.test.classloading.ClassLoaderITCase}.
+ */
 @SuppressWarnings("serial")
 public class StreamingProgram {
-	
+
 	public static void main(String[] args) throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.getConfig().disableSysoutLogging();
-		
+
 		DataStream<String> text = env.fromElements(WordCountData.TEXT).rebalance();
 
 		DataStream<Word> counts =
@@ -45,6 +48,9 @@ public class StreamingProgram {
 	}
 	// --------------------------------------------------------------------------------------------
 
+	/**
+	 * POJO with word and count.
+	 */
 	public static class Word {
 
 		private String word;
@@ -80,7 +86,7 @@ public class StreamingProgram {
 		}
 	}
 
-	public static class Tokenizer implements FlatMapFunction<String, Word>{
+	private static class Tokenizer implements FlatMapFunction<String, Word> {
 		@Override
 		public void flatMap(String value, Collector<Word> out) throws Exception {
 			StringTokenizer tokenizer = new StringTokenizer(value);
@@ -90,7 +96,7 @@ public class StreamingProgram {
 		}
 	}
 
-	public static class NoOpSink implements SinkFunction<Word>{
+	private static class NoOpSink implements SinkFunction<Word> {
 		@Override
 		public void invoke(Word value) throws Exception {
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/UserCodeType.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/UserCodeType.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/UserCodeType.java
index f12fd5f..a15a7a4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/UserCodeType.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/UserCodeType.java
@@ -26,10 +26,10 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat;
 /**
  * Test class used by the {@link org.apache.flink.test.classloading.ClassLoaderITCase}.
  *
- * This class is used to test FLINK-3633
+ * <p>This class is used to test FLINK-3633.
  */
 public class UserCodeType {
-	public static class CustomType {
+	private static class CustomType {
 		private final int value;
 
 		public CustomType(int value) {
@@ -46,7 +46,7 @@ public class UserCodeType {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		env.getConfig().disableSysoutLogging();
 
-		DataSet<Integer> input = env.fromElements(1,2,3,4,5);
+		DataSet<Integer> input = env.fromElements(1, 2, 3, 4, 5);
 
 		DataSet<CustomType> customTypes = input.map(new MapFunction<Integer, CustomType>() {
 			private static final long serialVersionUID = -5878758010124912128L;

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
deleted file mode 100644
index 61595f2..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.test.clients.examples;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.StandaloneClusterClient;
-import org.apache.flink.runtime.client.JobRetrievalException;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
-import org.apache.flink.util.TestLogger;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import scala.collection.Seq;
-
-import java.util.concurrent.Semaphore;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
-
-/**
- * Tests retrieval of a job from a running Flink cluster
- */
-public class JobRetrievalITCase extends TestLogger {
-
-	private static final Semaphore lock = new Semaphore(1);
-
-	private static FlinkMiniCluster cluster;
-
-	@BeforeClass
-	public static void before() {
-		Configuration configuration = new Configuration();
-		cluster = new TestingCluster(configuration, false);
-		cluster.start();
-	}
-
-	@AfterClass
-	public static void after() {
-		cluster.stop();
-		cluster = null;
-	}
-
-	@Test
-	public void testJobRetrieval() throws Exception {
-		final JobID jobID = new JobID();
-
-		final JobVertex imalock = new JobVertex("imalock");
-		imalock.setInvokableClass(SemaphoreInvokable.class);
-
-		final JobGraph jobGraph = new JobGraph(jobID, "testjob", imalock);
-
-		final ClusterClient client = new StandaloneClusterClient(cluster.configuration(), cluster.highAvailabilityServices());
-
-		// acquire the lock to make sure that the job cannot complete until the job client
-		// has been attached in resumingThread
-		lock.acquire();
-		client.runDetached(jobGraph, JobRetrievalITCase.class.getClassLoader());
-
-		final Thread resumingThread = new Thread(new Runnable() {
-			@Override
-			public void run() {
-				try {
-					assertNotNull(client.retrieveJob(jobID));
-				} catch (Throwable e) {
-					fail(e.getMessage());
-				}
-			}
-		});
-
-		final Seq<ActorSystem> actorSystemSeq = cluster.jobManagerActorSystems().get();
-		final ActorSystem actorSystem = actorSystemSeq.last();
-		JavaTestKit testkit = new JavaTestKit(actorSystem);
-
-		final ActorRef jm = cluster.getJobManagersAsJava().get(0);
-		// wait until client connects
-		jm.tell(TestingJobManagerMessages.getNotifyWhenClientConnects(), testkit.getRef());
-		// confirm registration
-		testkit.expectMsgEquals(true);
-
-		// kick off resuming
-		resumingThread.start();
-
-		// wait for client to connect
-		testkit.expectMsgAllOf(
-			TestingJobManagerMessages.getClientConnected(),
-			TestingJobManagerMessages.getClassLoadingPropsDelivered());
-
-		// client has connected, we can release the lock
-		lock.release();
-
-		resumingThread.join();
-	}
-
-	@Test
-	public void testNonExistingJobRetrieval() throws Exception {
-		final JobID jobID = new JobID();
-		ClusterClient client = new StandaloneClusterClient(cluster.configuration());
-
-		try {
-			client.retrieveJob(jobID);
-			fail();
-		} catch (JobRetrievalException ignored) {
-			// this is what we want
-		}
-	}
-
-
-	public static class SemaphoreInvokable extends AbstractInvokable {
-
-		@Override
-		public void invoke() throws Exception {
-			lock.acquire();
-		}
-	}
-
-}

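This deletion is one half of a move: the same test reappears later in this patch under org.apache.flink.test.example.client (see the new JobRetrievalITCase below), essentially unchanged apart from import order and Javadoc tweaks, so only the package changes:

    // old location, deleted above:
    //   org.apache.flink.test.clients.examples.JobRetrievalITCase
    // new location, added further down in this same commit:
    package org.apache.flink.test.example.client;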
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java
deleted file mode 100644
index 606cdc1..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.clients.examples;
-
-import java.io.File;
-import java.io.FileWriter;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.client.LocalExecutor;
-import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.test.testfunctions.Tokenizer;
-import org.apache.flink.util.TestLogger;
-import org.junit.Assert;
-import org.junit.Test;
-
-
-public class LocalExecutorITCase extends TestLogger {
-
-	private static final int parallelism = 4;
-
-	@Test
-	public void testLocalExecutorWithWordCount() {
-		try {
-			// set up the files
-			File inFile = File.createTempFile("wctext", ".in");
-			File outFile = File.createTempFile("wctext", ".out");
-			inFile.deleteOnExit();
-			outFile.deleteOnExit();
-
-			try (FileWriter fw = new FileWriter(inFile)) {
-				fw.write(WordCountData.TEXT);
-			}
-
-			LocalExecutor executor = new LocalExecutor();
-			executor.setDefaultOverwriteFiles(true);
-			executor.setTaskManagerNumSlots(parallelism);
-			executor.setPrintStatusDuringExecution(false);
-			executor.start();
-			Plan wcPlan = getWordCountPlan(inFile, outFile, parallelism);
-			wcPlan.setExecutionConfig(new ExecutionConfig());
-			executor.executePlan(wcPlan);
-			executor.stop();
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail(e.getMessage());
-		}
-	}
-
-	private Plan getWordCountPlan(File inFile, File outFile, int parallelism) {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(parallelism);
-		env.readTextFile(inFile.getAbsolutePath())
-			.flatMap(new Tokenizer())
-			.groupBy(0)
-			.sum(1)
-			.writeAsCsv(outFile.getAbsolutePath());
-		return env.createProgramPlan();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeInfoTestCoverageTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeInfoTestCoverageTest.java b/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeInfoTestCoverageTest.java
index b4549a8..91566af 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeInfoTestCoverageTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeInfoTestCoverageTest.java
@@ -18,15 +18,18 @@
 
 package org.apache.flink.test.completeness;
 
-import java.lang.reflect.Modifier;
-import java.util.Set;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeInformationTestBase;
 import org.apache.flink.util.TestLogger;
-import static org.junit.Assert.assertTrue;
+
 import org.junit.Test;
 import org.reflections.Reflections;
 
+import java.lang.reflect.Modifier;
+import java.util.Set;
+
+import static org.junit.Assert.assertTrue;
+
 /**
  * Scans the class path for type information and checks if there is a test for it.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java b/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java
deleted file mode 100644
index 21aa40a..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.distributedCache;
-
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.apache.flink.test.util.AbstractTestBase;
-import org.apache.flink.test.util.TestBaseUtils;
-import org.apache.flink.test.util.TestEnvironment;
-import org.apache.flink.util.Collector;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.junit.Assert.assertTrue;
-
-
-public class DistributedCacheTest extends AbstractTestBase {
-
-	public static final String data
-			= "machen\n"
-			+ "zeit\n"
-			+ "heerscharen\n"
-			+ "keiner\n"
-			+ "meine\n";
-
-	private static final int PARALLELISM = 4;
-
-	private static LocalFlinkMiniCluster cluster;
-
-	@BeforeClass
-	public static void setup() throws Exception {
-		cluster = TestBaseUtils.startCluster(1, PARALLELISM, false, false, true);
-		TestStreamEnvironment.setAsContext(cluster, PARALLELISM);
-		TestEnvironment.setAsContext(cluster, PARALLELISM);
-	}
-
-	@AfterClass
-	public static void teardown() throws Exception {
-		TestStreamEnvironment.unsetAsContext();
-		TestEnvironment.unsetAsContext();
-		TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
-	}
-
-	// ------------------------------------------------------------------------
-
-	public DistributedCacheTest() {
-		super(new Configuration());
-	}
-	
-	// ------------------------------------------------------------------------
-
-	@Test
-	public void testStreamingDistributedCache() throws Exception {
-		String textPath = createTempFile("count.txt", data);
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.registerCachedFile(textPath, "cache_test");
-		env.readTextFile(textPath).flatMap(new WordChecker());
-		env.execute();
-	}
-
-	@Test
-	public void testBatchDistributedCache() throws Exception {
-		String textPath = createTempFile("count.txt", data);
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.registerCachedFile(textPath, "cache_test");
-		env.readTextFile(textPath).flatMap(new WordChecker()).count();
-	}
-
-	public static class WordChecker extends RichFlatMapFunction<String, Tuple1<String>> {
-		private static final long serialVersionUID = 1L;
-
-		private final List<String> wordList = new ArrayList<>();
-
-		@Override
-		public void open(Configuration conf) throws IOException {
-			File file = getRuntimeContext().getDistributedCache().getFile("cache_test");
-			try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
-				String tempString;
-				while ((tempString= reader.readLine()) != null) {
-					wordList.add(tempString);
-				}
-			}
-		}
-
-		@Override
-		public void flatMap(String word, Collector<Tuple1<String>> out) throws Exception {
-			assertTrue("Unexpected word in stream! wordFromStream: " + word + ", shouldBeOneOf: " +
-				wordList.toString(), wordList.contains(word));
-
-			out.collect(new Tuple1<>(word));
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/distributedcache/DistributedCacheTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/distributedcache/DistributedCacheTest.java b/flink-tests/src/test/java/org/apache/flink/test/distributedcache/DistributedCacheTest.java
new file mode 100644
index 0000000..63ce3ab
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/distributedcache/DistributedCacheTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.distributedcache;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.test.util.TestEnvironment;
+import org.apache.flink.util.Collector;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test the distributed cache.
+ */
+public class DistributedCacheTest extends AbstractTestBase {
+
+	public static final String DATA =
+			"machen\n" +
+			"zeit\n" +
+			"heerscharen\n" +
+			"keiner\n" +
+			"meine\n";
+
+	private static final int PARALLELISM = 4;
+
+	private static LocalFlinkMiniCluster cluster;
+
+	@BeforeClass
+	public static void setup() throws Exception {
+		cluster = TestBaseUtils.startCluster(1, PARALLELISM, false, false, true);
+		TestStreamEnvironment.setAsContext(cluster, PARALLELISM);
+		TestEnvironment.setAsContext(cluster, PARALLELISM);
+	}
+
+	@AfterClass
+	public static void teardown() throws Exception {
+		TestStreamEnvironment.unsetAsContext();
+		TestEnvironment.unsetAsContext();
+		TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
+	}
+
+	// ------------------------------------------------------------------------
+
+	public DistributedCacheTest() {
+		super(new Configuration());
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testStreamingDistributedCache() throws Exception {
+		String textPath = createTempFile("count.txt", DATA);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.registerCachedFile(textPath, "cache_test");
+		env.readTextFile(textPath).flatMap(new WordChecker());
+		env.execute();
+	}
+
+	@Test
+	public void testBatchDistributedCache() throws Exception {
+		String textPath = createTempFile("count.txt", DATA);
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.registerCachedFile(textPath, "cache_test");
+		env.readTextFile(textPath).flatMap(new WordChecker()).count();
+	}
+
+	private static class WordChecker extends RichFlatMapFunction<String, Tuple1<String>> {
+		private static final long serialVersionUID = 1L;
+
+		private final List<String> wordList = new ArrayList<>();
+
+		@Override
+		public void open(Configuration conf) throws IOException {
+			File file = getRuntimeContext().getDistributedCache().getFile("cache_test");
+			try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
+				String tempString;
+				while ((tempString = reader.readLine()) != null) {
+					wordList.add(tempString);
+				}
+			}
+		}
+
+		@Override
+		public void flatMap(String word, Collector<Tuple1<String>> out) throws Exception {
+			assertTrue("Unexpected word in stream! wordFromStream: " + word + ", shouldBeOneOf: " +
+				wordList.toString(), wordList.contains(word));
+
+			out.collect(new Tuple1<>(word));
+		}
+	}
+}

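The DistributedCacheTest delete/create pair is likewise a rename: the checkstyle setup requires all-lowercase package names (evident from the rename itself), so test.distributedCache becomes test.distributedcache, and the constant data is renamed to DATA to match the naming rule shown earlier. A condensed sketch of the top of the new file (the AbstractTestBase superclass and test methods are omitted):

    // was: package org.apache.flink.test.distributedCache;
    package org.apache.flink.test.distributedcache;

    public class DistributedCacheTest {
        // static final fields are constants, hence UPPER_SNAKE_CASE
        public static final String DATA = "machen\nzeit\nheerscharen\nkeiner\nmeine\n";
    }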
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
new file mode 100644
index 0000000..6ce4f76
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.test.example.client;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.StandaloneClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobRetrievalException;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.concurrent.Semaphore;
+
+import scala.collection.Seq;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests retrieval of a job from a running Flink cluster.
+ */
+public class JobRetrievalITCase extends TestLogger {
+
+	private static final Semaphore lock = new Semaphore(1);
+
+	private static FlinkMiniCluster cluster;
+
+	@BeforeClass
+	public static void before() {
+		Configuration configuration = new Configuration();
+		cluster = new TestingCluster(configuration, false);
+		cluster.start();
+	}
+
+	@AfterClass
+	public static void after() {
+		cluster.stop();
+		cluster = null;
+	}
+
+	@Test
+	public void testJobRetrieval() throws Exception {
+		final JobID jobID = new JobID();
+
+		final JobVertex imalock = new JobVertex("imalock");
+		imalock.setInvokableClass(SemaphoreInvokable.class);
+
+		final JobGraph jobGraph = new JobGraph(jobID, "testjob", imalock);
+
+		final ClusterClient client = new StandaloneClusterClient(cluster.configuration(), cluster.highAvailabilityServices());
+
+		// acquire the lock to make sure that the job cannot complete until the job client
+		// has been attached in resumingThread
+		lock.acquire();
+		client.runDetached(jobGraph, JobRetrievalITCase.class.getClassLoader());
+
+		final Thread resumingThread = new Thread(new Runnable() {
+			@Override
+			public void run() {
+				try {
+					assertNotNull(client.retrieveJob(jobID));
+				} catch (Throwable e) {
+					fail(e.getMessage());
+				}
+			}
+		});
+
+		final Seq<ActorSystem> actorSystemSeq = cluster.jobManagerActorSystems().get();
+		final ActorSystem actorSystem = actorSystemSeq.last();
+		JavaTestKit testkit = new JavaTestKit(actorSystem);
+
+		final ActorRef jm = cluster.getJobManagersAsJava().get(0);
+		// wait until client connects
+		jm.tell(TestingJobManagerMessages.getNotifyWhenClientConnects(), testkit.getRef());
+		// confirm registration
+		testkit.expectMsgEquals(true);
+
+		// kick off resuming
+		resumingThread.start();
+
+		// wait for client to connect
+		testkit.expectMsgAllOf(
+			TestingJobManagerMessages.getClientConnected(),
+			TestingJobManagerMessages.getClassLoadingPropsDelivered());
+
+		// client has connected, we can release the lock
+		lock.release();
+
+		resumingThread.join();
+	}
+
+	@Test
+	public void testNonExistingJobRetrieval() throws Exception {
+		final JobID jobID = new JobID();
+		ClusterClient client = new StandaloneClusterClient(cluster.configuration());
+
+		try {
+			client.retrieveJob(jobID);
+			fail("Expected JobRetrievalException.");
+		} catch (JobRetrievalException ignored) {
+			// this is what we want
+		}
+	}
+
+	private static class SemaphoreInvokable extends AbstractInvokable {
+
+		@Override
+		public void invoke() throws Exception {
+			lock.acquire();
+		}
+	}
+
+}
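
For context, the test above pins down the retrieval contract: a job submitted detached can later be re-attached to by JobID. Stripped of the actor-based synchronization, the client-side flow is roughly the following sketch, assuming the 1.3-era ClusterClient API used in the test; the class and method names here are made up.

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.runtime.jobgraph.JobGraph;

public class JobRetrievalSketch {

	// Submit detached, then re-attach by JobID and wait for the result.
	static JobExecutionResult submitAndReattach(ClusterClient client, JobGraph jobGraph) throws Exception {
		JobID jobId = jobGraph.getJobID();
		client.runDetached(jobGraph, JobRetrievalSketch.class.getClassLoader());
		// retrieveJob re-attaches to the running job; as the test asserts,
		// it returns a non-null result once the job completes.
		return client.retrieveJob(jobId);
	}
}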

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/example/client/LocalExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/client/LocalExecutorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/client/LocalExecutorITCase.java
new file mode 100644
index 0000000..204d2a8
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/client/LocalExecutorITCase.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.test.example.client;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.client.LocalExecutor;
+import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.testfunctions.Tokenizer;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileWriter;
+
+/**
+ * Integration tests for {@link LocalExecutor}.
+ */
+public class LocalExecutorITCase extends TestLogger {
+
+	private static final int parallelism = 4;
+
+	@Test
+	public void testLocalExecutorWithWordCount() {
+		try {
+			// set up the files
+			File inFile = File.createTempFile("wctext", ".in");
+			File outFile = File.createTempFile("wctext", ".out");
+			inFile.deleteOnExit();
+			outFile.deleteOnExit();
+
+			try (FileWriter fw = new FileWriter(inFile)) {
+				fw.write(WordCountData.TEXT);
+			}
+
+			LocalExecutor executor = new LocalExecutor();
+			executor.setDefaultOverwriteFiles(true);
+			executor.setTaskManagerNumSlots(parallelism);
+			executor.setPrintStatusDuringExecution(false);
+			executor.start();
+			Plan wcPlan = getWordCountPlan(inFile, outFile, parallelism);
+			wcPlan.setExecutionConfig(new ExecutionConfig());
+			executor.executePlan(wcPlan);
+			executor.stop();
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
+		}
+	}
+
+	private Plan getWordCountPlan(File inFile, File outFile, int parallelism) {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(parallelism);
+		env.readTextFile(inFile.getAbsolutePath())
+			.flatMap(new Tokenizer())
+			.groupBy(0)
+			.sum(1)
+			.writeAsCsv(outFile.getAbsolutePath());
+		return env.createProgramPlan();
+	}
+}
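
One nit in the test above: stop() is only reached when executePlan succeeds, so a failing plan leaks the embedded cluster until the JVM exits. A hedged sketch of the same lifecycle with teardown in a finally block, using only calls that appear in the test:

import org.apache.flink.api.common.Plan;
import org.apache.flink.client.LocalExecutor;

public class LocalExecutorSketch {

	// Same lifecycle as the test, with stop() in a finally block so the
	// embedded cluster is torn down even if the plan fails.
	static void runPlan(Plan plan, int slots) throws Exception {
		LocalExecutor executor = new LocalExecutor();
		executor.setDefaultOverwriteFiles(true);
		executor.setTaskManagerNumSlots(slots);
		executor.setPrintStatusDuringExecution(false);
		executor.start();
		try {
			executor.executePlan(plan);
		} finally {
			executor.stop();
		}
	}
}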

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
new file mode 100644
index 0000000..1383894
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.test.example.failing;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for failing job submissions.
+ */
+@RunWith(Parameterized.class)
+public class JobSubmissionFailsITCase extends TestLogger {
+
+	private static final int NUM_SLOTS = 20;
+
+	private static LocalFlinkMiniCluster cluster;
+	private static JobGraph workingJobGraph;
+
+	@BeforeClass
+	public static void setup() {
+		try {
+			Configuration config = new Configuration();
+			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
+			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS / 2);
+
+			cluster = new LocalFlinkMiniCluster(config);
+
+			cluster.start();
+
+			final JobVertex jobVertex = new JobVertex("Working job vertex.");
+			jobVertex.setInvokableClass(NoOpInvokable.class);
+			workingJobGraph = new JobGraph("Working testing job", jobVertex);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@AfterClass
+	public static void teardown() {
+		try {
+			cluster.shutdown();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private boolean detached;
+
+	public JobSubmissionFailsITCase(boolean detached) {
+		this.detached = detached;
+	}
+
+	@Parameterized.Parameters(name = "Detached mode = {0}")
+	public static Collection<Boolean[]> executionModes() {
+		return Arrays.asList(new Boolean[]{false},
+				new Boolean[]{true});
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private JobExecutionResult submitJob(JobGraph jobGraph) throws Exception {
+		if (detached) {
+			cluster.submitJobDetached(jobGraph);
+			return null;
+		}
+		else {
+			return cluster.submitJobAndWait(jobGraph, false, TestingUtils.TESTING_DURATION());
+		}
+	}
+
+	@Test
+	public void testExceptionInInitializeOnMaster() {
+		try {
+			final JobVertex failingJobVertex = new FailingJobVertex("Failing job vertex");
+			failingJobVertex.setInvokableClass(NoOpInvokable.class);
+
+			final JobGraph failingJobGraph = new JobGraph("Failing testing job", failingJobVertex);
+
+			try {
+				submitJob(failingJobGraph);
+				fail("Expected JobExecutionException.");
+			}
+			catch (JobExecutionException e) {
+				assertEquals("Test exception.", e.getCause().getMessage());
+			}
+			catch (Throwable t) {
+				t.printStackTrace();
+				fail("Caught wrong exception of type " + t.getClass() + ".");
+			}
+
+			cluster.submitJobAndWait(workingJobGraph, false);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testSubmitEmptyJobGraph() {
+		try {
+			final JobGraph jobGraph = new JobGraph("Testing job");
+
+			try {
+				submitJob(jobGraph);
+				fail("Expected JobSubmissionException.");
+			}
+			catch (JobSubmissionException e) {
+				assertTrue(e.getMessage() != null && e.getMessage().contains("empty"));
+			}
+			catch (Throwable t) {
+				t.printStackTrace();
+				fail("Caught wrong exception of type " + t.getClass() + ".");
+			}
+
+			cluster.submitJobAndWait(workingJobGraph, false);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testSubmitNullJobGraph() {
+		try {
+			try {
+				submitJob(null);
+				fail("Expected NullPointerException.");
+			}
+			catch (NullPointerException e) {
+				// expected: a null job graph must be rejected
+			}
+			catch (Throwable t) {
+				t.printStackTrace();
+				fail("Caught wrong exception of type " + t.getClass() + ".");
+			}
+
+			cluster.submitJobAndWait(workingJobGraph, false);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private static class FailingJobVertex extends JobVertex {
+		private static final long serialVersionUID = -6365291240199412135L;
+
+		public FailingJobVertex(final String msg) {
+			super(msg);
+		}
+
+		@Override
+		public void initializeOnMaster(ClassLoader loader) throws Exception {
+			throw new Exception("Test exception.");
+		}
+	}
+}
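
For context, this class uses JUnit's Parameterized runner so that every test runs once in attached and once in detached mode. The pattern in isolation, with illustrative names:

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.Arrays;
import java.util.Collection;

@RunWith(Parameterized.class)
public class DetachedModeSketchTest {

	@Parameterized.Parameters(name = "Detached mode = {0}")
	public static Collection<Boolean[]> executionModes() {
		return Arrays.asList(new Boolean[]{false}, new Boolean[]{true});
	}

	private final boolean detached;

	public DetachedModeSketchTest(boolean detached) {
		this.detached = detached;
	}

	@Test
	public void runsOncePerMode() {
		// Each test method runs twice, once per constructor argument;
		// branch on 'detached' here the way submitJob(...) does above.
	}
}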

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/example/failing/TaskFailureITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/failing/TaskFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/failing/TaskFailureITCase.java
new file mode 100644
index 0000000..b830508
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/failing/TaskFailureITCase.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.test.example.failing;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+
+import org.junit.Assert;
+
+import java.util.List;
+
+/**
+ * Tests that both jobs, the failing and the working one, are handled correctly. The first (failing) job must be
+ * canceled and the client must report the failure. The second (working) job must finish successfully and compute the
+ * correct result.
+ */
+public class TaskFailureITCase extends JavaProgramTestBase {
+
+	private static final String EXCEPTION_STRING = "This is an expected Test Exception";
+
+	@Override
+	protected void testProgram() throws Exception {
+		// test failing version
+		try {
+			executeTask(new FailingTestMapper(), 1);
+		} catch (RuntimeException e) { // expected for collection execution
+			if (!isCollectionExecution()) {
+				Assert.fail();
+			}
+			// for collection execution there are no restarts, so the exception message ends with attempt number 0.
+			Assert.assertEquals(EXCEPTION_STRING + ":0", e.getMessage());
+		} catch (JobExecutionException e) { // expected for cluster execution
+			if (isCollectionExecution()) {
+				Assert.fail();
+			}
+			// for cluster execution there is one restart, so the exception message ends with attempt number 1.
+			Assert.assertEquals(EXCEPTION_STRING + ":1", e.getCause().getMessage());
+		}
+		// test correct version
+		executeTask(new TestMapper(), 0);
+	}
+
+	private void executeTask(MapFunction<Long, Long> mapper, int retries) throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(retries, 0));
+		List<Long> result = env.generateSequence(1, 9)
+				.map(mapper)
+				.collect();
+		MultipleProgramsTestBase.compareResultAsText(result, "1\n2\n3\n4\n5\n6\n7\n8\n9");
+	}
+
+	/**
+	 * Working map function.
+	 */
+	public static class TestMapper implements MapFunction<Long, Long> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Long map(Long value) throws Exception {
+			return value;
+		}
+	}
+
+	/**
+	 * Failing map function.
+	 */
+	public static class FailingTestMapper extends RichMapFunction<Long, Long> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Long map(Long value) throws Exception {
+			throw new RuntimeException(EXCEPTION_STRING + ":" + getRuntimeContext().getAttemptNumber());
+		}
+	}
+}
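
The test hinges on the interplay of the restart strategy and the attempt counter: with fixedDelayRestart(1, 0), attempt 0 fails, Flink restarts exactly once, and the exception the client ultimately sees carries attempt number 1. A minimal sketch of that mechanism, deliberately always failing and with illustrative names:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.ExecutionEnvironment;

public class RestartSketch {

	// This program is expected to fail: every attempt throws, and the
	// message records which attempt it was.
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		// One restart with no delay, as in the test above.
		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));

		env.generateSequence(1, 3)
			.map(new RichMapFunction<Long, Long>() {
				@Override
				public Long map(Long value) throws Exception {
					// getAttemptNumber() is 0 on the original run and
					// increments on each restart.
					throw new RuntimeException(
							"attempt:" + getRuntimeContext().getAttemptNumber());
				}
			})
			.collect();
	}
}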

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/example/java/ConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/java/ConnectedComponentsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/java/ConnectedComponentsITCase.java
new file mode 100644
index 0000000..c3bfd5d
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/java/ConnectedComponentsITCase.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.test.example.java;
+
+import org.apache.flink.examples.java.graph.ConnectedComponents;
+import org.apache.flink.test.testdata.ConnectedComponentsData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+import java.io.BufferedReader;
+
+/**
+ * Test for {@link ConnectedComponents}.
+ */
+public class ConnectedComponentsITCase extends JavaProgramTestBase {
+
+	private static final long SEED = 0xBADC0FFEEBEEFL;
+
+	private static final int NUM_VERTICES = 1000;
+
+	private static final int NUM_EDGES = 10000;
+
+	private String verticesPath;
+	private String edgesPath;
+	private String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		verticesPath = createTempFile("vertices.txt", ConnectedComponentsData.getEnumeratingVertices(NUM_VERTICES));
+		edgesPath = createTempFile("edges.txt", ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED));
+		resultPath = getTempFilePath("results");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		ConnectedComponents.main(
+				"--vertices", verticesPath,
+				"--edges", edgesPath,
+				"--output", resultPath,
+				"--iterations", "100");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		for (BufferedReader reader : getResultReader(resultPath)) {
+			ConnectedComponentsData.checkOddEvenResult(reader);
+		}
+	}
+}
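
This and the remaining example ITCases share the JavaProgramTestBase lifecycle: preSubmit() stages input files, testProgram() runs the program under test, and postSubmit() validates the output. A bare skeleton of that harness, with hypothetical names and contents:

import org.apache.flink.test.util.JavaProgramTestBase;

public class ExampleProgramITCase extends JavaProgramTestBase {

	private String inputPath;
	private String resultPath;

	@Override
	protected void preSubmit() throws Exception {
		// Stage input and reserve an output location before the job runs.
		inputPath = createTempFile("input.txt", "some test data\n");
		resultPath = getTempFilePath("result");
	}

	@Override
	protected void testProgram() throws Exception {
		// Invoke the program under test with the staged paths, e.g.
		// SomeExample.main(new String[] {"--input", inputPath, "--output", resultPath});
	}

	@Override
	protected void postSubmit() throws Exception {
		// Validate the output, e.g.
		// compareResultsByLinesInMemory(expected, resultPath);
	}
}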

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/example/java/EnumTriangleBasicITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/java/EnumTriangleBasicITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/java/EnumTriangleBasicITCase.java
new file mode 100644
index 0000000..e176ce8
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/java/EnumTriangleBasicITCase.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.test.example.java;
+
+import org.apache.flink.examples.java.graph.EnumTriangles;
+import org.apache.flink.test.testdata.EnumTriangleData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+/**
+ * Test for {@link EnumTriangles}.
+ */
+public class EnumTriangleBasicITCase extends JavaProgramTestBase {
+
+	protected String edgePath;
+	protected String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		edgePath = createTempFile("edges", EnumTriangleData.EDGES);
+		resultPath = getTempDirPath("triangles");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(EnumTriangleData.TRIANGLES_BY_ID, resultPath);
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		EnumTriangles.main(new String[] {
+				"--edges", edgePath,
+				"--output", resultPath });
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/example/java/PageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/java/PageRankITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/java/PageRankITCase.java
new file mode 100644
index 0000000..daab163
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/java/PageRankITCase.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.test.example.java;
+
+import org.apache.flink.examples.java.graph.PageRank;
+import org.apache.flink.test.testdata.PageRankData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+
+/**
+ * Test for {@link PageRank}.
+ */
+@RunWith(Parameterized.class)
+public class PageRankITCase extends MultipleProgramsTestBase {
+
+	public PageRankITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	private String verticesPath;
+	private String edgesPath;
+	private String resultPath;
+	private String expected;
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Before
+	public void before() throws Exception {
+		resultPath = tempFolder.newFile().toURI().toString();
+		File verticesFile = tempFolder.newFile();
+		Files.write(PageRankData.VERTICES, verticesFile, Charsets.UTF_8);
+
+		File edgesFile = tempFolder.newFile();
+		Files.write(PageRankData.EDGES, edgesFile, Charsets.UTF_8);
+
+		verticesPath = verticesFile.toURI().toString();
+		edgesPath = edgesFile.toURI().toString();
+	}
+
+	@After
+	public void after() throws Exception {
+		compareKeyValuePairsWithDelta(expected, resultPath, " ", 0.01);
+	}
+
+	@Test
+	public void testPageRankSmallNumberOfIterations() throws Exception {
+		PageRank.main(new String[]{
+				"--pages", verticesPath,
+				"--links", edgesPath,
+				"--output", resultPath,
+				"--numPages", PageRankData.NUM_VERTICES + "",
+				"--iterations", "3"});
+		expected = PageRankData.RANKS_AFTER_3_ITERATIONS;
+	}
+
+	@Test
+	public void testPageRankWithConvergenceCriterion() throws Exception {
+		PageRank.main(new String[]{
+				"--pages", verticesPath,
+				"--links", edgesPath,
+				"--output", resultPath,
+				"--numPages", PageRankData.NUM_VERTICES + "",
+				"--iterations", "1000"});
+		expected = PageRankData.RANKS_AFTER_EPSILON_0_0001_CONVERGENCE;
+	}
+}
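
The convergence test above relies on PageRank terminating on an epsilon criterion rather than the iteration cap. The underlying DataSet mechanism is a bulk iteration closed with a termination criterion: the loop ends at the maximum count or as soon as the criterion data set becomes empty. A toy sketch of that mechanism; the halving update and threshold are illustrative, not PageRank itself:

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;

public class ConvergenceSketch {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// Cap at 1000 rounds, like the test's oversized iteration count.
		IterativeDataSet<Double> iteration = env.fromElements(1.0).iterate(1000);

		// Toy update step: halve the value each round.
		DataSet<Double> next = iteration.map(new MapFunction<Double, Double>() {
			@Override
			public Double map(Double value) {
				return value / 2.0;
			}
		});

		// Termination criterion: iterate only while elements survive the
		// threshold; an empty criterion set ends the iteration early.
		DataSet<Double> stillChanging = next.filter(new FilterFunction<Double>() {
			@Override
			public boolean filter(Double value) {
				return value > 0.0001;
			}
		});

		iteration.closeWith(next, stillChanging).print();
	}
}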

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/example/java/TransitiveClosureITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/java/TransitiveClosureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/java/TransitiveClosureITCase.java
new file mode 100644
index 0000000..fc5c6a7
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/java/TransitiveClosureITCase.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.test.example.java;
+
+import org.apache.flink.examples.java.graph.TransitiveClosureNaive;
+import org.apache.flink.test.testdata.ConnectedComponentsData;
+import org.apache.flink.test.testdata.TransitiveClosureData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+import java.io.BufferedReader;
+
+/**
+ * Test for {@link TransitiveClosureNaive}.
+ */
+public class TransitiveClosureITCase extends JavaProgramTestBase {
+
+	private static final long SEED = 0xBADC0FFEEBEEFL;
+
+	private static final int NUM_VERTICES = 100;
+
+	private static final int NUM_EDGES = 500;
+
+	private String edgesPath;
+	private String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		edgesPath = createTempFile("edges.txt", ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED));
+		resultPath = getTempFilePath("results");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		TransitiveClosureNaive.main(
+				"--edges", edgesPath,
+				"--output", resultPath,
+				"--iterations", "5");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		for (BufferedReader reader : getResultReader(resultPath)) {
+			TransitiveClosureData.checkOddEvenResult(reader);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/example/java/WebLogAnalysisITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/java/WebLogAnalysisITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/java/WebLogAnalysisITCase.java
new file mode 100644
index 0000000..555edc1
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/java/WebLogAnalysisITCase.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.test.example.java;
+
+import org.apache.flink.examples.java.relational.WebLogAnalysis;
+import org.apache.flink.test.testdata.WebLogAnalysisData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+/**
+ * Test for {@link WebLogAnalysis}.
+ */
+public class WebLogAnalysisITCase extends JavaProgramTestBase {
+
+	private String docsPath;
+	private String ranksPath;
+	private String visitsPath;
+	private String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		docsPath = createTempFile("docs", WebLogAnalysisData.DOCS);
+		ranksPath = createTempFile("ranks", WebLogAnalysisData.RANKS);
+		visitsPath = createTempFile("visits", WebLogAnalysisData.VISITS);
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(WebLogAnalysisData.EXCEPTED_RESULT, resultPath);
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		WebLogAnalysis.main(new String[] {
+				"--documents", docsPath,
+				"--ranks", ranksPath,
+				"--visits", visitsPath,
+				"--output", resultPath});
+	}
+}

