flink-commits mailing list archives

From rmetz...@apache.org
Subject [03/15] flink git commit: [FLINK-3995] [build] Properly structure test scopes and dependencies
Date Tue, 05 Jul 2016 14:38:40 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
new file mode 100644
index 0000000..6acc653
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -0,0 +1,673 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import akka.actor.ActorRef;
+import akka.dispatch.Futures;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import org.junit.Assert;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.concurrent.Await;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.ExecutionContext$;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileReader;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Field;
+import java.net.HttpURLConnection;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
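+/**
+ * Utility base class for integration tests: starts and stops a
+ * {@link ForkableFlinkMiniCluster} and provides helpers for reading
+ * and comparing test results.
+ */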
+public class TestBaseUtils extends TestLogger {
+
+	private static final Logger LOG = LoggerFactory.getLogger(TestBaseUtils.class);
+
+	protected static final int MINIMUM_HEAP_SIZE_MB = 192;
+
+	protected static final long TASK_MANAGER_MEMORY_SIZE = 80;
+
+	protected static final long DEFAULT_AKKA_ASK_TIMEOUT = 1000;
+
+	protected static final String DEFAULT_AKKA_STARTUP_TIMEOUT = "60 s";
+
+	protected static FiniteDuration DEFAULT_TIMEOUT = new FiniteDuration(DEFAULT_AKKA_ASK_TIMEOUT, TimeUnit.SECONDS);
+
+	// ------------------------------------------------------------------------
+	
+	protected static File logDir;
+
+	protected TestBaseUtils() {
+		verifyJvmOptions();
+	}
+	
+	private static void verifyJvmOptions() {
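+		// maxMemory() returns bytes; shifting by 20 converts to MiB.
+		// The check below allows roughly 50 MiB of slack for JVM overhead.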
+		long heap = Runtime.getRuntime().maxMemory() >> 20;
+		Assert.assertTrue("Insufficient java heap space " + heap + "mb - set JVM option: -Xmx" + MINIMUM_HEAP_SIZE_MB
+				+ "m", heap > MINIMUM_HEAP_SIZE_MB - 50);
+	}
+	
+	
+	public static ForkableFlinkMiniCluster startCluster(
+		int numTaskManagers,
+		int taskManagerNumSlots,
+		boolean startWebserver,
+		boolean startZooKeeper,
+		boolean singleActorSystem) throws Exception {
+		
+		Configuration config = new Configuration();
+
+		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
+		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, taskManagerNumSlots);
+		
+		config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, startWebserver);
+
+		if (startZooKeeper) {
+			config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3);
+			config.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
+		}
+
+		return startCluster(config, singleActorSystem);
+	}
+
+	public static ForkableFlinkMiniCluster startCluster(
+		Configuration config,
+		boolean singleActorSystem) throws Exception {
+
+		logDir = File.createTempFile("TestBaseUtils-logdir", null);
+		Assert.assertTrue("Unable to delete temp file", logDir.delete());
+		Assert.assertTrue("Unable to create temp directory", logDir.mkdir());
+		Path logFile = Files.createFile(new File(logDir, "jobmanager.log").toPath());
+		Files.createFile(new File(logDir, "jobmanager.out").toPath());
+
+		config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, TASK_MANAGER_MEMORY_SIZE);
+		config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, true);
+
+		config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, DEFAULT_AKKA_ASK_TIMEOUT + "s");
+		config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, DEFAULT_AKKA_STARTUP_TIMEOUT);
+
+		config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 8081);
+		config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());
+		
+		config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.toString());
+
+		ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(config, singleActorSystem);
+
+		cluster.start();
+
+		return cluster;
+	}
+
+
+	public static void stopCluster(ForkableFlinkMiniCluster executor, FiniteDuration timeout) throws Exception {
+		if (logDir != null) {
+			FileUtils.deleteDirectory(logDir);
+		}
+		if (executor != null) {
+			int numUnreleasedBCVars = 0;
+			int numActiveConnections = 0;
+			
+			if (executor.running()) {
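+				// ask every TaskManager for broadcast variables that are still referenced
+				// and for TCP connections that are still open; both must be zero after a test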
+				List<ActorRef> tms = executor.getTaskManagersAsJava();
+				List<Future<Object>> bcVariableManagerResponseFutures = new ArrayList<>();
+				List<Future<Object>> numActiveConnectionsResponseFutures = new ArrayList<>();
+
+				for (ActorRef tm : tms) {
+					bcVariableManagerResponseFutures.add(Patterns.ask(tm, TestingTaskManagerMessages
+							.RequestBroadcastVariablesWithReferences$.MODULE$, new Timeout(timeout)));
+
+					numActiveConnectionsResponseFutures.add(Patterns.ask(tm, TestingTaskManagerMessages
+							.RequestNumActiveConnections$.MODULE$, new Timeout(timeout)));
+				}
+
+				Future<Iterable<Object>> bcVariableManagerFutureResponses = Futures.sequence(
+						bcVariableManagerResponseFutures, defaultExecutionContext());
+
+				Iterable<Object> responses = Await.result(bcVariableManagerFutureResponses, timeout);
+
+				for (Object response : responses) {
+					numUnreleasedBCVars += ((TestingTaskManagerMessages
+							.ResponseBroadcastVariablesWithReferences) response).number();
+				}
+
+				Future<Iterable<Object>> numActiveConnectionsFutureResponses = Futures.sequence(
+						numActiveConnectionsResponseFutures, defaultExecutionContext());
+
+				responses = Await.result(numActiveConnectionsFutureResponses, timeout);
+
+				for (Object response : responses) {
+					numActiveConnections += ((TestingTaskManagerMessages
+							.ResponseNumActiveConnections) response).number();
+				}
+			}
+
+			executor.stop();
+			FileSystem.closeAll();
+			System.gc();
+
+			Assert.assertEquals("Not all broadcast variables were released.", 0, numUnreleasedBCVars);
+			Assert.assertEquals("Not all TCP connections were released.", 0, numActiveConnections);
+		}
+
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Result Checking
+	// --------------------------------------------------------------------------------------------
+
+	public static BufferedReader[] getResultReader(String resultPath) throws IOException {
+		return getResultReader(resultPath, new String[]{}, false);
+	}
+
+	public static BufferedReader[] getResultReader(String resultPath, String[] excludePrefixes,
+											boolean inOrderOfFiles) throws IOException {
+		File[] files = getAllInvolvedFiles(resultPath, excludePrefixes);
+
+		if (inOrderOfFiles) {
+			// sort the files by their numeric names (1, 2, 3, ...)
+			// we cannot sort by path, because strings compare lexicographically (prefix order)
+			Arrays.sort(files, new Comparator<File>() {
+
+				@Override
+				public int compare(File o1, File o2) {
+					try {
+						int f1 = Integer.parseInt(o1.getName());
+						int f2 = Integer.parseInt(o2.getName());
+						return f1 < f2 ? -1 : (f1 > f2 ? 1 : 0);
+					}
+					catch (NumberFormatException e) {
+						throw new RuntimeException("The file names are no numbers and cannot be ordered: " +
+								o1.getName() + "/" + o2.getName());
+					}
+				}
+			});
+		}
+
+		BufferedReader[] readers = new BufferedReader[files.length];
+		for (int i = 0; i < files.length; i++) {
+			readers[i] = new BufferedReader(new FileReader(files[i]));
+		}
+		return readers;
+	}
+	
+	public static BufferedInputStream[] getResultInputStream(String resultPath) throws IOException {
+		return getResultInputStream(resultPath, new String[]{});
+	}
+
+	public static BufferedInputStream[] getResultInputStream(String resultPath, String[]
+			excludePrefixes) throws IOException {
+		File[] files = getAllInvolvedFiles(resultPath, excludePrefixes);
+		BufferedInputStream[] inStreams = new BufferedInputStream[files.length];
+		for (int i = 0; i < files.length; i++) {
+			inStreams[i] = new BufferedInputStream(new FileInputStream(files[i]));
+		}
+		return inStreams;
+	}
+
+	public static void readAllResultLines(List<String> target, String resultPath) throws IOException {
+		readAllResultLines(target, resultPath, new String[]{});
+	}
+
+	public static void readAllResultLines(List<String> target, String resultPath, String[] excludePrefixes) 
+			throws IOException {
+		
+		readAllResultLines(target, resultPath, excludePrefixes, false);
+	}
+
+	public static void readAllResultLines(List<String> target, String resultPath, 
+											String[] excludePrefixes, boolean inOrderOfFiles) throws IOException {
+
+		final BufferedReader[] readers = getResultReader(resultPath, excludePrefixes, inOrderOfFiles);
+		try {
+			for (BufferedReader reader : readers) {
+				String s;
+				while ((s = reader.readLine()) != null) {
+					target.add(s);
+				}
+			}
+		}
+		finally {
+			for (BufferedReader reader : readers) {
+				try {
+					reader.close();
+				}
+				catch (Exception e) {
+					// ignore, this is best-effort cleanup
+				}
+			}
+		}
+	}
+
+	public static void compareResultsByLinesInMemory(String expectedResultStr, String resultPath) throws Exception {
+		compareResultsByLinesInMemory(expectedResultStr, resultPath, new String[0]);
+	}
+
+	public static void compareResultsByLinesInMemory(String expectedResultStr, String resultPath,
+											String[] excludePrefixes) throws Exception {
+		ArrayList<String> list = new ArrayList<>();
+		readAllResultLines(list, resultPath, excludePrefixes, false);
+
+		String[] result = list.toArray(new String[list.size()]);
+		Arrays.sort(result);
+
+		String[] expected = expectedResultStr.isEmpty() ? new String[0] : expectedResultStr.split("\n");
+		Arrays.sort(expected);
+
+		Assert.assertEquals("Different number of lines in expected and obtained result.", expected.length, result.length);
+		Assert.assertArrayEquals(expected, result);
+	}
+
+	public static void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr,
+																	String resultPath) throws Exception {
+		compareResultsByLinesInMemoryWithStrictOrder(expectedResultStr, resultPath, new String[]{});
+	}
+
+	public static void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr,
+																	String resultPath, String[] excludePrefixes) throws Exception {
+		ArrayList<String> list = new ArrayList<>();
+		readAllResultLines(list, resultPath, excludePrefixes, true);
+
+		String[] result = list.toArray(new String[list.size()]);
+
+		String[] expected = expectedResultStr.split("\n");
+
+		Assert.assertEquals("Different number of lines in expected and obtained result.", expected.length, result.length);
+		Assert.assertArrayEquals(expected, result);
+	}
+
+	public static void checkLinesAgainstRegexp(String resultPath, String regexp) {
+		Pattern pattern = Pattern.compile(regexp);
+		Matcher matcher = pattern.matcher("");
+
+		ArrayList<String> list = new ArrayList<>();
+		try {
+			readAllResultLines(list, resultPath, new String[]{}, false);
+		} catch (IOException e1) {
+			Assert.fail("Error reading the result");
+		}
+
+		for (String line : list) {
+			matcher.reset(line);
+			if (!matcher.find()) {
+				String msg = "Line is not well-formed: " + line;
+				Assert.fail(msg);
+			}
+		}
+	}
+
+	public static void compareKeyValuePairsWithDelta(String expectedLines, String resultPath,
+														String delimiter, double maxDelta) throws Exception {
+		compareKeyValuePairsWithDelta(expectedLines, resultPath, new String[]{}, delimiter, maxDelta);
+	}
+
+	public static void compareKeyValuePairsWithDelta(String expectedLines, String resultPath,
+														String[] excludePrefixes, String delimiter, double maxDelta) throws Exception {
+		ArrayList<String> list = new ArrayList<>();
+		readAllResultLines(list, resultPath, excludePrefixes, false);
+
+		String[] result = list.toArray(new String[list.size()]);
+		String[] expected = expectedLines.isEmpty() ? new String[0] : expectedLines.split("\n");
+
+		Assert.assertEquals("Wrong number of result lines.", expected.length, result.length);
+
+		Arrays.sort(result);
+		Arrays.sort(expected);
+
+		for (int i = 0; i < expected.length; i++) {
+			String[] expectedFields = expected[i].split(delimiter);
+			String[] resultFields = result[i].split(delimiter);
+
+			double expectedPayLoad = Double.parseDouble(expectedFields[1]);
+			double resultPayLoad = Double.parseDouble(resultFields[1]);
+
+			Assert.assertTrue("Values differ by more than the permissible delta", Math.abs(expectedPayLoad - resultPayLoad) < maxDelta);
+		}
+	}
+
+	public static <X> void compareResultCollections(List<X> expected, List<X> actual,
+											Comparator<X> comparator) {
+		Assert.assertEquals(expected.size(), actual.size());
+
+		Collections.sort(expected, comparator);
+		Collections.sort(actual, comparator);
+
+		for (int i = 0; i < expected.size(); i++) {
+			Assert.assertEquals(expected.get(i), actual.get(i));
+		}
+	}
+
+	private static File[] getAllInvolvedFiles(String resultPath, String[] excludePrefixes) {
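+		// copy to a local variable so the anonymous FilenameFilter below can capture it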
+		final String[] exPrefs = excludePrefixes;
+		File result = asFile(resultPath);
+		if (!result.exists()) {
+			Assert.fail("Result file was not written");
+		}
+		if (result.isDirectory()) {
+			return result.listFiles(new FilenameFilter() {
+
+				@Override
+				public boolean accept(File dir, String name) {
+					for (String p : exPrefs) {
+						if (name.startsWith(p)) {
+							return false;
+						}
+					}
+					return true;
+				}
+			});
+		} else {
+			return new File[] { result };
+		}
+	}
+
+	protected static File asFile(String path) {
+		try {
+			URI uri = new URI(path);
+			if (uri.getScheme().equals("file")) {
+				return new File(uri.getPath());
+			} else {
+				throw new IllegalArgumentException("This path does not denote a local file.");
+			}
+		} catch (URISyntaxException | NullPointerException e) {
+			throw new IllegalArgumentException("This path does not describe a valid local file URI.");
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Comparison methods for tests using collect()
+	// --------------------------------------------------------------------------------------------
+
+	public static <T> void compareResultAsTuples(List<T> result, String expected) {
+		compareResult(result, expected, true, true);
+	}
+
+	public static <T> void compareResultAsText(List<T> result, String expected) {
+		compareResult(result, expected,
+				false, true);
+	}
+
+	public static <T> void compareOrderedResultAsText(List<T> result, String expected) {
+		compareResult(result, expected, false, false);
+	}
+
+	public static <T> void compareOrderedResultAsText(List<T> result, String expected, boolean asTuples) {
+		compareResult(result, expected, asTuples, false);
+	}
+	
+	private static <T> void compareResult(List<T> result, String expected, boolean asTuples, boolean sort) {
+		String[] expectedStrings = expected.split("\n");
+		String[] resultStrings = new String[result.size()];
+		
+		for (int i = 0; i < resultStrings.length; i++) {
+			T val = result.get(i);
+			
+			if (asTuples) {
+				if (val instanceof Tuple) {
+					Tuple t = (Tuple) val;
+					Object first = t.getField(0);
+					StringBuilder bld = new StringBuilder(first == null ? "null" : first.toString());
+					for (int pos = 1; pos < t.getArity(); pos++) {
+						Object next = t.getField(pos);
+						bld.append(',').append(next == null ? "null" : next.toString());
+					}
+					resultStrings[i] = bld.toString();
+				}
+				else {
+					throw new IllegalArgumentException(val + " is no tuple");
+				}
+			}
+			else {
+				resultStrings[i] = (val == null) ? "null" : val.toString();
+			}
+		}
+		
+		assertEquals("Wrong number of elements result", expectedStrings.length, resultStrings.length);
+
+		if (sort) {
+			Arrays.sort(expectedStrings);
+			Arrays.sort(resultStrings);
+		}
+		
+		for (int i = 0; i < expectedStrings.length; i++) {
+			assertEquals(expectedStrings[i], resultStrings[i]);
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	// Comparison methods for tests using sample
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * The expected string contains all expected results, separated by line breaks.
+	 * Checks whether every element in the result is contained in the expected string.
+	 * @param result The test result.
+	 * @param expected The expected results, separated by line breaks.
+	 * @param <T> The result type.
+	 */
+	public static <T> void containsResultAsText(List<T> result, String expected) {
+		String[] expectedStrings = expected.split("\n");
+		List<String> resultStrings = new ArrayList<>();
+
+		for (T val : result) {
+			String str = (val == null) ? "null" : val.toString();
+			resultStrings.add(str);
+		}
+
+		List<String> expectedStringList = Arrays.asList(expectedStrings);
+
+		for (String element : resultStrings) {
+			assertTrue(expectedStringList.contains(element));
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Miscellaneous helper methods
+	// --------------------------------------------------------------------------------------------
+	
+	protected static Collection<Object[]> toParameterList(Configuration ... testConfigs) {
+		ArrayList<Object[]> configs = new ArrayList<>();
+		for (Configuration testConfig : testConfigs) {
+			Object[] c = { testConfig };
+			configs.add(c);
+		}
+		return configs;
+	}
+
+	protected static Collection<Object[]> toParameterList(List<Configuration> testConfigs) {
+		LinkedList<Object[]> configs = new LinkedList<>();
+		for (Configuration testConfig : testConfigs) {
+			Object[] c = { testConfig };
+			configs.add(c);
+		}
+		return configs;
+	}
+
+	// This code is taken from: http://stackoverflow.com/a/7201825/568695
+	// it changes the environment variables of this JVM. Use only for testing purposes!
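+	//
+	// Hypothetical usage sketch (variable and key names are illustrative):
+	//   Map<String, String> env = new HashMap<>(System.getenv());
+	//   env.put("SOME_TEST_VARIABLE", "value");
+	//   TestBaseUtils.setEnv(env);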
+	@SuppressWarnings("unchecked")
+	public static void setEnv(Map<String, String> newenv) {
+		try {
+			Class<?> processEnvironmentClass = Class.forName("java.lang.ProcessEnvironment");
+			Field theEnvironmentField = processEnvironmentClass.getDeclaredField("theEnvironment");
+			theEnvironmentField.setAccessible(true);
+			Map<String, String> env = (Map<String, String>) theEnvironmentField.get(null);
+			env.putAll(newenv);
+			Field theCaseInsensitiveEnvironmentField = processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment");
+			theCaseInsensitiveEnvironmentField.setAccessible(true);
+			Map<String, String> cienv = (Map<String, String>) theCaseInsensitiveEnvironmentField.get(null);
+			cienv.putAll(newenv);
+		} catch (NoSuchFieldException e) {
+			try {
+				Class<?>[] classes = Collections.class.getDeclaredClasses();
+				Map<String, String> env = System.getenv();
+				for (Class<?> cl : classes) {
+					if ("java.util.Collections$UnmodifiableMap".equals(cl.getName())) {
+						Field field = cl.getDeclaredField("m");
+						field.setAccessible(true);
+						Object obj = field.get(env);
+						Map<String, String> map = (Map<String, String>) obj;
+						map.clear();
+						map.putAll(newenv);
+					}
+				}
+			} catch (Exception e2) {
+				throw new RuntimeException(e2);
+			}
+		} catch (Exception e1) {
+			throw new RuntimeException(e1);
+		}
+	}
+	
+	private static ExecutionContext defaultExecutionContext() {
+		return ExecutionContext$.MODULE$.global();
+	}
+	// --------------------------------------------------------------------------------------------
+	//  File helper methods
+	// --------------------------------------------------------------------------------------------
+
+	protected static void deleteRecursively(File f) throws IOException {
+		if (f.isDirectory()) {
+			FileUtils.deleteDirectory(f);
+		} else if (!f.delete()) {
+			System.err.println("Failed to delete file " + f.getAbsolutePath());
+		}
+	}
+	
+	public static String constructTestPath(Class<?> forClass, String folder) {
+		// we create a test path that depends on the class, to prevent name clashes when two tests
+		// create temp files with the same name
+		String path = System.getProperty("java.io.tmpdir");
+		if (!(path.endsWith("/") || path.endsWith("\\")) ) {
+			path += System.getProperty("file.separator");
+		}
+		path += (forClass.getName() + "-" + folder);
+		return path;
+	}
+	
+	public static String constructTestURI(Class<?> forClass, String folder) {
+		return new File(constructTestPath(forClass, folder)).toURI().toString();
+	}
+
+	//---------------------------------------------------------------------------------------------
+	// Web utils
+	//---------------------------------------------------------------------------------------------
+
+	public static String getFromHTTP(String url) throws Exception {
+		URL u = new URL(url);
+		LOG.info("Accessing URL "+url+" as URL: "+u);
+		HttpURLConnection connection = (HttpURLConnection) u.openConnection();
+		connection.setConnectTimeout(100000);
+		connection.connect();
+		InputStream is;
+		if (connection.getResponseCode() >= 400) {
+			// error!
+			LOG.warn("HTTP Response code when connecting to {} was {}", url, connection.getResponseCode());
+			is = connection.getErrorStream();
+		} else {
+			is = connection.getInputStream();
+		}
+
+		// close the stream after reading, so the connection's stream is not leaked
+		try {
+			return IOUtils.toString(is, connection.getContentEncoding() != null ? connection.getContentEncoding() : "UTF-8");
+		} finally {
+			is.close();
+		}
+	}
+	
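+	/**
+	 * Compares tuples field by field: lower arity sorts first; otherwise the first
+	 * differing field decides, using the fields' natural ordering. Null fields sort
+	 * before non-null fields.
+	 */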
+	public static class TupleComparator<T extends Tuple> implements Comparator<T> {
+
+		@Override
+		public int compare(T o1, T o2) {
+			if (o1 == null || o2 == null) {
+				throw new IllegalArgumentException("Cannot compare null tuples");
+			}
+			else if (o1.getArity() != o2.getArity()) {
+				return o1.getArity() - o2.getArity();
+			}
+			else {
+				for (int i = 0; i < o1.getArity(); i++) {
+					Object val1 = o1.getField(i);
+					Object val2 = o2.getField(i);
+					
+					int cmp;
+					if (val1 != null && val2 != null) {
+						cmp = compareValues(val1, val2);
+					}
+					else {
+						cmp = val1 == null ? (val2 == null ? 0 : -1) : 1;
+					}
+					
+					if (cmp != 0) {
+						return cmp;
+					}
+				}
+				
+				return 0;
+			}
+		}
+		
+		@SuppressWarnings("unchecked")
+		private static <X extends Comparable<X>> int compareValues(Object o1, Object o2) {
+			if (o1 instanceof Comparable && o2 instanceof Comparable) {
+				X c1 = (X) o1;
+				X c2 = (X) o2;
+				return c1.compareTo(c2);
+			}
+			else {
+				throw new IllegalArgumentException("Cannot compare tuples with non comparable elements");
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
new file mode 100644
index 0000000..7cb88be
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.CodeAnalysisMode;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironmentFactory;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
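+/**
+ * An {@link ExecutionEnvironment} that compiles programs with the optimizer and
+ * submits the resulting job graph directly to a {@link ForkableFlinkMiniCluster}.
+ */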
+public class TestEnvironment extends ExecutionEnvironment {
+
+	private final ForkableFlinkMiniCluster executor;
+
+	private TestEnvironment lastEnv = null;
+
+	@Override
+	public JobExecutionResult getLastJobExecutionResult() {
+		if (lastEnv == null) {
+			return this.lastJobExecutionResult;
+		}
+		else {
+			return lastEnv.getLastJobExecutionResult();
+		}
+	}
+
+	public TestEnvironment(ForkableFlinkMiniCluster executor, int parallelism) {
+		this.executor = executor;
+		setParallelism(parallelism);
+
+		// disabled to improve build time
+		getConfig().setCodeAnalysisMode(CodeAnalysisMode.DISABLE);
+	}
+
+	public TestEnvironment(ForkableFlinkMiniCluster executor, int parallelism, boolean isObjectReuseEnabled) {
+		this(executor, parallelism);
+
+		if (isObjectReuseEnabled) {
+			getConfig().enableObjectReuse();
+		} else {
+			getConfig().disableObjectReuse();
+		}
+	}
+
+	@Override
+	public void startNewSession() throws Exception {
+	}
+
+	@Override
+	public JobExecutionResult execute(String jobName) throws Exception {
+		OptimizedPlan op = compileProgram(jobName);
+
+		JobGraphGenerator jgg = new JobGraphGenerator();
+		JobGraph jobGraph = jgg.compileJobGraph(op);
+
+		this.lastJobExecutionResult = executor.submitJobAndWait(jobGraph, false);
+		return this.lastJobExecutionResult;
+	}
+
+
+	@Override
+	public String getExecutionPlan() throws Exception {
+		OptimizedPlan op = compileProgram("unused");
+
+		PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
+		return jsonGen.getOptimizerPlanAsJSON(op);
+	}
+
+
+	private OptimizedPlan compileProgram(String jobName) {
+		Plan p = createProgramPlan(jobName);
+
+		Optimizer pc = new Optimizer(new DataStatistics(), this.executor.configuration());
+		return pc.compile(p);
+	}
+
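+	/**
+	 * Installs this environment as the context environment, so that calls to
+	 * {@code ExecutionEnvironment.getExecutionEnvironment()} return a
+	 * {@code TestEnvironment} that submits to the same mini cluster.
+	 */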
+	public void setAsContext() {
+		ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
+			@Override
+			public ExecutionEnvironment createExecutionEnvironment() {
+				lastEnv = new TestEnvironment(executor, getParallelism(), getConfig().isObjectReuseEnabled());
+				return lastEnv;
+			}
+		};
+
+		initializeContextEnvironment(factory);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
new file mode 100644
index 0000000..79c5a25
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util
+
+import java.util.concurrent.TimeoutException
+
+import akka.actor.{ActorRef, ActorSystem}
+import akka.pattern.Patterns._
+import akka.pattern.ask
+
+import org.apache.curator.test.TestingCluster
+import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager
+import org.apache.flink.runtime.clusterframework.types.ResourceID
+import org.apache.flink.runtime.jobmanager.{JobManager, RecoveryMode}
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
+import org.apache.flink.runtime.taskmanager.TaskManager
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager
+import org.apache.flink.runtime.testingUtils.{TestingJobManager, TestingMemoryArchivist, TestingTaskManager}
+import org.apache.flink.runtime.testutils.TestingResourceManager
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+
+/**
+ * A forkable mini cluster is a special case of the mini cluster, used for parallel test execution
+ * on build servers. If multiple tests run in parallel, the cluster picks up the fork number and
+ * uses it to avoid port conflicts.
+ *
+ * @param userConfiguration Configuration object with the user provided configuration values
+ * @param singleActorSystem true, if all actors (JobManager and TaskManager) shall be run in the
+ *                          same [[ActorSystem]], otherwise false.
+ */
+class ForkableFlinkMiniCluster(
+    userConfiguration: Configuration,
+    singleActorSystem: Boolean)
+  extends LocalFlinkMiniCluster(userConfiguration, singleActorSystem) {
+
+  def this(userConfiguration: Configuration) = this(userConfiguration, true)
+
+  // --------------------------------------------------------------------------
+
+  var zookeeperCluster: Option[TestingCluster] = None
+
+  override def generateConfiguration(userConfiguration: Configuration): Configuration = {
+    val forkNumberString = System.getProperty("forkNumber")
+
+    val forkNumber = try {
+      Integer.parseInt(forkNumberString)
+    }
+    catch {
+      case e: NumberFormatException => -1
+    }
+
+    val config = userConfiguration.clone()
+
+    if (forkNumber != -1) {
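+      // reserve a disjoint block of 400 ports per fork above 1024: JobManager RPC at +0,
+      // TaskManager RPC at +100, TaskManager data at +200, ResourceManager RPC at +300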
+      val jobManagerRPC = 1024 + forkNumber*400
+      val taskManagerRPC = 1024 + forkNumber*400 + 100
+      val taskManagerData = 1024 + forkNumber*400 + 200
+      val resourceManagerRPC = 1024 + forkNumber*400 + 300
+
+      config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerRPC)
+      config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, taskManagerRPC)
+      config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, taskManagerData)
+      config.setInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY, resourceManagerRPC)
+    }
+
+    super.generateConfiguration(config)
+  }
+
+  override def startJobManager(index: Int, actorSystem: ActorSystem): ActorRef = {
+    val config = configuration.clone()
+
+    val jobManagerName = getJobManagerName(index)
+    val archiveName = getArchiveName(index)
+
+    val jobManagerPort = config.getInteger(
+      ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+      ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
+
+    if (jobManagerPort > 0) {
+      config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort + index)
+    }
+
+    val (jobManager, _) = JobManager.startJobManagerActors(
+      config,
+      actorSystem,
+      Some(jobManagerName),
+      Some(archiveName),
+      classOf[TestingJobManager],
+      classOf[TestingMemoryArchivist])
+
+    jobManager
+  }
+
+  override def startResourceManager(index: Int, system: ActorSystem): ActorRef = {
+    val config = configuration.clone()
+
+    val resourceManagerName = getResourceManagerName(index)
+
+    val resourceManagerPort = config.getInteger(
+      ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY,
+      ConfigConstants.DEFAULT_RESOURCE_MANAGER_IPC_PORT)
+
+    if (resourceManagerPort > 0) {
+      config.setInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY, resourceManagerPort + index)
+    }
+
+    val resourceManager = FlinkResourceManager.startResourceManagerActors(
+      config,
+      system,
+      createLeaderRetrievalService(),
+      classOf[TestingResourceManager],
+      resourceManagerName)
+
+    resourceManager
+  }
+
+  override def startTaskManager(index: Int, system: ActorSystem): ActorRef = {
+    val config = configuration.clone()
+
+    val rpcPort = config.getInteger(
+      ConfigConstants.TASK_MANAGER_IPC_PORT_KEY,
+      ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT)
+
+    val dataPort = config.getInteger(
+      ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
+      ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT)
+
+    if (rpcPort > 0) {
+      config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort + index)
+    }
+    if (dataPort > 0) {
+      config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort + index)
+    }
+
+    val localExecution = numTaskManagers == 1
+
+    TaskManager.startTaskManagerComponentsAndActor(
+      config,
+      ResourceID.generate(),
+      system,
+      hostname,
+      Some(TaskManager.TASK_MANAGER_NAME + index),
+      Some(createLeaderRetrievalService()),
+      localExecution,
+      classOf[TestingTaskManager])
+  }
+
+  def restartLeadingJobManager(): Unit = {
+    this.synchronized {
+      (jobManagerActorSystems, jobManagerActors) match {
+        case (Some(jmActorSystems), Some(jmActors)) =>
+          val leader = getLeaderGateway(AkkaUtils.getTimeout(configuration))
+          val index = getLeaderIndex(AkkaUtils.getTimeout(configuration))
+
+          clearLeader()
+
+          val stopped = gracefulStop(leader.actor(), ForkableFlinkMiniCluster.MAX_RESTART_DURATION)
+          Await.result(stopped, ForkableFlinkMiniCluster.MAX_RESTART_DURATION)
+
+          if (!singleActorSystem) {
+            jmActorSystems(index).shutdown()
+            jmActorSystems(index).awaitTermination()
+          }
+
+          val newJobManagerActorSystem = if (!singleActorSystem) {
+            startJobManagerActorSystem(index)
+          } else {
+            jmActorSystems.head
+          }
+
+          val newJobManagerActor = startJobManager(index, newJobManagerActorSystem)
+
+          jobManagerActors = Some(jmActors.patch(index, Seq(newJobManagerActor), 1))
+          jobManagerActorSystems = Some(jmActorSystems.patch(
+            index,
+            Seq(newJobManagerActorSystem),
+            1))
+
+          val lrs = createLeaderRetrievalService()
+
+          jobManagerLeaderRetrievalService = Some(lrs)
+          lrs.start(this)
+
+        case _ => throw new Exception("The JobManager of the ForkableFlinkMiniCluster have not " +
+          "been started properly.")
+      }
+    }
+  }
+
+
+  def restartTaskManager(index: Int): Unit = {
+    (taskManagerActorSystems, taskManagerActors) match {
+      case (Some(tmActorSystems), Some(tmActors)) =>
+        val stopped = gracefulStop(tmActors(index), ForkableFlinkMiniCluster.MAX_RESTART_DURATION)
+        Await.result(stopped, ForkableFlinkMiniCluster.MAX_RESTART_DURATION)
+
+        if (!singleActorSystem) {
+          tmActorSystems(index).shutdown()
+          tmActorSystems(index).awaitTermination()
+        }
+
+        val taskManagerActorSystem = if (!singleActorSystem) {
+          startTaskManagerActorSystem(index)
+        } else {
+          tmActorSystems.head
+        }
+
+        val taskManagerActor = startTaskManager(index, taskManagerActorSystem)
+
+        taskManagerActors = Some(tmActors.patch(index, Seq(taskManagerActor), 1))
+        taskManagerActorSystems = Some(tmActorSystems.patch(index, Seq(taskManagerActorSystem), 1))
+
+      case _ => throw new Exception("The TaskManager of the ForkableFlinkMiniCluster have not " +
+        "been started properly.")
+    }
+  }
+
+  override def start(): Unit = {
+    val zookeeperURL = configuration.getString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, "")
+
+    zookeeperCluster = if (recoveryMode == RecoveryMode.ZOOKEEPER && zookeeperURL.equals("")) {
+      LOG.info("Starting ZooKeeper cluster.")
+
+      val testingCluster = new TestingCluster(1)
+
+      configuration.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, testingCluster.getConnectString)
+
+      testingCluster.start()
+
+      Some(testingCluster)
+    } else {
+      None
+    }
+
+    super.start()
+  }
+
+  override def stop(): Unit = {
+    super.stop()
+
+    // bind the cluster explicitly, so the log statement only runs when a cluster exists
+    zookeeperCluster.foreach { cluster =>
+      LOG.info("Stopping ZooKeeper cluster.")
+      cluster.close()
+    }
+  }
+
+  def waitForTaskManagersToBeRegisteredAtJobManager(jobManager: ActorRef): Unit = {
+    val futures = taskManagerActors.map {
+      _.map {
+        tm => (tm ? NotifyWhenRegisteredAtJobManager(jobManager))(timeout)
+      }
+    }.getOrElse(Seq())
+
+    try {
+      Await.ready(Future.sequence(futures), timeout)
+    } catch {
+      case t: TimeoutException =>
+        throw new Exception("Timeout while waiting for TaskManagers to register at " +
+          s"${jobManager.path}")
+    }
+
+  }
+}
+
+object ForkableFlinkMiniCluster {
+
+  val MAX_RESTART_DURATION = 2 minute
+
+  val DEFAULT_MINICLUSTER_AKKA_ASK_TIMEOUT = "200 s"
+
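+  // Usage sketch (illustrative): start a cluster for a test, stop it when done.
+  //   val cluster = ForkableFlinkMiniCluster.startCluster(numSlots = 4, numTaskManagers = 2)
+  //   try { /* submit test jobs */ } finally { cluster.stop() }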
+  def startCluster(
+                    numSlots: Int,
+                    numTaskManagers: Int,
+                    timeout: String = DEFAULT_MINICLUSTER_AKKA_ASK_TIMEOUT)
+  : ForkableFlinkMiniCluster = {
+
+    val config = new Configuration()
+    config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
+    config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers)
+    config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, timeout)
+
+    val cluster = new ForkableFlinkMiniCluster(config)
+
+    cluster.start()
+
+    cluster
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-test-utils-parent/pom.xml
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/pom.xml b/flink-test-utils-parent/pom.xml
new file mode 100644
index 0000000..03747e8
--- /dev/null
+++ b/flink-test-utils-parent/pom.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-parent</artifactId>
+		<version>1.1-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-test-utils-parent</artifactId>
+	<name>flink-test-utils-parent</name>
+
+	<packaging>pom</packaging>
+
+	<modules>
+		<module>flink-test-utils-junit</module>
+		<module>flink-test-utils</module>
+	</modules>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-test-utils/pom.xml
----------------------------------------------------------------------
diff --git a/flink-test-utils/pom.xml b/flink-test-utils/pom.xml
deleted file mode 100644
index 1038531..0000000
--- a/flink-test-utils/pom.xml
+++ /dev/null
@@ -1,246 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-parent</artifactId>
-		<version>1.1-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>flink-test-utils_2.10</artifactId>
-	<name>flink-test-utils</name>
-
-	<packaging>jar</packaging>
-
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-optimizer_2.10</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-core</artifactId>
-			<version>${project.version}</version>
-			<type>test-jar</type>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime_2.10</artifactId>
-			<version>${project.version}</version>
-			<type>test-jar</type>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-clients_2.10</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>junit</groupId>
-			<artifactId>junit</artifactId>
-			<version>4.11</version>
-		</dependency>
-
-		<dependency>
-			<groupId>com.google.guava</groupId>
-			<artifactId>guava</artifactId>
-			<version>${guava.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.scalatest</groupId>
-			<artifactId>scalatest_${scala.binary.version}</artifactId>
-			<scope>compile</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.curator</groupId>
-			<artifactId>curator-test</artifactId>
-			<version>${curator.version}</version>
-			<scope>compile</scope>
-		</dependency>
-	</dependencies>
-
-	<build>
-		<plugins>
-			<!-- Scala Compiler -->
-			<plugin>
-				<groupId>net.alchim31.maven</groupId>
-				<artifactId>scala-maven-plugin</artifactId>
-				<version>3.1.4</version>
-				<executions>
-					<!-- Run scala compiler in the process-resources phase, so that dependencies 
-						on scala classes can be resolved later in the (Java) compile phase -->
-					<execution>
-						<id>scala-compile-first</id>
-						<phase>process-resources</phase>
-						<goals>
-							<goal>compile</goal>
-						</goals>
-					</execution>
-
-					<!-- Run scala compiler in the process-test-resources phase, so that 
-						dependencies on scala classes can be resolved later in the (Java) test-compile 
-						phase -->
-					<execution>
-						<id>scala-test-compile</id>
-						<phase>process-test-resources</phase>
-						<goals>
-							<goal>testCompile</goal>
-						</goals>
-					</execution>
-				</executions>
-				<configuration>
-					<jvmArgs>
-						<jvmArg>-Xms128m</jvmArg>
-						<jvmArg>-Xmx512m</jvmArg>
-					</jvmArgs>
-				</configuration>
-			</plugin>
-
-			<!-- Eclipse Integration -->
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-eclipse-plugin</artifactId>
-				<version>2.8</version>
-				<configuration>
-					<downloadSources>true</downloadSources>
-					<projectnatures>
-						<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
-						<projectnature>org.eclipse.jdt.core.javanature</projectnature>
-					</projectnatures>
-					<buildcommands>
-						<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
-					</buildcommands>
-					<classpathContainers>
-						<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
-						<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
-					</classpathContainers>
-					<excludes>
-						<exclude>org.scala-lang:scala-library</exclude>
-						<exclude>org.scala-lang:scala-compiler</exclude>
-					</excludes>
-					<sourceIncludes>
-						<sourceInclude>**/*.scala</sourceInclude>
-						<sourceInclude>**/*.java</sourceInclude>
-					</sourceIncludes>
-				</configuration>
-			</plugin>
-
-			<!-- Adding scala source directories to build path -->
-			<plugin>
-				<groupId>org.codehaus.mojo</groupId>
-				<artifactId>build-helper-maven-plugin</artifactId>
-				<version>1.7</version>
-				<executions>
-					<!-- Add src/main/scala to eclipse build path -->
-					<execution>
-						<id>add-source</id>
-						<phase>generate-sources</phase>
-						<goals>
-							<goal>add-source</goal>
-						</goals>
-						<configuration>
-							<sources>
-								<source>src/main/scala</source>
-							</sources>
-						</configuration>
-					</execution>
-					<!-- Add src/test/scala to eclipse build path -->
-					<execution>
-						<id>add-test-source</id>
-						<phase>generate-test-sources</phase>
-						<goals>
-							<goal>add-test-source</goal>
-						</goals>
-						<configuration>
-							<sources>
-								<source>src/test/scala</source>
-							</sources>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-
-			<!-- Scala Code Style, most of the configuration done via plugin management -->
-			<plugin>
-				<groupId>org.scalastyle</groupId>
-				<artifactId>scalastyle-maven-plugin</artifactId>
-				<configuration>
-					<configLocation>${project.basedir}/../tools/maven/scalastyle-config.xml</configLocation>
-				</configuration>
-			</plugin>
-			<!--Plugin to manage test as test-jar apart from the main source code -->
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-jar-plugin</artifactId>
-				<executions>
-					<execution>
-						<goals>
-							<goal>test-jar</goal>
-						</goals>
-					</execution>
-				</executions>
-			</plugin>
-
-		</plugins>
-		<pluginManagement>
-			<plugins>
-				<!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
-				<plugin>
-					<groupId>org.eclipse.m2e</groupId>
-					<artifactId>lifecycle-mapping</artifactId>
-					<version>1.0.0</version>
-					<configuration>
-						<lifecycleMappingMetadata>
-							<pluginExecutions>
-								<pluginExecution>
-									<pluginExecutionFilter>
-										<groupId>
-											net.alchim31.maven
-										</groupId>
-										<artifactId>
-											scala-maven-plugin
-										</artifactId>
-										<versionRange>
-											[3.1.4,)
-										</versionRange>
-										<goals>
-											<goal>compile</goal>
-											<goal>testCompile</goal>
-										</goals>
-									</pluginExecutionFilter>
-									<action>
-										<ignore></ignore>
-									</action>
-								</pluginExecution>
-							</pluginExecutions>
-						</lifecycleMappingMetadata>
-					</configuration>
-				</plugin>
-			</plugins>
-		</pluginManagement>
-	</build>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-test-utils/src/test/java/org/apache/flink/test/testdata/ConnectedComponentsData.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/test/java/org/apache/flink/test/testdata/ConnectedComponentsData.java b/flink-test-utils/src/test/java/org/apache/flink/test/testdata/ConnectedComponentsData.java
deleted file mode 100644
index 60cd319..0000000
--- a/flink-test-utils/src/test/java/org/apache/flink/test/testdata/ConnectedComponentsData.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.testdata;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.util.List;
-import java.util.Random;
-import java.util.regex.Pattern;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.junit.Assert;
-
-
-public class ConnectedComponentsData {
-
-	public static final String getEnumeratingVertices(int num) {
-		if (num < 1 || num > 1000000) {
-			throw new IllegalArgumentException();
-		}
-
-		StringBuilder bld = new StringBuilder(3 * num);
-		for (int i = 1; i <= num; i++) {
-			bld.append(i);
-			bld.append('\n');
-		}
-		return bld.toString();
-	}
-
-	/**
-	 * Creates random edges such that even numbered vertices are connected with even numbered vertices
-	 * and odd numbered vertices only with other odd numbered ones.
-	 */
-	public static final String getRandomOddEvenEdges(int numEdges, int numVertices, long seed) {
-		if (numVertices < 2 || numVertices > 1000000 || numEdges < numVertices || numEdges > 1000000) {
-			throw new IllegalArgumentException();
-		}
-
-		StringBuilder bld = new StringBuilder(5 * numEdges);
-
-		// first create the linear edge sequence even -> even and odd -> odd to make sure they are
-		// all in the same component
-		for (int i = 3; i <= numVertices; i++) {
-			bld.append(i - 2).append(' ').append(i).append('\n');
-		}
-
-		numEdges -= numVertices - 2;
-		Random r = new Random(seed);
-
-		for (int i = 1; i <= numEdges; i++) {
-			int evenOdd = r.nextBoolean() ? 1 : 0;
-
-			int source = r.nextInt(numVertices) + 1;
-			if (source % 2 != evenOdd) {
-				source--;
-				if (source < 1) {
-					source = 2;
-				}
-			}
-
-			int target = r.nextInt(numVertices) + 1;
-			if (target % 2 != evenOdd) {
-				target--;
-				if (target < 1) {
-					target = 2;
-				}
-			}
-
-			bld.append(source).append(' ').append(target).append('\n');
-		}
-		return bld.toString();
-	}
-
-	public static void checkOddEvenResult(BufferedReader result) throws IOException {
-		Pattern split = Pattern.compile(" ");
-		String line;
-		while ((line = result.readLine()) != null) {
-			String[] res = split.split(line);
-			Assert.assertEquals("Malformed result: Wrong number of tokens in line.", 2, res.length);
-			try {
-				int vertex = Integer.parseInt(res[0]);
-				int component = Integer.parseInt(res[1]);
-
-				int should = vertex % 2;
-				if (should == 0) {
-					should = 2;
-				}
-				Assert.assertEquals("Vertex is in wrong component.", should, component);
-			} catch (NumberFormatException e) {
-				Assert.fail("Malformed result.");
-			}
-		}
-	}
-	
-	public static void checkOddEvenResult(List<Tuple2<Long, Long>> lines) throws IOException {
-		for (Tuple2<Long, Long> line : lines) {
-			try {
-				long vertex = line.f0;
-				long component = line.f1;
-				long should = vertex % 2;
-				if (should == 0) {
-					should = 2;
-				}
-				Assert.assertEquals("Vertex is in wrong component.", should, component);
-			} catch (NumberFormatException e) {
-				Assert.fail("Malformed result.");
-			}
-		}
-	}
-
-	private ConnectedComponentsData() {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-test-utils/src/test/java/org/apache/flink/test/testdata/EnumTriangleData.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/test/java/org/apache/flink/test/testdata/EnumTriangleData.java b/flink-test-utils/src/test/java/org/apache/flink/test/testdata/EnumTriangleData.java
deleted file mode 100644
index b170404..0000000
--- a/flink-test-utils/src/test/java/org/apache/flink/test/testdata/EnumTriangleData.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.testdata;
-
-public class EnumTriangleData {
-
-	public static final String EDGES = 
-			"1 2\n" +
-			"1 3\n" +
-			"1 4\n" +
-			"1 5\n" +
-			"2 3\n" +
-			"2 5\n" +
-			"3 4\n" +
-			"3 7\n" +
-			"5 6\n" +
-			"3 8\n" +
-			"7 8\n";
-
-	public static final String TRIANGLES_BY_ID = 
-			"1,2,3\n" +
-			"1,3,4\n" +
-			"1,2,5\n" +
-			"3,7,8\n"; 
-	
-	public static final String TRIANGLES_BY_DEGREE = 
-			"2,1,3\n" +
-			"4,1,3\n" +
-			"2,1,5\n" +
-			"7,3,8\n";
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-test-utils/src/test/java/org/apache/flink/test/testdata/KMeansData.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/test/java/org/apache/flink/test/testdata/KMeansData.java b/flink-test-utils/src/test/java/org/apache/flink/test/testdata/KMeansData.java
deleted file mode 100644
index 1319e84..0000000
--- a/flink-test-utils/src/test/java/org/apache/flink/test/testdata/KMeansData.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.testdata;
-
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.StringTokenizer;
-
-import org.junit.Assert;
-
-
-public class KMeansData {
-
-	// --------------------------------------------------------------------------------------------
-	//  3-dimensional data
-	// --------------------------------------------------------------------------------------------
-	
-	public final static String DATAPOINTS = "0|50.90|16.20|72.08|\n" + "1|73.65|61.76|62.89|\n" + "2|61.73|49.95|92.74|\n"
-			+ "3|1.60|70.11|16.32|\n" + "4|2.43|19.81|89.56|\n" + "5|67.99|9.00|14.48|\n" + "6|87.80|84.49|55.83|\n"
-			+ "7|90.26|42.99|53.29|\n" + "8|51.36|6.16|9.35|\n" + "9|12.43|9.52|12.54|\n" + "10|80.01|8.78|29.74|\n"
-			+ "11|92.76|2.93|80.07|\n" + "12|46.32|100.00|22.98|\n" + "13|34.11|45.61|58.60|\n"
-			+ "14|68.82|16.36|96.60|\n" + "15|81.47|76.45|28.40|\n" + "16|65.55|40.21|43.43|\n"
-			+ "17|84.22|88.56|13.31|\n" + "18|36.99|68.36|57.12|\n" + "19|28.87|37.69|91.04|\n"
-			+ "20|31.56|13.22|86.00|\n" + "21|18.49|34.45|54.52|\n" + "22|13.33|94.02|92.07|\n"
-			+ "23|91.19|81.62|55.06|\n" + "24|85.78|39.02|25.58|\n" + "25|94.41|47.07|78.23|\n"
-			+ "26|90.62|10.43|80.20|\n" + "27|31.52|85.81|39.79|\n" + "28|24.65|77.98|26.35|\n"
-			+ "29|69.34|75.79|63.96|\n" + "30|22.56|78.61|66.66|\n" + "31|91.74|83.82|73.92|\n"
-			+ "32|76.64|89.53|44.66|\n" + "33|36.02|73.01|92.32|\n" + "34|87.86|18.94|10.74|\n"
-			+ "35|91.94|34.61|5.20|\n" + "36|12.52|47.01|95.29|\n" + "37|44.01|26.19|78.50|\n"
-			+ "38|26.20|73.36|10.08|\n" + "39|15.21|17.37|54.33|\n" + "40|27.96|94.81|44.41|\n"
-			+ "41|26.44|44.81|70.88|\n" + "42|53.29|26.69|2.40|\n" + "43|23.94|11.50|1.71|\n"
-			+ "44|19.00|25.48|50.80|\n" + "45|82.26|1.88|58.08|\n" + "46|47.56|82.54|82.73|\n"
-			+ "47|51.54|35.10|32.95|\n" + "48|86.71|55.51|19.08|\n" + "49|54.16|23.68|32.41|\n"
-			+ "50|71.81|32.83|46.66|\n" + "51|20.70|14.19|64.96|\n" + "52|57.17|88.56|55.23|\n"
-			+ "53|91.39|49.38|70.55|\n" + "54|47.90|62.07|76.03|\n" + "55|55.70|37.77|30.15|\n"
-			+ "56|87.87|74.62|25.95|\n" + "57|95.70|45.04|15.27|\n" + "58|41.61|89.37|24.45|\n"
-			+ "59|82.19|20.84|11.13|\n" + "60|49.88|2.62|18.62|\n" + "61|16.42|53.30|74.13|\n"
-			+ "62|38.37|72.62|35.16|\n" + "63|43.26|49.59|92.56|\n" + "64|28.96|2.36|78.49|\n"
-			+ "65|88.41|91.43|92.55|\n" + "66|98.61|79.58|33.03|\n" + "67|4.94|18.65|30.78|\n"
-			+ "68|75.89|79.30|63.90|\n" + "69|93.18|76.26|9.50|\n" + "70|73.43|70.50|76.49|\n"
-			+ "71|78.64|90.87|34.49|\n" + "72|58.47|63.07|8.82|\n" + "73|69.74|54.36|64.43|\n"
-			+ "74|38.47|36.60|33.39|\n" + "75|51.07|14.75|2.54|\n" + "76|24.18|16.85|15.00|\n"
-			+ "77|7.56|50.72|93.45|\n" + "78|64.28|97.01|57.31|\n" + "79|85.30|24.13|76.57|\n"
-			+ "80|72.78|30.78|13.11|\n" + "81|18.42|17.45|32.20|\n" + "82|87.44|74.98|87.90|\n"
-			+ "83|38.30|17.77|37.33|\n" + "84|63.62|7.90|34.23|\n" + "85|8.84|67.87|30.65|\n"
-			+ "86|76.12|51.83|80.12|\n" + "87|32.30|74.79|4.39|\n" + "88|41.73|45.34|18.66|\n"
-			+ "89|58.13|18.43|83.38|\n" + "90|98.10|33.46|83.07|\n" + "91|17.76|4.10|88.51|\n"
-			+ "92|60.58|18.15|59.96|\n" + "93|50.11|33.25|85.64|\n" + "94|97.74|60.93|38.97|\n"
-			+ "95|76.31|52.50|95.43|\n" + "96|7.71|85.85|36.26|\n" + "97|9.32|72.21|42.17|\n"
-			+ "98|71.29|51.88|57.62|\n" + "99|31.39|7.27|88.74|";
-
-	public static final String INITIAL_CENTERS =
-			"0|1.96|65.04|20.82|\n" +
-			"1|53.99|84.23|81.59|\n" +
-			"2|97.28|74.50|40.32|\n" +
-			"3|63.57|24.53|87.07|\n" +
-			"4|28.10|43.27|86.53|\n" +
-			"5|99.51|62.70|64.48|\n" +
-			"6|30.31|30.36|80.46|";
-
-	public static final String CENTERS_AFTER_ONE_STEP =
-			"0|28.47|54.80|21.88|\n" +
-			"1|52.74|80.10|73.03|\n" +
-			"2|83.92|60.45|25.17|\n" +
-			"3|70.73|20.18|67.06|\n" +
-			"4|22.51|47.19|86.23|\n" +
-			"5|82.70|53.79|68.68|\n" +
-			"6|29.74|19.17|59.16|";
-	
-
-	public static final String CENTERS_AFTER_ONE_STEP_SINGLE_DIGIT =
-			"0|28.5|54.8|21.9|\n" +
-			"1|52.7|80.1|73.0|\n" +
-			"2|83.9|60.5|25.2|\n" +
-			"3|70.7|20.2|67.0|\n" +
-			"4|22.5|47.2|86.2|\n" +
-			"5|82.7|53.8|68.7|\n" +
-			"6|29.7|19.2|59.2|";
-	
-	
-	public static final String CENTERS_AFTER_20_ITERATIONS_SINGLE_DIGIT =
-			"0|38.3|54.5|19.3|\n" +
-			"1|32.1|83.0|50.4|\n" +
-			"2|87.5|56.6|20.3|\n" +
-			"3|75.4|18.6|67.5|\n" +
-			"4|24.9|29.2|77.6|\n" +
-			"5|78.7|66.1|70.8|\n" +
-			"6|39.5|14.0|18.7|\n";
-	
-	public static final String CENTERS_AFTER_20_ITERATIONS_DOUBLE_DIGIT =
-			"0|38.25|54.52|19.34|\n" +
-			"1|32.14|83.04|50.35|\n" +
-			"2|87.48|56.57|20.27|\n" +
-			"3|75.40|18.65|67.49|\n" +
-			"4|24.93|29.25|77.56|\n" +
-			"5|78.67|66.07|70.82|\n" +
-			"6|39.51|14.04|18.74|\n";
-	
-	// --------------------------------------------------------------------------------------------
-	//  2-dimensional data
-	// --------------------------------------------------------------------------------------------
-	
-	public final static String DATAPOINTS_2D = "0|50.90|16.20|\n" + "1|73.65|61.76|\n" + "2|61.73|49.95|\n"
-			+ "3|1.60|70.11|\n"   + "4|2.43|19.81|\n"   + "5|67.99|9.00|\n" + "6|87.80|84.49|\n"
-			+ "7|90.26|42.99|\n"  + "8|51.36|6.16|\n"   + "9|12.43|9.52|\n" + "10|80.01|8.78|\n"
-			+ "11|92.76|2.93|\n"  + "12|46.32|100.00|\n"+ "13|34.11|45.61|\n"
-			+ "14|68.82|16.36|\n" + "15|81.47|76.45|\n" + "16|65.55|40.21|\n"
-			+ "17|84.22|88.56|\n" + "18|36.99|68.36|\n" + "19|28.87|37.69|\n"
-			+ "20|31.56|13.22|\n" + "21|18.49|34.45|\n" + "22|13.33|94.02|\n"
-			+ "23|91.19|81.62|\n" + "24|85.78|39.02|\n" + "25|94.41|47.07|\n"
-			+ "26|90.62|10.43|\n" + "27|31.52|85.81|\n" + "28|24.65|77.98|\n"
-			+ "29|69.34|75.79|\n" + "30|22.56|78.61|\n" + "31|91.74|83.82|\n"
-			+ "32|76.64|89.53|\n" + "33|36.02|73.01|\n" + "34|87.86|18.94|\n"
-			+ "35|91.94|34.61|\n" + "36|12.52|47.01|\n" + "37|44.01|26.19|\n"
-			+ "38|26.20|73.36|\n" + "39|15.21|17.37|\n" + "40|27.96|94.81|\n"
-			+ "41|26.44|44.81|\n" + "42|53.29|26.69|\n" + "43|23.94|11.50|n"
-			+ "44|19.00|25.48|\n" + "45|82.26|1.88|\n"  + "46|47.56|82.54|\n"
-			+ "47|51.54|35.10|\n" + "48|86.71|55.51|\n" + "49|54.16|23.68|\n"
-			+ "50|71.81|32.83|\n" + "51|20.70|14.19|\n" + "52|57.17|88.56|\n"
-			+ "53|91.39|49.38|\n" + "54|47.90|62.07|\n" + "55|55.70|37.77|\n"
-			+ "56|87.87|74.62|\n" + "57|95.70|45.04|\n" + "58|41.61|89.37|\n"
-			+ "59|82.19|20.84|\n" + "60|49.88|2.62|\n"  + "61|16.42|53.30|\n"
-			+ "62|38.37|72.62|\n" + "63|43.26|49.59|\n" + "64|28.96|2.36|\n"
-			+ "65|88.41|91.43|\n" + "66|98.61|79.58|\n" + "67|4.94|18.65|\n"
-			+ "68|75.89|79.30|\n" + "69|93.18|76.26|\n" + "70|73.43|70.50|\n"
-			+ "71|78.64|90.87|\n" + "72|58.47|63.07|\n" + "73|69.74|54.36|\n"
-			+ "74|38.47|36.60|\n" + "75|51.07|14.75|\n" + "76|24.18|16.85|\n"
-			+ "77|7.56|50.72|\n"  + "78|64.28|97.01|\n" + "79|85.30|24.13|\n"
-			+ "80|72.78|30.78|\n" + "81|18.42|17.45|\n" + "82|87.44|74.98|\n"
-			+ "83|38.30|17.77|\n" + "84|63.62|7.90|\n"  + "85|8.84|67.87|\n"
-			+ "86|76.12|51.83|\n" + "87|32.30|74.79|n"  + "88|41.73|45.34|\n"
-			+ "89|58.13|18.43|\n" + "90|98.10|33.46|\n" + "91|17.76|4.10|\n"
-			+ "92|60.58|18.15|\n" + "93|50.11|33.25|\n" + "94|97.74|60.93|\n"
-			+ "95|76.31|52.50|\n" + "96|7.71|85.85|\n"  + "97|9.32|72.21|\n"
-			+ "98|71.29|51.88|\n" + "99|31.39|7.27|";
-	
-	public static final String INITIAL_CENTERS_2D =
-			"0|1.96|65.04|\n" +
-			"1|53.99|84.23|\n" +
-			"2|97.28|74.50|\n" +
-			"3|63.57|24.53|\n" +
-			"4|28.10|43.27|\n" +
-			"5|99.51|62.70|\n" +
-			"6|30.31|30.36|";
-
-	public static final String CENTERS_2D_AFTER_SINGLE_ITERATION_DOUBLE_DIGIT =
-			"0|13.53|74.53|\n" +
-			"1|49.12|80.49|\n" +
-			"2|87.20|81.83|\n" +
-			"3|67.39|23.32|\n" +
-			"4|26.94|46.34|\n" +
-			"5|88.39|48.64|\n" +
-			"6|23.20|16.71|";
-	
-	public static final String CENTERS_2D_AFTER_20_ITERATIONS_DOUBLE_DIGIT =
-			"0|15.80|79.42|\n" +
-			"1|43.11|78.20|\n" +
-			"2|83.13|82.18|\n" +
-			"3|65.70|18.49|\n" +
-			"4|25.13|44.42|\n" +
-			"5|82.90|48.16|\n" +
-			"6|20.79|13.08|";
-	
-	// --------------------------------------------------------------------------------------------
-	//  testing / verification
-	// --------------------------------------------------------------------------------------------
-	
-	public static void checkResultsWithDelta(String expectedResults, List<String> resultLines, final double maxDelta) {
-		
-		Comparator<String> deltaComp = new Comparator<String>() {
-
-			@Override
-			public int compare(String o1, String o2) {
-				
-				StringTokenizer st1 = new StringTokenizer(o1, "|");
-				StringTokenizer st2 = new StringTokenizer(o2, "|");
-				
-				if(st1.countTokens() != st2.countTokens()) {
-					return st1.countTokens() - st2.countTokens();
-				}
-				
-				// first token is ID
-				String t1 = st1.nextToken();
-				String t2 = st2.nextToken();
-				if(!t1.equals(t2)) {
-					return t1.compareTo(t2);
-				}
-				
-				while(st1.hasMoreTokens()) {
-					t1 = st1.nextToken();
-					t2 = st2.nextToken();
-					
-					double d1 = Double.parseDouble(t1);
-					double d2 = Double.parseDouble(t2);
-					
-					if (Math.abs(d1-d2) > maxDelta) {
-						return d1 < d2 ? -1 : 1;
-					}
-				}
-				
-				return 0;
-			}
-		};
-		
-		// ------- Test results -----------
-		
-		Collections.sort(resultLines, deltaComp);
-		
-		final String[] should = expectedResults.split("\n");
-		final String[] is = (String[]) resultLines.toArray(new String[resultLines.size()]);
-		
-		Assert.assertEquals("Wrong number of result lines.", should.length, is.length);
-		
-		for (int i = 0; i < should.length; i++) {
-			StringTokenizer shouldRecord = new StringTokenizer(should[i], "|");
-			StringTokenizer isRecord = new StringTokenizer(is[i], "|");
-			
-			Assert.assertEquals("Records don't match.", shouldRecord.countTokens(), isRecord.countTokens());
-			
-			// first token is ID
-			String shouldToken = shouldRecord.nextToken();
-			String isToken = isRecord.nextToken();
-			
-			Assert.assertEquals("Records don't match.", shouldToken, isToken);
-
-			while (shouldRecord.hasMoreTokens()) {
-				shouldToken = shouldRecord.nextToken();
-				isToken = isRecord.nextToken();
-				
-				double shouldDouble = Double.parseDouble(shouldToken);
-				double isDouble = Double.parseDouble(isToken);
-				
-				Assert.assertTrue("Value " + isDouble + " is out of range of " + shouldDouble + " +/- " + maxDelta, shouldDouble - maxDelta <= isDouble && shouldDouble + maxDelta >= isDouble);
-			}
-		}
-	}
-	
-	
-	private KMeansData() {}
-}

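checkResultsWithDelta compares k-means centers field by field with a numeric tolerance rather than exact string equality, which is what makes the truncated single- and double-digit expectations above usable. A sketch of calling it, assuming this KMeansData class and JUnit on the classpath; here the expected centers are fed back in as the "computed" result, so the check trivially passes:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class KMeansCheckSketch {
	public static void main(String[] args) {
		// Result lines in the same "id|x|y|z|" format as the expectations.
		List<String> computed = new ArrayList<>(
				Arrays.asList(KMeansData.CENTERS_AFTER_ONE_STEP.split("\n")));
		// Sorts the results with the delta-aware comparator, then asserts
		// every coordinate lies within +/- 0.1 of the expected value.
		KMeansData.checkResultsWithDelta(KMeansData.CENTERS_AFTER_ONE_STEP, computed, 0.1);
		System.out.println("Centers match within the tolerance.");
	}
}

Note that the comparator orders records by a string comparison of the ID token, so the expected block must already list IDs in lexicographic order; that holds here because the center IDs are the single digits 0 through 6.
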
http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-test-utils/src/test/java/org/apache/flink/test/testdata/PageRankData.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/test/java/org/apache/flink/test/testdata/PageRankData.java b/flink-test-utils/src/test/java/org/apache/flink/test/testdata/PageRankData.java
deleted file mode 100644
index f91e611..0000000
--- a/flink-test-utils/src/test/java/org/apache/flink/test/testdata/PageRankData.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.testdata;
-
-public class PageRankData {
-
-	public static final int NUM_VERTICES = 5;
-	
-	public static final String VERTICES = 	"1\n" +
-											"2\n" +
-											"5\n" +
-											"3\n" +
-											"4";
-	
-	public static final String EDGES = "2 1\n" +
-										"5 2\n" + 
-										"5 4\n" +
-										"4 3\n" +
-										"4 2\n" +
-										"1 4\n" +
-										"1 2\n" +
-										"1 3\n" +
-										"3 5\n";
-
-	
-	public static final String RANKS_AFTER_3_ITERATIONS = "1 0.237\n" +
-														"2 0.248\n" + 
-														"3 0.173\n" +
-														"4 0.175\n" +
-														"5 0.165";
-
-	
-	public static final String RANKS_AFTER_EPSILON_0_0001_CONVERGENCE = "1 0.238\n" +
-																		"2 0.244\n" +
-																		"3 0.170\n" +
-																		"4 0.171\n" +
-																		"5 0.174";
-	
-	private PageRankData() {}
-}

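The expected PageRank vectors are truncated to three decimals per vertex, but a quick cross-check is that the ranks still form an approximate probability distribution: the five values after three iterations total 0.998. A stdlib-only sketch of that check (class name illustrative, assuming the PageRankData class from this diff):

public class PageRankSumSketch {
	public static void main(String[] args) {
		double sum = 0.0;
		for (String line : PageRankData.RANKS_AFTER_3_ITERATIONS.split("\n")) {
			// Each line is "<vertex> <rank>".
			sum += Double.parseDouble(line.split(" ")[1]);
		}
		// Allow for the per-vertex truncation to three decimals.
		if (Math.abs(sum - 1.0) > 0.01) {
			throw new AssertionError("Ranks should sum to ~1.0, got " + sum);
		}
		System.out.println("Rank sum: " + sum); // ~0.998
	}
}
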
http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-test-utils/src/test/java/org/apache/flink/test/testdata/TransitiveClosureData.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/test/java/org/apache/flink/test/testdata/TransitiveClosureData.java b/flink-test-utils/src/test/java/org/apache/flink/test/testdata/TransitiveClosureData.java
deleted file mode 100644
index 652b195..0000000
--- a/flink-test-utils/src/test/java/org/apache/flink/test/testdata/TransitiveClosureData.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.testdata;
-
-import org.junit.Assert;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.util.regex.Pattern;
-
-public class TransitiveClosureData {
-
-	public static void checkOddEvenResult(BufferedReader result) throws IOException {
-		Pattern split = Pattern.compile(" ");
-		String line;
-		while ((line = result.readLine()) != null) {
-			String[] res = split.split(line);
-			Assert.assertEquals("Malformed result: Wrong number of tokens in line.", 2, res.length);
-			try {
-				int from = Integer.parseInt(res[0]);
-				int to = Integer.parseInt(res[1]);
-
-				Assert.assertEquals("Vertex should not be reachable.", from % 2, to % 2);
-			} catch (NumberFormatException e) {
-				Assert.fail("Malformed result.");
-			}
-		}
-	}
-
-	private TransitiveClosureData() {}
-}

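TransitiveClosureData encodes the counterpart invariant for the transitive closure of the odd/even test graph: reachability never crosses parity, so both endpoints of every closure pair must share vertex % 2. A minimal driver, again assuming this class and JUnit on the classpath; the closure pairs are invented for illustration:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;

public class ClosureCheckSketch {
	public static void main(String[] args) throws IOException {
		// Illustrative "<from> <to>" pairs; each connects two odd or two
		// even vertices, as the checker requires.
		String closure = "1 3\n3 5\n2 4\n4 6\n";
		try (BufferedReader reader = new BufferedReader(new StringReader(closure))) {
			TransitiveClosureData.checkOddEvenResult(reader);
		}
	}
}
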
