flink-commits mailing list archives

From rmetz...@apache.org
Subject [1/2] flink git commit: [FLINK-1605] Bundle all hadoop dependencies and shade guava away
Date Wed, 11 Mar 2015 11:00:56 GMT
Repository: flink
Updated Branches:
  refs/heads/master 2522f028b -> 84e76f4d3


http://git-wip-us.apache.org/repos/asf/flink/blob/84e76f4d/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
new file mode 100644
index 0000000..5976799
--- /dev/null
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -0,0 +1,307 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.yarn;
+
+import org.apache.flink.client.FlinkYarnSessionCli;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
+import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.spi.LoggingEvent;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * This test starts a MiniYARNCluster with a FIFO scheduler.
+ * There are no queues for that scheduler.
+ */
+public class YARNSessionFIFOITCase extends YarnTestBase {
+	private static final Logger LOG = LoggerFactory.getLogger(YARNSessionFIFOITCase.class);
+
+	/*
+	Override init with FIFO scheduler.
+	 */
+	@BeforeClass
+	public static void setup() {
+		yarnConfiguration.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, ResourceScheduler.class);
+		yarnConfiguration.setInt(YarnConfiguration.NM_PMEM_MB, 768);
+		yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
+		startYARNWithConfig(yarnConfiguration);
+	}
+	/**
+	 * Test regular operation, including command line parameter parsing.
+	 */
+	@Test
+	public void testClientStartup() {
+		LOG.info("Starting testClientStartup()");
+		runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(),
+						"-n", "1",
+						"-jm", "512",
+						"-tm", "1024"},
+				"Number of connected TaskManagers changed to 1. Slots available: 1", RunTypes.YARN_SESSION);
+		LOG.info("Finished testClientStartup()");
+		ensureNoExceptionsInLogFiles();
+	}
+
+
+	/**
+	 * Test querying the YARN cluster.
+	 *
+	 * This test validates that the client reports the 666*2 vcores configured in the "cluster".
+	 */
+	@Test
+	public void testQueryCluster() {
+		LOG.info("Starting testQueryCluster()");
+		runWithArgs(new String[] {"-q"}, "Summary: totalMemory 8192 totalCores 1332", RunTypes.YARN_SESSION); // we have 666*2 cores.
+		LOG.info("Finished testQueryCluster()");
+		ensureNoExceptionsInLogFiles();
+	}
+
+	/**
+	 * Test deployment to a non-existent queue (a user-reported error).
+	 * Deployment succeeds because the FIFO scheduler has no queues, so we do not check the queue name.
+	 */
+	@Test
+	public void testNonexistingQueue() {
+		LOG.info("Starting testNonexistingQueue()");
+		runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
+				"-n", "1",
+				"-jm", "512",
+				"-tm", "1024",
+				"-qu", "doesntExist"}, "Number of connected TaskManagers changed to 1. Slots available: 1", RunTypes.YARN_SESSION);
+		LOG.info("Finished testNonexistingQueue()");
+		ensureNoExceptionsInLogFiles();
+	}
+
+	/**
+	 * Test requesting more resources than available.
+	 */
+	@Test
+	public void testMoreNodesThanAvailable() {
+		if(ignoreOnTravis()) {
+			return;
+		}
+		addTestAppender();
+		LOG.info("Starting testMoreNodesThanAvailable()");
+		runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(),
+				"-n", "10",
+				"-jm", "512",
+				"-tm", "1024"}, "Number of connected TaskManagers changed to", RunTypes.YARN_SESSION); // the number of TMs depends on the speed of the test hardware
+		LOG.info("Finished testMoreNodesThanAvailable()");
+		checkForLogString("This YARN session requires 10752MB of memory in the cluster. There are currently only 8192MB available.");
+		ensureNoExceptionsInLogFiles();
+	}
+
+	/**
+	 * The test cluster has the following resources:
+	 * - 2 Nodes with 4096 MB each.
+	 * - RM_SCHEDULER_MINIMUM_ALLOCATION_MB is 512
+	 *
+	 * We allocate:
+	 * 1 JobManager with 256 MB (will be automatically upgraded to 512 due to min alloc mb)
+	 * 5 TaskManagers with 1585 MB
+	 *
+	 * user sees a total request of: 5 * 1585 + 256 = 8181 MB (fits)
+	 * system sees a total request of: 5 * 1585 + 512 = 8437 MB (doesn't fit due to min alloc mb)
+	 */
+	@Test
+	public void testResourceComputation() {
+		if(ignoreOnTravis()) {
+			return;
+		}
+		addTestAppender();
+		LOG.info("Starting testResourceComputation()");
+		runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(),
+				"-n", "5",
+				"-jm", "256",
+				"-tm", "1585"}, "Number of connected TaskManagers changed to", RunTypes.YARN_SESSION);
+		LOG.info("Finished testResourceComputation()");
+		checkForLogString("This YARN session requires 8437MB of memory in the cluster. There are currently only 8192MB available.");
+	}
+
+	/**
+	 * The test cluster has the following resources:
+	 * - 2 Nodes with 4096 MB each.
+	 * - RM_SCHEDULER_MINIMUM_ALLOCATION_MB is 512
+	 *
+	 * We allocate:
+	 * 1 JobManager with 256 MB (will be automatically upgraded to 512 due to min alloc mb)
+	 * 2 TaskManagers with 3840 MB
+	 *
+	 * the user sees a total request of: 2 * 3840 + 256 = 7936 MB (fits)
+	 * the system sees a request of: 2 * 3840 + 512 = 8192 MB (fits)
+	 * HOWEVER: one machine is going to need 3840 + 512 = 4352 MB, which doesn't fit.
+	 *
+	 * --> check if the system properly rejects allocating this session.
+	 */
+	@Test
+	public void testfullAlloc() {
+		if(ignoreOnTravis()) {
+			return;
+		}
+		addTestAppender();
+		LOG.info("Starting testfullAlloc()");
+		runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(),
+				"-n", "2",
+				"-jm", "256",
+				"-tm", "3840"}, "Number of connected TaskManagers changed to", RunTypes.YARN_SESSION);
+		LOG.info("Finished testfullAlloc()");
+		checkForLogString("There is not enough memory available in the YARN cluster. The TaskManager(s) require 3840MB each. NodeManagers available: [4096, 4096]\n" +
+				"After allocating the JobManager (512MB) and (1/2) TaskManagers, the following NodeManagers are available: [3584, 256]");
+		ensureNoExceptionsInLogFiles();
+	}
+
+	/**
+	 * Test a per-job YARN cluster.
+	 *
+	 * This also tests the "y"-prefixed CliFrontend options for the YARN case.
+	 */
+	@Test
+	public void perJobYarnCluster() {
+		LOG.info("Starting perJobYarnCluster()");
+		File exampleJarLocation = YarnTestBase.findFile("..", new ContainsName("-WordCount.jar", "streaming")); // exclude streaming wordcount here.
+		Assert.assertNotNull("Could not find wordcount jar", exampleJarLocation);
+		runWithArgs(new String[] {"run", "-m", "yarn-cluster",
+				"-yj", flinkUberjar.getAbsolutePath(),
+				"-yn", "1",
+				"-yjm", "512",
+				"-ytm", "1024", exampleJarLocation.getAbsolutePath()}, "Job execution switched to status FINISHED.", RunTypes.CLI_FRONTEND);
+		LOG.info("Finished perJobYarnCluster()");
+		ensureNoExceptionsInLogFiles();
+	}
+
+	/**
+	 * Test the YARN Java API
+	 */
+	@Test
+	public void testJavaAPI() {
+		final int WAIT_TIME = 15;
+		LOG.info("Starting testJavaAPI()");
+
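+		// configure the client programmatically, mirroring what the "-n 1 -jm 512 -tm 512"
+		// session command line options would do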
+		AbstractFlinkYarnClient flinkYarnClient = FlinkYarnSessionCli.getFlinkYarnClient();
+		flinkYarnClient.setTaskManagerCount(1);
+		flinkYarnClient.setJobManagerMemory(512);
+		flinkYarnClient.setTaskManagerMemory(512);
+		flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
+		String confDirPath = System.getenv("FLINK_CONF_DIR");
+		flinkYarnClient.setConfigurationDirectory(confDirPath);
+		flinkYarnClient.setFlinkConfigurationObject(GlobalConfiguration.getConfiguration());
+		flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml"));
+
+		// deploy
+		AbstractFlinkYarnCluster yarnCluster = null;
+		try {
+			yarnCluster = flinkYarnClient.deploy(null);
+		} catch (Exception e) {
+			System.err.println("Error while deploying YARN cluster: "+e.getMessage());
+			e.printStackTrace(System.err);
+			Assert.fail();
+		}
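+		// the session was started with a single TaskManager offering one slot,
+		// so the cluster status should eventually report (1, 1)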
+		FlinkYarnClusterStatus expectedStatus = new FlinkYarnClusterStatus(1, 1);
+		for(int second = 0; second < WAIT_TIME * 2; second++) { // poll the cluster status once per second
+			try {
+				Thread.sleep(1000);
+			} catch (InterruptedException e) {
+				LOG.warn("Interrupted", e);
+				Thread.interrupted();
+			}
+			FlinkYarnClusterStatus status = yarnCluster.getClusterStatus();
+			if(status != null && status.equals(expectedStatus)) {
+				LOG.info("Cluster reached status " + status);
+				break; // all good, cluster started
+			}
+			if(second > WAIT_TIME) {
+				// we waited for 15 seconds. cluster didn't come up correctly
+				Assert.fail("The custer didn't start after " + WAIT_TIME + " seconds");
+			}
+		}
+
+		// use the cluster
+		Assert.assertNotNull(yarnCluster.getJobManagerAddress());
+		Assert.assertNotNull(yarnCluster.getWebInterfaceURL());
+
+		LOG.info("Shutting down cluster. All tests passed");
+		// shutdown cluster
+		yarnCluster.shutdown();
+		LOG.info("Finished testJavaAPI()");
+
+		ensureNoExceptionsInLogFiles();
+	}
+
+	public boolean ignoreOnTravis() {
+		if(System.getenv("TRAVIS") != null && System.getenv("TRAVIS").equals("true")) {
+			// we skip the test until we are able to start a smaller YARN cluster.
+			// right now, the MiniYARNCluster fixes the size of the NodeManagers at 4 GB.
+			LOG.warn("Skipping test on travis for now");
+			return true;
+		}
+		return false;
+	}
+
+	//
+	// --------------- Tools to test if a certain string has been logged with Log4j. -------------
+	// See :  http://stackoverflow.com/questions/3717402/how-to-test-w-junit-that-warning-was-logged-w-log4j
+	//
+	private static TestAppender testAppender;
+	public static void addTestAppender() {
+		testAppender = new TestAppender();
+		org.apache.log4j.Logger.getRootLogger().addAppender(testAppender);
+	}
+
+	public static void checkForLogString(String expected) {
+		if(testAppender == null) {
+			throw new NullPointerException("Initialize it first");
+		}
+		LoggingEvent found = null;
+		for(LoggingEvent event: testAppender.events) {
+			if(event.getMessage().toString().contains(expected)) {
+				found = event;
+				break;
+			}
+		}
+		if(found != null) {
+			LOG.info("Found expected string '"+expected+"' in log message "+found);
+			return;
+		}
+		Assert.fail("Unable to find expected string '"+expected+"' in log messages");
+	}
+
+	public static class TestAppender extends AppenderSkeleton {
+		public List<LoggingEvent> events = new ArrayList<LoggingEvent>();
+		public void close() {}
+		public boolean requiresLayout() {return false;}
+		@Override
+		protected void append(LoggingEvent event) {
+			events.add(event);
+		}
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/84e76f4d/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
new file mode 100644
index 0000000..200205d
--- /dev/null
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
@@ -0,0 +1,435 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.client.CliFrontend;
+import org.apache.flink.client.FlinkYarnSessionCli;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileWriter;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Scanner;
+
+
+/**
+ * This base class allows tests to use the MiniYARNCluster.
+ * The cluster is re-used for all tests.
+ *
+ * This class is located in a separate module, which is built after flink-dist. This way,
+ * we can use the YARN uberjar of Flink to start a Flink YARN session.
+ */
+public abstract class YarnTestBase {
+	private static final Logger LOG = LoggerFactory.getLogger(YarnTestBase.class);
+
+	private final static PrintStream originalStdout = System.out;
+	private final static PrintStream originalStderr = System.err;
+
+	private final static String TEST_CLUSTER_NAME = "flink-yarn-tests";
+
+
+	// Temp directory which is deleted after the unit test.
+	private static TemporaryFolder tmp = new TemporaryFolder();
+
+	protected static MiniYARNCluster yarnCluster = null;
+
+	protected static File flinkUberjar;
+	private static File yarnConfFile;
+
+	protected static final Configuration yarnConfiguration;
+	static {
+		yarnConfiguration = new YarnConfiguration();
+		yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
+		yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 4096); // 4096 is the available memory anyway
+		yarnConfiguration.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
+		yarnConfiguration.setBoolean(YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, true);
+		yarnConfiguration.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
+		yarnConfiguration.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 2);
+		yarnConfiguration.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600);
+		yarnConfiguration.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
+		yarnConfiguration.setInt(YarnConfiguration.NM_VCORES, 666); // memory is overwritten by the MiniYARNCluster,
+		// so we change the number of vcores to a recognizable value for testing.
+	}
+
+	// This code is taken from: http://stackoverflow.com/a/7201825/568695
+	// it changes the environment variables of this JVM. Use only for testing purposes!
+	@SuppressWarnings("unchecked")
+	private static void setEnv(Map<String, String> newenv) {
+		try {
+			Class<?> processEnvironmentClass = Class.forName("java.lang.ProcessEnvironment");
+			Field theEnvironmentField = processEnvironmentClass.getDeclaredField("theEnvironment");
+			theEnvironmentField.setAccessible(true);
+			Map<String, String> env = (Map<String, String>) theEnvironmentField.get(null);
+			env.putAll(newenv);
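+			// on Windows, the JDK additionally keeps a case-insensitive copy of the
+			// environment that has to be patched as well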
+			Field theCaseInsensitiveEnvironmentField = processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment");
+			theCaseInsensitiveEnvironmentField.setAccessible(true);
+			Map<String, String> cienv = (Map<String, String>) theCaseInsensitiveEnvironmentField.get(null);
+			cienv.putAll(newenv);
+		} catch (NoSuchFieldException e) {
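+			// the fields above are platform specific; as a fallback, patch the
+			// unmodifiable map backing System.getenv()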
+			try {
+				Class[] classes = Collections.class.getDeclaredClasses();
+				Map<String, String> env = System.getenv();
+				for (Class cl : classes) {
+					if ("java.util.Collections$UnmodifiableMap".equals(cl.getName())) {
+						Field field = cl.getDeclaredField("m");
+						field.setAccessible(true);
+						Object obj = field.get(env);
+						Map<String, String> map = (Map<String, String>) obj;
+						map.clear();
+						map.putAll(newenv);
+					}
+				}
+			} catch (Exception e2) {
+				throw new RuntimeException(e2);
+			}
+		} catch (Exception e1) {
+			throw new RuntimeException(e1);
+		}
+	}
+
+	/**
+	 * Sleep a bit between the tests (we are re-using the YARN cluster for the tests)
+	 */
+	@After
+	public void sleep() {
+		try {
+			Thread.sleep(500);
+		} catch (InterruptedException e) {
+			Assert.fail("Should not happen");
+		}
+	}
+
+	@Before
+	public void checkClusterEmpty() throws IOException, YarnException {
+		YarnClient yarnClient = YarnClient.createYarnClient();
+		yarnClient.init(yarnConfiguration);
+		yarnClient.start();
+		List<ApplicationReport> apps = yarnClient.getApplications();
+		for(ApplicationReport app : apps) {
+			if(app.getYarnApplicationState() != YarnApplicationState.FINISHED) {
+				Assert.fail("There is at least one application on the cluster is not finished");
+			}
+		}
+	}
+
+	/**
+	 * Recursively locate the first file matching the given filter, starting at the given directory.
+	 */
+	public static File findFile(String startAt, FilenameFilter fnf) {
+		File root = new File(startAt);
+		String[] files = root.list();
+		if(files == null) {
+			return null;
+		}
+		for(String file : files) {
+
+			File f = new File(startAt + File.separator + file);
+			if(f.isDirectory()) {
+				File r = findFile(f.getAbsolutePath(), fnf);
+				if(r != null) {
+					return r;
+				}
+			} else if (fnf.accept(f.getParentFile(), f.getName())) {
+				return f;
+			}
+
+		}
+		return null;
+	}
+
+	/**
+	 * Filter to find the flink-dist uberjar (and thereby the root dir of the dist).
+	 */
+	public static class RootDirFilenameFilter implements FilenameFilter {
+		@Override
+		public boolean accept(File dir, String name) {
+			return name.startsWith("flink-dist") && name.endsWith(".jar") && dir.toString().contains("/lib");
+		}
+	}
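+
+	/**
+	 * Filter matching file names that contain the given string, optionally
+	 * excluding files whose directory path contains another given string.
+	 */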
+	public static class ContainsName implements FilenameFilter {
+		private String name;
+		private String excludeInPath = null;
+
+		public ContainsName(String name) {
+			this.name = name;
+		}
+
+		public ContainsName(String name, String excludeInPath) {
+			this.name = name;
+			this.excludeInPath = excludeInPath;
+		}
+
+		@Override
+		public boolean accept(File dir, String name) {
+			if(excludeInPath == null) {
+				return name.contains(this.name);
+			} else {
+				return name.contains(this.name) && !dir.toString().contains(excludeInPath);
+			}
+		}
+	}
+
+	public static File writeYarnSiteConfigXML(Configuration yarnConf) throws IOException {
+		tmp.create();
+		File yarnSiteXML = new File(tmp.newFolder().getAbsolutePath() + "/yarn-site.xml");
+
+		FileWriter writer = new FileWriter(yarnSiteXML);
+		yarnConf.writeXml(writer);
+		writer.flush();
+		writer.close();
+		return yarnSiteXML;
+	}
+
+	/**
+	 * This method checks the written TaskManager and JobManager log files
+	 * for exceptions.
+	 */
+	public static void ensureNoExceptionsInLogFiles() {
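+		// the MiniYARNCluster writes the TaskManager and JobManager logs below target/<cluster name>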
+		File cwd = new File("target/"+TEST_CLUSTER_NAME);
+		Assert.assertTrue("Expecting directory "+cwd.getAbsolutePath()+" to exist", cwd.exists());
+		Assert.assertTrue("Expecting directory "+cwd.getAbsolutePath()+" to be a directory", cwd.isDirectory());
+		System.out.println("cwd = "+cwd.getAbsolutePath());
+		File foundFile = findFile(cwd.getAbsolutePath(), new FilenameFilter() {
+			@Override
+			public boolean accept(File dir, String name) {
+				File f = new File(dir.getAbsolutePath()+ "/" + name);
+				// scan each file for 'Exception'.
+				Scanner scanner =  null;
+				try {
+					scanner = new Scanner(f);
+				} catch (FileNotFoundException e) {
+					LOG.warn("Unable to locate file: "+e.getMessage()+" file: "+f.getAbsolutePath());
+				}
+				while (scanner.hasNextLine()) {
+					final String lineFromFile = scanner.nextLine();
+					if(lineFromFile.contains("Exception")) {
+						return true;
+					}
+				}
+				return false;
+			}
+		});
+		if(foundFile != null) {
+			Scanner scanner =  null;
+			try {
+				scanner = new Scanner(foundFile);
+			} catch (FileNotFoundException e) {
+				Assert.fail("Unable to locate file: "+e.getMessage()+" file: "+foundFile.getAbsolutePath());
+			}
+			LOG.warn("Found a file with an exception. Printing contents:");
+			while (scanner.hasNextLine()) {
+				LOG.warn("LINE: "+scanner.nextLine());
+			}
+			Assert.fail("Found a file "+foundFile+" with an exception");
+		}
+	}
+
+	public static void startYARNWithConfig(Configuration conf) {
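+		// search from the parent directory so that the uberjar built by the
+		// flink-dist module (a sibling of this module) is found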
+		flinkUberjar = findFile("..", new RootDirFilenameFilter());
+		Assert.assertNotNull(flinkUberjar);
+		String flinkDistRootDir = flinkUberjar.getParentFile().getParent();
+
+		if (!flinkUberjar.exists()) {
+			Assert.fail("Unable to locate yarn-uberjar.jar");
+		}
+
+		try {
+			LOG.info("Starting up MiniYARN cluster");
+			if (yarnCluster == null) {
+				yarnCluster = new MiniYARNCluster(TEST_CLUSTER_NAME, 2, 1, 1);
+
+				yarnCluster.init(conf);
+				yarnCluster.start();
+			}
+
+			Map<String, String> map = new HashMap<String, String>(System.getenv());
+			File flinkConfFilePath = findFile(flinkDistRootDir, new ContainsName("flink-conf.yaml"));
+			Assert.assertNotNull(flinkConfFilePath);
+			map.put("FLINK_CONF_DIR", flinkConfFilePath.getParent());
+			yarnConfFile = writeYarnSiteConfigXML(conf);
+			map.put("YARN_CONF_DIR", yarnConfFile.getParentFile().getAbsolutePath());
+			map.put("IN_TESTS", "yes we are in tests"); // see FlinkYarnClient() for more infos
+			setEnv(map);
+
+			Assert.assertTrue(yarnCluster.getServiceState() == Service.STATE.STARTED);
+		} catch (Exception ex) {
+			ex.printStackTrace();
+			LOG.error("setup failure", ex);
+			Assert.fail();
+		}
+	}
+
+	/**
+	 * Default @BeforeClass implementation. Override this to pass a different configuration.
+	 */
+	@BeforeClass
+	public static void setup() {
+		startYARNWithConfig(yarnConfiguration);
+	}
+
+	// -------------------------- Runner -------------------------- //
+
+	private static ByteArrayOutputStream outContent;
+	private static ByteArrayOutputStream errContent;
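+
+	// the two entry points under test: the standalone FlinkYarnSessionCli and the
+	// regular CliFrontend in yarn-cluster mode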
+	enum RunTypes {
+		YARN_SESSION, CLI_FRONTEND
+	}
+
+	protected void runWithArgs(String[] args, String expect, RunTypes type) {
+		LOG.info("Running with args {}", Arrays.toString(args));
+
+		outContent = new ByteArrayOutputStream();
+		errContent = new ByteArrayOutputStream();
+		System.setOut(new PrintStream(outContent));
+		System.setErr(new PrintStream(errContent));
+
+
+		final int START_TIMEOUT_SECONDS = 60;
+
+		Runner runner = new Runner(args, type);
+		runner.start();
+
+		boolean expectedStringSeen = false;
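+		// poll the redirected stdout/stderr once per second until the expected
+		// string shows up or the timeout expires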
+		for(int second = 0; second <  START_TIMEOUT_SECONDS; second++) {
+			try {
+				Thread.sleep(1000);
+			} catch (InterruptedException e) {
+				Assert.fail("Interruption not expected");
+			}
+			// check output for correct TaskManager startup.
+			if(outContent.toString().contains(expect)
+					|| errContent.toString().contains(expect) ) {
+				expectedStringSeen = true;
+				LOG.info("Found expected output in redirected streams");
+				// send "stop" command to command line interface
+				runner.sendStop();
+				// wait for the thread to stop
+				try {
+					runner.join(1000);
+				} catch (InterruptedException e) {
+					LOG.warn("Interrupted while stopping runner", e);
+				}
+				LOG.warn("stopped");
+				break;
+			}
+			// check if thread died
+			if(!runner.isAlive()) {
+				sendOutput();
+				Assert.fail("Runner thread died before the test was finished. Return value = "+runner.getReturnValue());
+			}
+		}
+
+		sendOutput();
+		Assert.assertTrue("During the timeout period of " + START_TIMEOUT_SECONDS + " seconds the " +
+				"expected string did not show up", expectedStringSeen);
+		LOG.info("Test was successful");
+	}
+
+	private static void sendOutput() {
+		System.setOut(originalStdout);
+		System.setErr(originalStderr);
+
+		LOG.info("Sending stdout content through logger: \n\n{}\n\n", outContent.toString());
+		LOG.info("Sending stderr content through logger: \n\n{}\n\n", errContent.toString());
+	}
+
+	public static class Runner extends Thread {
+		private final String[] args;
+		private int returnValue;
+		private RunTypes type;
+		private FlinkYarnSessionCli yCli;
+
+		public Runner(String[] args, RunTypes type) {
+			this.args = args;
+			this.type = type;
+		}
+
+		public int getReturnValue() {
+			return returnValue;
+		}
+
+		@Override
+		public void run() {
+			switch(type) {
+				case YARN_SESSION:
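+					// the two constructor arguments are the short and long option prefixes;
+					// they are empty because the session CLI is used standalone here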
+					yCli = new FlinkYarnSessionCli("", "");
+					returnValue = yCli.run(args);
+					break;
+				case CLI_FRONTEND:
+					try {
+						CliFrontend cli = new CliFrontend();
+						returnValue = cli.parseParameters(args);
+					} catch (Exception e) {
+						throw new RuntimeException(e);
+					}
+					break;
+				default:
+					throw new RuntimeException("Unknown type " + type);
+			}
+
+			if(returnValue != 0) {
+				Assert.fail("The YARN session returned with non-null value="+returnValue);
+			}
+		}
+
+		public void sendStop() {
+			if(yCli != null) {
+				yCli.stop();
+			}
+		}
+	}
+
+	// -------------------------- Tear down -------------------------- //
+
+	@AfterClass
+	public static void tearDown() {
+		//shutdown YARN cluster
+		if (yarnCluster != null) {
+			LOG.info("shutdown MiniYarn cluster");
+			yarnCluster.stop();
+			yarnCluster = null;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/84e76f4d/flink-yarn-tests/src/main/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/resources/log4j-test.properties b/flink-yarn-tests/src/main/resources/log4j-test.properties
new file mode 100644
index 0000000..b4dbbe0
--- /dev/null
+++ b/flink-yarn-tests/src/main/resources/log4j-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=WARN, file
+
+# Log to the console (the appender is called "file", but it is a ConsoleAppender)
+log4j.appender.file=org.apache.log4j.ConsoleAppender
+log4j.appender.file.append=false
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/84e76f4d/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
deleted file mode 100644
index 9fd2541..0000000
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.yarn;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.File;
-import java.util.Arrays;
-import java.util.List;
-
-public class UtilsTest {
-
-	@Test
-	public void testUberjarLocator() {
-		File dir = YarnTestBase.findFile(".", new YarnTestBase.RootDirFilenameFilter());
-		Assert.assertNotNull(dir);
-		dir = dir.getParentFile().getParentFile(); // from uberjar to lib to root
-		Assert.assertTrue(dir.exists());
-		Assert.assertTrue(dir.isDirectory());
-		Assert.assertTrue(dir.toString().contains("flink-dist"));
-		List<String> files = Arrays.asList(dir.list());
-		Assert.assertTrue(files.contains("lib"));
-		Assert.assertTrue(files.contains("bin"));
-		Assert.assertTrue(files.contains("conf"));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/84e76f4d/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
deleted file mode 100644
index 7da355b..0000000
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.yarn;
-
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.flink.yarn.YARNSessionFIFOITCase.addTestAppender;
-import static org.apache.flink.yarn.YARNSessionFIFOITCase.checkForLogString;
-
-
-/**
- * This test starts a MiniYARNCluster with a CapacityScheduler.
- * It has, by default, a queue called "default". The configuration here adds another queue: "qa-team".
- */
-public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
-	private static final Logger LOG = LoggerFactory.getLogger(YARNSessionCapacitySchedulerITCase.class);
-
-	@BeforeClass
-	public static void setup() {
-		yarnConfiguration.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class);
-		yarnConfiguration.set("yarn.scheduler.capacity.root.queues", "default,qa-team");
-		yarnConfiguration.setInt("yarn.scheduler.capacity.root.default.capacity", 40);
-		yarnConfiguration.setInt("yarn.scheduler.capacity.root.qa-team.capacity", 60);
-		startYARNWithConfig(yarnConfiguration);
-	}
-
-	/**
-	 * Test regular operation, including command line parameter parsing.
-	 */
-	@Test
-	public void testClientStartup() {
-		runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(),
-						"-n", "1",
-						"-jm", "512",
-						"-tm", "1024", "-qu", "qa-team"},
-				"Number of connected TaskManagers changed to 1. Slots available: 1", RunTypes.YARN_SESSION);
-
-		ensureNoExceptionsInLogFiles();
-	}
-
-
-	/**
-	 * Test deployment to a non-existent queue (a user-reported error).
-	 * The capacity scheduler rejects the submission to an unknown queue, so we check for the error message.
-	 */
-	@Test
-	public void testNonexistingQueue() {
-		addTestAppender();
-		runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(),
-				"-n", "1",
-				"-jm", "512",
-				"-tm", "1024",
-				"-qu", "doesntExist"}, "to unknown queue: doesntExist", RunTypes.YARN_SESSION);
-		checkForLogString("The specified queue 'doesntExist' does not exist. Available queues: default, qa-team");
-
-		ensureNoExceptionsInLogFiles();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/84e76f4d/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
deleted file mode 100644
index d5f301b..0000000
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ /dev/null
@@ -1,306 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.yarn;
-
-import org.apache.flink.client.FlinkYarnSessionCli;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
-import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
-import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
-import org.apache.log4j.AppenderSkeleton;
-import org.apache.log4j.spi.LoggingEvent;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-
-
-/**
- * This test starts a MiniYARNCluster with a FIFO scheduler.
- * There are no queues for that scheduler.
- */
-public class YARNSessionFIFOITCase extends YarnTestBase {
-	private static final Logger LOG = LoggerFactory.getLogger(YARNSessionFIFOITCase.class);
-
-	/*
-	Override init with FIFO scheduler.
-	 */
-	@BeforeClass
-	public static void setup() {
-		yarnConfiguration.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, ResourceScheduler.class);
-		yarnConfiguration.setInt(YarnConfiguration.NM_PMEM_MB, 768);
-		yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
-		startYARNWithConfig(yarnConfiguration);
-	}
-	/**
-	 * Test regular operation, including command line parameter parsing.
-	 */
-	@Test
-	public void testClientStartup() {
-		LOG.info("Starting testClientStartup()");
-		runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(),
-						"-n", "1",
-						"-jm", "512",
-						"-tm", "1024"},
-				"Number of connected TaskManagers changed to 1. Slots available: 1", RunTypes.YARN_SESSION);
-		LOG.info("Finished testClientStartup()");
-		ensureNoExceptionsInLogFiles();
-	}
-
-
-	/**
-	 * Test querying the YARN cluster.
-	 *
-	 * This test validates that the client reports the 666*2 vcores configured in the "cluster".
-	 */
-	@Test
-	public void testQueryCluster() {
-		LOG.info("Starting testQueryCluster()");
-		runWithArgs(new String[] {"-q"}, "Summary: totalMemory 8192 totalCores 1332", RunTypes.YARN_SESSION); // we have 666*2 cores.
-		LOG.info("Finished testQueryCluster()");
-		ensureNoExceptionsInLogFiles();
-	}
-
-	/**
-	 * Test deployment to a non-existent queue (a user-reported error).
-	 * Deployment succeeds because the FIFO scheduler has no queues, so we do not check the queue name.
-	 */
-	@Test
-	public void testNonexistingQueue() {
-		LOG.info("Starting testNonexistingQueue()");
-		runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
-				"-n", "1",
-				"-jm", "512",
-				"-tm", "1024",
-				"-qu", "doesntExist"}, "Number of connected TaskManagers changed to 1. Slots available: 1", RunTypes.YARN_SESSION);
-		LOG.info("Finished testNonexistingQueue()");
-		ensureNoExceptionsInLogFiles();
-	}
-
-	/**
-	 * Test requesting more resources than available.
-	 */
-	@Test
-	public void testMoreNodesThanAvailable() {
-		if(ignoreOnTravis()) {
-			return;
-		}
-		addTestAppender();
-		LOG.info("Starting testMoreNodesThanAvailable()");
-		runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(),
-				"-n", "10",
-				"-jm", "512",
-				"-tm", "1024"}, "Number of connected TaskManagers changed to", RunTypes.YARN_SESSION); // the number of TMs depends on the speed of the test hardware
-		LOG.info("Finished testMoreNodesThanAvailable()");
-		checkForLogString("This YARN session requires 10752MB of memory in the cluster. There are currently only 8192MB available.");
-		ensureNoExceptionsInLogFiles();
-	}
-
-	/**
-	 * The test cluster has the following resources:
-	 * - 2 Nodes with 4096 MB each.
-	 * - RM_SCHEDULER_MINIMUM_ALLOCATION_MB is 512
-	 *
-	 * We allocate:
-	 * 1 JobManager with 256 MB (will be automatically upgraded to 512 due to min alloc mb)
-	 * 5 TaskManagers with 1585 MB
-	 *
-	 * user sees a total request of: 5 * 1585 + 256 = 8181 MB (fits)
-	 * system sees a total request of: 5 * 1585 + 512 = 8437 MB (doesn't fit due to min alloc mb)
-	 */
-	@Test
-	public void testResourceComputation() {
-		if(ignoreOnTravis()) {
-			return;
-		}
-		addTestAppender();
-		LOG.info("Starting testResourceComputation()");
-		runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(),
-				"-n", "5",
-				"-jm", "256",
-				"-tm", "1585"}, "Number of connected TaskManagers changed to", RunTypes.YARN_SESSION);
-		LOG.info("Finished testResourceComputation()");
-		checkForLogString("This YARN session requires 8437MB of memory in the cluster. There are currently only 8192MB available.");
-	}
-
-	/**
-	 * The test cluster has the following resources:
-	 * - 2 Nodes with 4096 MB each.
-	 * - RM_SCHEDULER_MINIMUM_ALLOCATION_MB is 512
-	 *
-	 * We allocate:
-	 * 1 JobManager with 256 MB (will be automatically upgraded to 512 due to min alloc mb)
-	 * 2 TaskManagers with 3840 MB
-	 *
-	 * the user sees a total request of: 2 * 3840 + 256 = 7936 MB (fits)
-	 * the system sees a request of: 2 * 3840 + 512 = 8192 MB (fits)
-	 * HOWEVER: one machine is going to need 3840 + 512 = 4352 MB, which doesn't fit.
-	 *
-	 * --> check if the system properly rejects allocating this session.
-	 */
-	@Test
-	public void testfullAlloc() {
-		if(ignoreOnTravis()) {
-			return;
-		}
-		addTestAppender();
-		LOG.info("Starting testfullAlloc()");
-		runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(),
-				"-n", "2",
-				"-jm", "256",
-				"-tm", "3840"}, "Number of connected TaskManagers changed to", RunTypes.YARN_SESSION);
-		LOG.info("Finished testfullAlloc()");
-		checkForLogString("There is not enough memory available in the YARN cluster. The TaskManager(s) require 3840MB each. NodeManagers available: [4096, 4096]\n" +
-				"After allocating the JobManager (512MB) and (1/2) TaskManagers, the following NodeManagers are available: [3584, 256]");
-		ensureNoExceptionsInLogFiles();
-	}
-
-	/**
-	 * Test a per-job YARN cluster.
-	 *
-	 * This also tests the "y"-prefixed CliFrontend options for the YARN case.
-	 */
-	@Test
-	public void perJobYarnCluster() {
-		LOG.info("Starting perJobYarnCluster()");
-		File exampleJarLocation = YarnTestBase.findFile(".", new ContainsName("-WordCount.jar", "streaming")); // exclude streaming wordcount here.
-		runWithArgs(new String[] {"run", "-m", "yarn-cluster",
-				"-yj", flinkUberjar.getAbsolutePath(),
-				"-yn", "1",
-				"-yjm", "512",
-				"-ytm", "1024", exampleJarLocation.getAbsolutePath()}, "Job execution switched to status FINISHED.", RunTypes.CLI_FRONTEND);
-		LOG.info("Finished perJobYarnCluster()");
-		ensureNoExceptionsInLogFiles();
-	}
-
-	/**
-	 * Test the YARN Java API
-	 */
-	@Test
-	public void testJavaAPI() {
-		final int WAIT_TIME = 15;
-		LOG.info("Starting testJavaAPI()");
-
-		AbstractFlinkYarnClient flinkYarnClient = FlinkYarnSessionCli.getFlinkYarnClient();
-		flinkYarnClient.setTaskManagerCount(1);
-		flinkYarnClient.setJobManagerMemory(512);
-		flinkYarnClient.setTaskManagerMemory(512);
-		flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
-		String confDirPath = System.getenv("FLINK_CONF_DIR");
-		flinkYarnClient.setConfigurationDirectory(confDirPath);
-		flinkYarnClient.setFlinkConfigurationObject(GlobalConfiguration.getConfiguration());
-		flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml"));
-
-		// deploy
-		AbstractFlinkYarnCluster yarnCluster = null;
-		try {
-			yarnCluster = flinkYarnClient.deploy(null);
-		} catch (Exception e) {
-			System.err.println("Error while deploying YARN cluster: "+e.getMessage());
-			e.printStackTrace(System.err);
-			Assert.fail();
-		}
-		FlinkYarnClusterStatus expectedStatus = new FlinkYarnClusterStatus(1, 1);
-		for(int second = 0; second < WAIT_TIME * 2; second++) { // run "forever"
-			try {
-				Thread.sleep(1000);
-			} catch (InterruptedException e) {
-				LOG.warn("Interrupted", e);
-				Thread.interrupted();
-			}
-			FlinkYarnClusterStatus status = yarnCluster.getClusterStatus();
-			if(status != null && status.equals(expectedStatus)) {
-				LOG.info("Cluster reached status " + status);
-				break; // all good, cluster started
-			}
-			if(second > WAIT_TIME) {
-				// we waited for 15 seconds. cluster didn't come up correctly
-				Assert.fail("The custer didn't start after " + WAIT_TIME + " seconds");
-			}
-		}
-
-		// use the cluster
-		Assert.assertNotNull(yarnCluster.getJobManagerAddress());
-		Assert.assertNotNull(yarnCluster.getWebInterfaceURL());
-
-		LOG.info("Shutting down cluster. All tests passed");
-		// shutdown cluster
-		yarnCluster.shutdown();
-		LOG.info("Finished testJavaAPI()");
-
-		ensureNoExceptionsInLogFiles();
-	}
-
-	public boolean ignoreOnTravis() {
-		if(System.getenv("TRAVIS") != null && System.getenv("TRAVIS").equals("true")) {
-			// we skip the test until we are able to start a smaller YARN cluster.
-			// right now, the MiniYARNCluster fixes the size of the NodeManagers at 4 GB.
-			LOG.warn("Skipping test on travis for now");
-			return true;
-		}
-		return false;
-	}
-
-	//
-	// --------------- Tools to test if a certain string has been logged with Log4j. -------------
-	// See :  http://stackoverflow.com/questions/3717402/how-to-test-w-junit-that-warning-was-logged-w-log4j
-	//
-	private static TestAppender testAppender;
-	public static void addTestAppender() {
-		testAppender = new TestAppender();
-		org.apache.log4j.Logger.getRootLogger().addAppender(testAppender);
-	}
-
-	public static void checkForLogString(String expected) {
-		if(testAppender == null) {
-			throw new NullPointerException("Initialize it first");
-		}
-		LoggingEvent found = null;
-		for(LoggingEvent event: testAppender.events) {
-			if(event.getMessage().toString().contains(expected)) {
-				found = event;
-				break;
-			}
-		}
-		if(found != null) {
-			LOG.info("Found expected string '"+expected+"' in log message "+found);
-			return;
-		}
-		Assert.fail("Unable to find expected string '"+expected+"' in log messages");
-	}
-
-	public static class TestAppender extends AppenderSkeleton {
-		public List<LoggingEvent> events = new ArrayList<LoggingEvent>();
-		public void close() {}
-		public boolean requiresLayout() {return false;}
-		@Override
-		protected void append(LoggingEvent event) {
-			events.add(event);
-		}
-	}
-	
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/84e76f4d/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
deleted file mode 100644
index 65517d3..0000000
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ /dev/null
@@ -1,439 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn;
-
-import org.apache.flink.client.CliFrontend;
-import org.apache.flink.client.FlinkYarnSessionCli;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.Service;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.MiniYARNCluster;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.rules.TemporaryFolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileWriter;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.lang.reflect.Field;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Scanner;
-
-
-/**
- * This base class allows tests to use the MiniYARNCluster.
- * The cluster is re-used for all tests.
- *
- * This class is located in a separate module, which is built after flink-dist. This way,
- * we can use the YARN uberjar of Flink to start a Flink YARN session.
- */
-public abstract class YarnTestBase {
-	private static final Logger LOG = LoggerFactory.getLogger(YarnTestBase.class);
-
-	private final static PrintStream originalStdout = System.out;
-	private final static PrintStream originalStderr = System.err;
-
-	private final static String TEST_CLUSTER_NAME = "flink-yarn-tests";
-
-
-	// Temp directory which is deleted after the unit test.
-	private static TemporaryFolder tmp = new TemporaryFolder();
-
-	protected static MiniYARNCluster yarnCluster = null;
-
-	protected static File flinkUberjar;
-	private static File yarnConfFile;
-
-	protected static final Configuration yarnConfiguration;
-	static {
-		yarnConfiguration = new YarnConfiguration();
-		yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
-		yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 4096); // 4096 is the available memory anyway
-		yarnConfiguration.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
-		yarnConfiguration.setBoolean(YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, true);
-		yarnConfiguration.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
-		yarnConfiguration.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 2);
-		yarnConfiguration.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600);
-		yarnConfiguration.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
-		yarnConfiguration.setInt(YarnConfiguration.NM_VCORES, 666); // memory is overwritten by the MiniYARNCluster,
-		// so we change the number of vcores to a recognizable value for testing.
-	}
-
-	// This code is taken from: http://stackoverflow.com/a/7201825/568695
-	// it changes the environment variables of this JVM. Use only for testing purposes!
-	@SuppressWarnings("unchecked")
-	private static void setEnv(Map<String, String> newenv) {
-		try {
-			Class<?> processEnvironmentClass = Class.forName("java.lang.ProcessEnvironment");
-			Field theEnvironmentField = processEnvironmentClass.getDeclaredField("theEnvironment");
-			theEnvironmentField.setAccessible(true);
-			Map<String, String> env = (Map<String, String>) theEnvironmentField.get(null);
-			env.putAll(newenv);
-			Field theCaseInsensitiveEnvironmentField = processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment");
-			theCaseInsensitiveEnvironmentField.setAccessible(true);
-			Map<String, String> cienv = (Map<String, String>) theCaseInsensitiveEnvironmentField.get(null);
-			cienv.putAll(newenv);
-		} catch (NoSuchFieldException e) {
-			try {
-				Class[] classes = Collections.class.getDeclaredClasses();
-				Map<String, String> env = System.getenv();
-				for (Class cl : classes) {
-					if ("java.util.Collections$UnmodifiableMap".equals(cl.getName())) {
-						Field field = cl.getDeclaredField("m");
-						field.setAccessible(true);
-						Object obj = field.get(env);
-						Map<String, String> map = (Map<String, String>) obj;
-						map.clear();
-						map.putAll(newenv);
-					}
-				}
-			} catch (Exception e2) {
-				throw new RuntimeException(e2);
-			}
-		} catch (Exception e1) {
-			throw new RuntimeException(e1);
-		}
-	}
-
-	/**
-	 * Sleep a bit between the tests (we are re-using the YARN cluster for the tests)
-	 */
-	@After
-	public void sleep() {
-		try {
-			Thread.sleep(500);
-		} catch (InterruptedException e) {
-			Assert.fail("Should not happen");
-		}
-	}
-
-	@Before
-	public void checkClusterEmpty() throws IOException, YarnException {
-		YarnClient yarnClient = YarnClient.createYarnClient();
-		yarnClient.init(yarnConfiguration);
-		yarnClient.start();
-		List<ApplicationReport> apps = yarnClient.getApplications();
-		for(ApplicationReport app : apps) {
-			if(app.getYarnApplicationState() != YarnApplicationState.FINISHED) {
-				Assert.fail("There is at least one application on the cluster is not finished");
-			}
-		}
-	}
-
-	/**
-	 * Recursively locate the first file matching the given filter, starting at the given directory.
-	 */
-	public static File findFile(String startAt, FilenameFilter fnf) {
-		File root = new File(startAt);
-		String[] files = root.list();
-		if(files == null) {
-			return null;
-		}
-		for(String file : files) {
-
-			File f = new File(startAt + File.separator + file);
-			if(f.isDirectory()) {
-				File r = findFile(f.getAbsolutePath(), fnf);
-				if(r != null) {
-					return r;
-				}
-			} else if (fnf.accept(f.getParentFile(), f.getName())) {
-				return f;
-			}
-
-		}
-		return null;
-	}
-
-	/**
-	 * Filter to find the yarn-uberjar (and thereby the root dir of the dist).
-	 */
-	public static class RootDirFilenameFilter implements FilenameFilter {
-		@Override
-		public boolean accept(File dir, String name) {
-			return name.endsWith("yarn-uberjar.jar") && dir.toString().contains("/lib");
-		}
-	}
-	public static class ContainsName implements FilenameFilter {
-		private String name;
-		private String excludeInPath = null;
-
-		public ContainsName(String name) {
-			this.name = name;
-		}
-
-		public ContainsName(String name, String excludeInPath) {
-			this.name = name;
-			this.excludeInPath = excludeInPath;
-		}
-
-		@Override
-		public boolean accept(File dir, String name) {
-			if(excludeInPath == null) {
-				return name.contains(this.name);
-			} else {
-				return name.contains(this.name) && !dir.toString().contains(excludeInPath);
-			}
-		}
-	}
-
-	public static File writeYarnSiteConfigXML(Configuration yarnConf) throws IOException {
-		tmp.create();
-		File yarnSiteXML = new File(tmp.newFolder().getAbsolutePath() + "/yarn-site.xml");
-
-		FileWriter writer = new FileWriter(yarnSiteXML);
-		yarnConf.writeXml(writer);
-		writer.flush();
-		writer.close();
-		return yarnSiteXML;
-	}
-
-	/**
-	 * This method checks the written TaskManager and JobManager log files
-	 * for exceptions.
-	 */
-	public static void ensureNoExceptionsInLogFiles() {
-		File cwd = new File("target/"+TEST_CLUSTER_NAME);
-		Assert.assertTrue("Expecting directory "+cwd.getAbsolutePath()+" to exist", cwd.exists());
-		Assert.assertTrue("Expecting directory "+cwd.getAbsolutePath()+" to be a directory", cwd.isDirectory());
-		System.out.println("cwd = "+cwd.getAbsolutePath());
-		File foundFile = findFile(cwd.getAbsolutePath(), new FilenameFilter() {
-			@Override
-			public boolean accept(File dir, String name) {
-				File f = new File(dir.getAbsolutePath()+ "/" + name);
-				// scan each file for 'Exception'.
-				Scanner scanner =  null;
-				try {
-					scanner = new Scanner(f);
-				} catch (FileNotFoundException e) {
-					Assert.fail("Unable to locate file: "+e.getMessage()+" file: "+f.getAbsolutePath());
-				}
-				while (scanner.hasNextLine()) {
-					final String lineFromFile = scanner.nextLine();
-					if(lineFromFile.contains("Exception")) {
-						return true;
-					}
-				}
-				return false;
-			}
-		});
-		if(foundFile != null) {
-			Scanner scanner =  null;
-			try {
-				scanner = new Scanner(foundFile);
-			} catch (FileNotFoundException e) {
-				Assert.fail("Unable to locate file: "+e.getMessage()+" file: "+foundFile.getAbsolutePath());
-			}
-			LOG.warn("Found a file with an exception. Printing contents:");
-			while (scanner.hasNextLine()) {
-				LOG.warn("LINE: "+scanner.nextLine());
-			}
-			Assert.fail("Found a file "+foundFile+" with an exception");
-		}
-	}
-
-	public static void main(String[] args) {
-		ensureNoExceptionsInLogFiles();
-	}
-
-	public static void startYARNWithConfig(Configuration conf) {
-		flinkUberjar = findFile(".", new RootDirFilenameFilter());
-		Assert.assertNotNull(flinkUberjar);
-		String flinkDistRootDir = flinkUberjar.getParentFile().getParent();
-
-		if (!flinkUberjar.exists()) {
-			Assert.fail("Unable to locate yarn-uberjar.jar");
-		}
-
-		try {
-			LOG.info("Starting up MiniYARN cluster");
-			if (yarnCluster == null) {
-				yarnCluster = new MiniYARNCluster(TEST_CLUSTER_NAME, 2, 1, 1);
-
-				yarnCluster.init(conf);
-				yarnCluster.start();
-			}
-
-			Map<String, String> map = new HashMap<String, String>(System.getenv());
-			File flinkConfFilePath = findFile(flinkDistRootDir, new ContainsName("flink-conf.yaml"));
-			Assert.assertNotNull(flinkConfFilePath);
-			map.put("FLINK_CONF_DIR", flinkConfFilePath.getParent());
-			yarnConfFile = writeYarnSiteConfigXML(conf);
-			map.put("YARN_CONF_DIR", yarnConfFile.getParentFile().getAbsolutePath());
-			map.put("IN_TESTS", "yes we are in tests"); // see FlinkYarnClient() for more info
-			setEnv(map);
-
-			Assert.assertTrue(yarnCluster.getServiceState() == Service.STATE.STARTED);
-		} catch (Exception ex) {
-			ex.printStackTrace();
-			LOG.error("setup failure", ex);
-			Assert.fail();
-		}
-	}
-
-	/**
-	 * Default @BeforeClass implementation. Override this to pass a different configuration.
-	 */
-	@BeforeClass
-	public static void setup() {
-		startYARNWithConfig(yarnConfiguration);
-	}
-
-	// -------------------------- Runner -------------------------- //
-
-	private static ByteArrayOutputStream outContent;
-	private static ByteArrayOutputStream errContent;
-	enum RunTypes {
-		YARN_SESSION, CLI_FRONTEND
-	}
-
-	protected void runWithArgs(String[] args, String expect, RunTypes type) {
-		LOG.info("Running with args {}", Arrays.toString(args));
-
-		outContent = new ByteArrayOutputStream();
-		errContent = new ByteArrayOutputStream();
-		System.setOut(new PrintStream(outContent));
-		System.setErr(new PrintStream(errContent));
-
-		final int START_TIMEOUT_SECONDS = 60;
-
-		Runner runner = new Runner(args, type);
-		runner.start();
-
-		boolean expectedStringSeen = false;
-		for(int second = 0; second < START_TIMEOUT_SECONDS; second++) {
-			try {
-				Thread.sleep(1000);
-			} catch (InterruptedException e) {
-				Assert.fail("Interruption not expected");
-			}
-			// check output for correct TaskManager startup.
-			if(outContent.toString().contains(expect)
-					|| errContent.toString().contains(expect) ) {
-				expectedStringSeen = true;
-				LOG.info("Found expected output in redirected streams");
-				// send "stop" command to command line interface
-				runner.sendStop();
-				// wait for the thread to stop
-				try {
-					runner.join(1000);
-				} catch (InterruptedException e) {
-					LOG.warn("Interrupted while stopping runner", e);
-				}
-				LOG.warn("stopped");
-				break;
-			}
-			// check if thread died
-			if(!runner.isAlive()) {
-				sendOutput();
-				Assert.fail("Runner thread died before the test was finished. Return value = "+runner.getReturnValue());
-			}
-		}
-
-		sendOutput();
-		Assert.assertTrue("During the timeout period of " + START_TIMEOUT_SECONDS + " seconds the " +
-				"expected string did not show up", expectedStringSeen);
-		LOG.info("Test was successful");
-	}
-
-	private static void sendOutput() {
-		System.setOut(originalStdout);
-		System.setErr(originalStderr);
-
-		LOG.info("Sending stdout content through logger: \n\n{}\n\n", outContent.toString());
-		LOG.info("Sending stderr content through logger: \n\n{}\n\n", errContent.toString());
-	}
-
-	public static class Runner extends Thread {
-		private final String[] args;
-		private int returnValue;
-		private RunTypes type;
-		private FlinkYarnSessionCli yCli;
-
-		public Runner(String[] args, RunTypes type) {
-			this.args = args;
-			this.type = type;
-		}
-
-		public int getReturnValue() {
-			return returnValue;
-		}
-
-		@Override
-		public void run() {
-			switch(type) {
-				case YARN_SESSION:
-					yCli = new FlinkYarnSessionCli("", "");
-					returnValue = yCli.run(args);
-					break;
-				case CLI_FRONTEND:
-					try {
-						CliFrontend cli = new CliFrontend();
-						returnValue = cli.parseParameters(args);
-					} catch (Exception e) {
-						throw new RuntimeException(e);
-					}
-					break;
-				default:
-					throw new RuntimeException("Unknown type " + type);
-			}
-
-			if(returnValue != 0) {
-				Assert.fail("The YARN session returned with non-zero value="+returnValue);
-			}
-		}
-
-		public void sendStop() {
-			if(yCli != null) {
-				yCli.stop();
-			}
-		}
-	}
-
-	// -------------------------- Tear down -------------------------- //
-
-	@AfterClass
-	public static void tearDown() {
-		// shut down the YARN cluster
-		if (yarnCluster != null) {
-			LOG.info("Shutting down MiniYARN cluster");
-			yarnCluster.stop();
-			yarnCluster = null;
-		}
-	}
-
-}
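
The helpers above are easiest to see in combination. Below is a minimal usage sketch (hypothetical code, not part of this commit; it assumes the enclosing test base class is the YarnTestBase referenced by these tests and is on the classpath): findFile() with a ContainsName filter recursively locates a log file under the test working directory, and ensureNoExceptionsInLogFiles() then fails the test if any written log contains an exception.

	// Hypothetical usage sketch; class and filter arguments are illustrative only.
	import java.io.File;

	public class FindLogExample {
		public static void main(String[] args) {
			// Recursively search "target" for the first file whose name contains
			// "jobmanager", skipping anything whose path contains "/archive".
			File log = YarnTestBase.findFile("target",
					new YarnTestBase.ContainsName("jobmanager", "/archive"));
			if (log != null) {
				System.out.println("Found log: " + log.getAbsolutePath());
			}
			// Fails (via Assert.fail) if any TaskManager/JobManager log contains "Exception".
			YarnTestBase.ensureNoExceptionsInLogFiles();
		}
	}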

http://git-wip-us.apache.org/repos/asf/flink/blob/84e76f4d/flink-yarn-tests/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/resources/log4j-test.properties b/flink-yarn-tests/src/test/resources/log4j-test.properties
deleted file mode 100644
index b4dbbe0..0000000
--- a/flink-yarn-tests/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,28 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=WARN, file
-
-# Log all output to the console appender (named "file")
-log4j.appender.file=org.apache.log4j.ConsoleAppender
-log4j.appender.file.append=false
-log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
-
-# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
\ No newline at end of file

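For reference, a hedged sketch of what the deleted configuration above did, expressed with the log4j 1.x programmatic API (illustration only, not part of this commit; the class name is hypothetical):

	import org.apache.log4j.ConsoleAppender;
	import org.apache.log4j.Level;
	import org.apache.log4j.Logger;
	import org.apache.log4j.PatternLayout;

	public class TestLoggingSetup {
		public static void main(String[] args) {
			// Root logger at WARN, writing to the console with the same pattern
			// as the deleted properties file.
			Logger root = Logger.getRootLogger();
			root.setLevel(Level.WARN);
			root.addAppender(new ConsoleAppender(
					new PatternLayout("%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n")));
			// Suppress the irrelevant warnings from the netty channel handler.
			Logger.getLogger("org.jboss.netty.channel.DefaultChannelPipeline")
					.setLevel(Level.ERROR);
		}
	}
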
http://git-wip-us.apache.org/repos/asf/flink/blob/84e76f4d/flink-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/flink-yarn/pom.xml b/flink-yarn/pom.xml
index 1569f15..805543e 100644
--- a/flink-yarn/pom.xml
+++ b/flink-yarn/pom.xml
@@ -51,6 +51,12 @@ under the License.
 		</dependency>
 
 		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>${shading-artifact.name}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
 			<groupId>com.typesafe.akka</groupId>
 			<artifactId>akka-actor_2.10</artifactId>
 		</dependency>
@@ -66,38 +72,12 @@ under the License.
 		</dependency>
 
 		<dependency>
-			<groupId>org.apache.camel</groupId>
-			<artifactId>camel-stream</artifactId>
-			<version>2.14.0</version>
-		</dependency>
-
-		<!--  guava needs to be in "provided" scope, to make sure it is not included into the jars by the shading -->
-		<dependency>
 			<groupId>com.google.guava</groupId>
 			<artifactId>guava</artifactId>
 			<version>${guava.version}</version>
-			<scope>provided</scope>
 		</dependency>
 		
-		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-yarn-client</artifactId>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-common</artifactId>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-hdfs</artifactId>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-mapreduce-client-core</artifactId>
-		</dependency>
+		
 	</dependencies>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/flink/blob/84e76f4d/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
index 5c57292..16cb345 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -238,6 +239,15 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient {
 
 	@Override
 	public void setShipFiles(List<File> shipFiles) {
+		File shipFile;
+		for(Iterator<File> it = shipFiles.iterator(); it.hasNext(); ) {
+			shipFile = it.next();
+			// remove the uberjar from the ship list (by default, everything in the lib/ folder is
+			// added to the list of files to ship, but we handle the uberjar separately).
+			if(shipFile.getName().startsWith("flink-dist-") && shipFile.getName().endsWith("jar")) {
+				it.remove();
+			}
+		}
 		this.shipFiles.addAll(shipFiles);
 	}
 

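For illustration, a minimal sketch of the effect of the new filtering in setShipFiles() (hypothetical code and file names, not part of this commit): the flink-dist uberjar is dropped from the ship list while other files from lib/ are kept.

	import java.io.File;
	import java.util.ArrayList;
	import java.util.Iterator;
	import java.util.List;

	public class ShipFilesFilterExample {
		public static void main(String[] args) {
			List<File> shipFiles = new ArrayList<File>();
			shipFiles.add(new File("lib/flink-dist-0.9-SNAPSHOT-yarn-uberjar.jar")); // removed
			shipFiles.add(new File("lib/log4j.properties")); // kept
			// The same iterator-removal pattern as in setShipFiles() above.
			for (Iterator<File> it = shipFiles.iterator(); it.hasNext(); ) {
				File f = it.next();
				if (f.getName().startsWith("flink-dist-") && f.getName().endsWith("jar")) {
					it.remove();
				}
			}
			System.out.println(shipFiles); // prints [lib/log4j.properties]
		}
	}
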
http://git-wip-us.apache.org/repos/asf/flink/blob/84e76f4d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1d09d02..cf042ed 100644
--- a/pom.xml
+++ b/pom.xml
@@ -52,7 +52,7 @@ under the License.
 	</scm>
 
 	<modules>
-		<module>flink-shaded</module>
+		<module>flink-shaded-hadoop</module>
 		<module>flink-core</module>
 		<module>flink-java</module>
 		<module>flink-scala</module>
@@ -71,6 +71,7 @@ under the License.
 	<properties>
 		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
 		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+		<shading-artifact.name>error</shading-artifact.name>
 		<hadoop-one.version>1.2.1</hadoop-one.version>
 		<hadoop-two.version>2.2.0</hadoop-two.version>
 		<scala.version>2.10.4</scala.version>
@@ -81,13 +82,15 @@ under the License.
 		<flink.reuseForks>true</flink.reuseForks>
 		<log4j.configuration>log4j-test.properties</log4j.configuration>
 		<slf4j.version>1.7.7</slf4j.version>
-		<guava.version>17.0</guava.version>
+		<guava.version>18.0</guava.version>
 		<scala.version>2.10.4</scala.version>
 		<akka.version>2.3.7</akka.version>
 		<scala.binary.version>2.10</scala.binary.version>
 		<scala.macros.version>2.0.1</scala.macros.version>
 		<kryoserialization.version>0.3.2</kryoserialization.version>
 		<protobuf.version>2.5.0</protobuf.version>
+		<chill.version>0.5.2</chill.version>
+		<asm.version>4.0</asm.version>
 	</properties>
 
 	<dependencies>
@@ -158,14 +161,16 @@ under the License.
 	
 	<!-- this section defines the module versions that are used if nothing else is specified. -->
 	<dependencyManagement>
+		<!-- WARNING:
+			Do NOT add guava, protobuf, asm, or netty here. Doing so would override
+			Hadoop's guava dependency (even though we handle it separately in the
+			flink-shaded-hadoop module).
+			We can use any guava version everywhere by adding it directly as a
+			dependency to each project.
+		-->
 		<dependencies>
-		
-			<!-- ASM is used by us, Kryo, Hadoop, ... -->
-			<dependency>
-				<groupId>org.ow2.asm</groupId>
-				<artifactId>asm</artifactId>
-				<version>4.0</version>
-			</dependency>
 
 			<!-- Make sure we use a consistent jetty version throughout the project -->
 			<dependency>
@@ -207,6 +212,12 @@ under the License.
 				<artifactId>commons-cli</artifactId>
 				<version>1.2</version>
 			</dependency>
+
+			<dependency>
+				<groupId>commons-io</groupId>
+				<artifactId>commons-io</artifactId>
+				<version>2.4</version>
+			</dependency>
 			
 			<!-- common-collections is used by us and by hadoop, so we need to define a common version -->
 			<dependency>
@@ -310,290 +321,6 @@ under the License.
 					</exclusion>
 				</exclusions>
 			</dependency>
-
-
-			<!-- "Old" stable Hadoop = MapReduce v1 -->
-			<dependency>
-				<groupId>org.apache.hadoop</groupId>
-				<artifactId>hadoop-core</artifactId>
-				<version>${hadoop.version}</version>
-				<exclusions>
-					<exclusion>
-						<groupId>asm</groupId>
-						<artifactId>asm</artifactId>
-					</exclusion>
-					<exclusion>
-						<groupId>tomcat</groupId>
-						<artifactId>jasper-compiler</artifactId>
-					</exclusion>
-					<exclusion>
-						<groupId>tomcat</groupId>
-						<artifactId>jasper-runtime</artifactId>
-					</exclusion>
-					<exclusion>
-						<groupId>org.mortbay.jetty</groupId>
-						<artifactId>jetty</artifactId>
-					</exclusion>
-					<exclusion>
-						<groupId>org.mortbay.jetty</groupId>
-						<artifactId>jsp-api-2.1</artifactId>
-					</exclusion>
-					<exclusion>
-						<groupId>org.mortbay.jetty</groupId>
-						<artifactId>jsp-2.1</artifactId>
-					</exclusion>
-					<exclusion>
-						<groupId>org.mortbay.jetty</groupId>
-						<artifactId>jetty-util</artifactId>
-					</exclusion>
-					<exclusion>
-						<groupId>org.eclipse.jdt</groupId>
-						<artifactId>core</artifactId>
-					</exclusion>
-				</exclusions>
-			</dependency>
-
-			<!-- Hadoop 2 Dependencies -->
-			<dependency>
-				<groupId>org.apache.hadoop</groupId>
-				<artifactId>hadoop-common</artifactId>
-				<version>${hadoop.version}</version>
-				<exclusions>
-					<exclusion>
-						<groupId>asm</groupId>
-						<artifactId>asm</artifactId>
-					</exclusion>
-					<exclusion>
-						<groupId>tomcat</groupId>
-						<artifactId>jasper-compiler</artifactId>
-					</exclusion>
-					<exclusion>
-						<groupId>tomcat</groupId>
-						<artifactId>jasper-runtime</artifactId>
-					</exclusion>
-					<exclusion>
-						<groupId>org.mortbay.jetty</groupId>
-						<artifactId>jetty</artifactId>
-					</exclusion>
-					<exclusion>
-						<groupId>org.mortbay.jetty</groupId>
-						<artifactId>jsp-api-2.1</artifactId>
-					</exclusion>
-					<exclusion>
-						<groupId>org.mortbay.jetty</groupId>
-						<artifactId>jsp-2.1</artifactId>
-					</exclusion>
-					<exclusion>
-						<groupId>org.mortbay.jetty</groupId>
-						<artifactId>jetty-util</artifactId>
-					</exclusion>
-					<exclusion>
-						<groupId>org.eclipse.jdt</groupId>
-						<artifactId>core</artifactId>
-					</exclusion>
-					<exclusion>
-						<groupId>javax.servlet</groupId>
-						<artifactId>servlet-api</artifactId>
-					</exclusion>
-					<exclusion>
-						<groupId>javax.servlet.jsp</groupId>
-						<artifactId>jsp-api</artifactId>
-					</exclusion>
-				</exclusions>
-			</dependency>
-			<dependency>
-				<groupId>org.apache.hadoop</groupId>
-				<artifactId>hadoop-hdfs</artifactId>
-				<version>${hadoop.version}</version>
-				<exclusions>
-					<exclusion>
-						<groupId>asm</groupId>
-						<artifactId>asm</artifactId>
-					</exclusion>
-					<exclusion>
-						<groupId>tomcat</groupId>
-						<artifactId>jasper-compiler</artifactId>
-					</exclusion>
-					<exclusion>
-						<groupId>tomcat</groupId>
-						<artifactId>jasper-runtime</artifactId>
-					</exclusion>
-					<exclusion>
-						<groupId>org.mortbay.jetty</groupId>
-						<artifactId>jetty</artifactId>
-					</exclusion>
-					<exclusion>
-						<groupId>org.mortbay.jetty</groupId>
-						<artifactId>jsp-api-2.1</artifactId>
-					</exclusion>
-					<exclusion>
-						<groupId>org.mortbay.jetty</groupId>
-						<artifactId>jsp-2.1</artifactId>
-					</exclusion>
-					<exclusion>
-						<groupId>org.mortbay.jetty</groupId>
-						<artifactId>jetty-util</artifactId>
-					</exclusion>
-					<exclusion>
-						<groupId>org.eclipse.jdt</groupId>
-						<artifactId>core</artifactId>
-					</exclusion>
-					<exclusion>
-						<groupId>javax.servlet</groupId>
-						<artifactId>servlet-api</artifactId>
-					</exclusion>
-				</exclusions>
-			</dependency>
-			<dependency>
-				<groupId>org.apache.hadoop</groupId>
-				<artifactId>hadoop-client</artifactId>
-				<version>${hadoop.version}</version>
-				<exclusions>
-					<exclusion>
-						<groupId>asm</groupId>
-						<artifactId>asm</artifactId>
-					</exclusion>
-					<exclusion>
-						<groupId>tomcat</groupId>
-						<artifactId>jasper-compiler</artifactId>
-					</exclusion>
-					<exclusion>
-						<groupId>tomcat</groupId>
-						<artifactId>jasper-runtime</artifactId>
-					</exclusion>
-					<exclusion>
-						<groupId>org.mortbay.jetty</groupId>
-						<artifactId>jetty</artifactId>
-					</exclusion>
-					<exclusion>
-						<groupId>org.mortbay.jetty</groupId>
-						<artifactId>jsp-api-2.1</artifactId>
-					</exclusion>
-					<exclusion>
-						<groupId>org.mortbay.jetty</groupId>
-						<artifactId>jsp-2.1</artifactId>
-					</exclusion>
-					<exclusion>
-						<groupId>org.mortbay.jetty</groupId>
-						<artifactId>jetty-util</artifactId>
-					</exclusion>
-					<exclusion>
-						<groupId>org.eclipse.jdt</groupId>
-						<artifactId>core</artifactId>
-					</exclusion>
-					<exclusion>
-						<groupId>javax.servlet</groupId>
-						<artifactId>servlet-api</artifactId>
-					</exclusion>
-				</exclusions>
-			</dependency>
-			<dependency>
-				<groupId>org.apache.hadoop</groupId>
-				<artifactId>hadoop-mapreduce-client-core</artifactId>
-				<version>${hadoop.version}</version>
-				<exclusions>
-					<exclusion>
-						<groupId>asm</groupId>
-						<artifactId>asm</artifactId>
-					</exclusion>
-					<exclusion>
-						<groupId>tomcat</groupId>
-						<artifactId>jasper-compiler</artifactId>
-					</exclusion>
-					<exclusion>
-						<groupId>tomcat</groupId>
-						<artifactId>jasper-runtime</artifactId>
-					</exclusion>
-					<exclusion>
-						<groupId>org.mortbay.jetty</groupId>
-						<artifactId>jetty</artifactId>
-					</exclusion>
-					<exclusion>
-						<groupId>org.mortbay.jetty</groupId>
-						<artifactId>jsp-api-2.1</artifactId>
-					</exclusion>
-					<exclusion>
-						<groupId>org.mortbay.jetty</groupId>
-						<artifactId>jsp-2.1</artifactId>
-					</exclusion>
-					<exclusion>
-						<groupId>org.mortbay.jetty</groupId>
-						<artifactId>jetty-util</artifactId>
-					</exclusion>
-					<exclusion>
-						<groupId>org.eclipse.jdt</groupId>
-						<artifactId>core</artifactId>
-					</exclusion>
-					<exclusion>
-						<groupId>org.apache.hadoop</groupId>
-						<artifactId>hadoop-yarn-common</artifactId>
-					</exclusion>
-					<exclusion>
-						<groupId>com.google.inject.extensions</groupId>
-						<artifactId>guice-servlet</artifactId>
-					</exclusion>
-					<exclusion>
-						<groupId>io.netty</groupId>
-						<artifactId>netty</artifactId>
-					</exclusion>
-				</exclusions>
-			</dependency>
-			<dependency>
-				<groupId>org.apache.hadoop</groupId>
-				<artifactId>hadoop-yarn-client</artifactId>
-				<version>${hadoop.version}</version>
-				<exclusions>
-					<exclusion>
-						<groupId>asm</groupId>
-						<artifactId>asm</artifactId>
-					</exclusion>
-					<exclusion>
-						<groupId>tomcat</groupId>
-						<artifactId>jasper-compiler</artifactId>
-					</exclusion>
-					<exclusion>
-						<groupId>tomcat</groupId>
-						<artifactId>jasper-runtime</artifactId>
-					</exclusion>
-					<exclusion>
-						<groupId>org.mortbay.jetty</groupId>
-						<artifactId>jetty</artifactId>
-					</exclusion>
-					<exclusion>
-						<groupId>org.mortbay.jetty</groupId>
-						<artifactId>jsp-api-2.1</artifactId>
-					</exclusion>
-					<exclusion>
-						<groupId>org.mortbay.jetty</groupId>
-						<artifactId>jsp-2.1</artifactId>
-					</exclusion>
-					<exclusion>
-						<groupId>org.mortbay.jetty</groupId>
-						<artifactId>jetty-util</artifactId>
-					</exclusion>
-					<exclusion>
-						<groupId>org.eclipse.jdt</groupId>
-						<artifactId>core</artifactId>
-					</exclusion>
-				</exclusions>
-			</dependency>
-			<dependency>
-				<groupId>org.apache.hadoop</groupId>
-				<artifactId>hadoop-yarn-common</artifactId>
-				<version>${hadoop.version}</version>
-			</dependency>
-			<dependency>
-				<groupId>org.apache.hadoop</groupId>
-				<artifactId>hadoop-yarn-server-tests</artifactId>
-				<scope>test</scope>
-				<version>${hadoop.version}</version>
-			</dependency>
-			<dependency>
-				<groupId>org.apache.hadoop</groupId>
-				<artifactId>hadoop-minicluster</artifactId>
-				<scope>test</scope>
-				<version>${hadoop.version}</version>
-			</dependency>
 		</dependencies>
 	</dependencyManagement>
 
@@ -608,6 +335,7 @@ under the License.
 			</activation>
 			<properties>
 				<hadoop.version>${hadoop-one.version}</hadoop.version>
+				<shading-artifact.name>flink-shaded-hadoop1</shading-artifact.name>
 			</properties>
 		</profile>
 		<profile>
@@ -620,6 +348,7 @@ under the License.
 			</activation>
 			<properties>
 				<hadoop.version>${hadoop-two.version}</hadoop.version>
+				<shading-artifact.name>flink-shaded-hadoop2</shading-artifact.name>
 			</properties>
 		</profile>
 
@@ -631,121 +360,14 @@ under the License.
 				<!--hadoop2--><name>!hadoop.profile</name>
 				</property>
 			</activation>
+			<properties>
+				<shading-artifact.name>flink-shaded-include-yarn</shading-artifact.name>
+			</properties>
 			<modules>
 				<module>flink-yarn</module>
 				<module>flink-yarn-tests</module>
 			</modules>
 		</profile>
-		<profile>
-			<id>hadoop-2.0.0-alpha</id>
-			<activation>
-				<property>
-					<name>hadoop.version</name>
-					<value>2.0.0-alpha</value>
-				</property>
-			</activation>
-			<properties>
-				<akka.version>2.2.1</akka.version>
-				<kryoserialization.version>0.3.1</kryoserialization.version>
-				<protobuf.version>2.4.1</protobuf.version>
-			</properties>
-			<dependencyManagement>
-				<dependencies>
-					<dependency>
-						<groupId>org.apache.hadoop</groupId>
-						<artifactId>hadoop-common</artifactId>
-						<version>${hadoop.version}</version>
-						<exclusions>
-							<!-- This is an additional exclusion (Netty) -->
-							<exclusion>
-								<groupId>org.jboss.netty</groupId>
-								<artifactId>netty</artifactId>
-							</exclusion>
-							<exclusion>
-								<groupId>asm</groupId>
-								<artifactId>asm</artifactId>
-							</exclusion>
-							<exclusion>
-								<groupId>tomcat</groupId>
-								<artifactId>jasper-compiler</artifactId>
-							</exclusion>
-							<exclusion>
-								<groupId>tomcat</groupId>
-								<artifactId>jasper-runtime</artifactId>
-							</exclusion>
-							<exclusion>
-								<groupId>org.mortbay.jetty</groupId>
-								<artifactId>jetty</artifactId>
-							</exclusion>
-							<exclusion>
-								<groupId>javax.servlet</groupId>
-								<artifactId>servlet-api</artifactId>
-							</exclusion>
-							<exclusion>
-								<groupId>org.mortbay.jetty</groupId>
-								<artifactId>jsp-api-2.1</artifactId>
-							</exclusion>
-							<exclusion>
-								<groupId>org.mortbay.jetty</groupId>
-								<artifactId>jsp-2.1</artifactId>
-							</exclusion>
-							<exclusion>
-								<groupId>org.mortbay.jetty</groupId>
-								<artifactId>jetty-util</artifactId>
-							</exclusion>
-							<exclusion>
-								<groupId>org.eclipse.jdt</groupId>
-								<artifactId>core</artifactId>
-							</exclusion>
-						</exclusions>
-					</dependency>
-					<dependency>
-						<groupId>org.apache.hadoop</groupId>
-						<artifactId>hadoop-mapreduce-client-core</artifactId>
-						<version>${hadoop.version}</version>
-						<exclusions>
-							<!-- This is an additional exclusion (Netty) -->
-							<exclusion>
-								<groupId>org.jboss.netty</groupId>
-								<artifactId>netty</artifactId>
-							</exclusion>
-							<exclusion>
-								<groupId>asm</groupId>
-								<artifactId>asm</artifactId>
-							</exclusion>
-							<exclusion>
-								<groupId>tomcat</groupId>
-								<artifactId>jasper-compiler</artifactId>
-							</exclusion>
-							<exclusion>
-								<groupId>tomcat</groupId>
-								<artifactId>jasper-runtime</artifactId>
-							</exclusion>
-							<exclusion>
-								<groupId>org.mortbay.jetty</groupId>
-								<artifactId>jetty</artifactId>
-							</exclusion>
-							<exclusion>
-								<groupId>org.mortbay.jetty</groupId>
-								<artifactId>jsp-api-2.1</artifactId>
-							</exclusion>
-							<exclusion>
-								<groupId>org.mortbay.jetty</groupId>
-								<artifactId>jsp-2.1</artifactId>
-							</exclusion>
-							<exclusion>
-								<groupId>org.mortbay.jetty</groupId>
-								<artifactId>jetty-util</artifactId>
-							</exclusion>
-							<exclusion>
-								<groupId>org.eclipse.jdt</groupId>
-								<artifactId>core</artifactId>
-							</exclusion>
-						</exclusions>
-					</dependency>
-				</dependencies>
-			</dependencyManagement>
-		</profile>
 
 		<profile>
 			<id>vendor-repos</id>
@@ -896,29 +518,6 @@ under the License.
 		</profile>
 	</profiles>
 
-	<reporting>
-		<plugins>
-			<!-- execution of Unit Tests -->
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-surefire-report-plugin</artifactId>
-				<version>2.17</version>
-			</plugin>
-
-			<!-- test coverage reports -->
-			<plugin>
-				<groupId>org.codehaus.mojo</groupId>
-				<artifactId>cobertura-maven-plugin</artifactId>
-				<version>2.6</version>
-				<configuration>
-					<formats>
-						<format>html</format>
-					</formats>
-				</configuration>
-			</plugin>
-		</plugins>
-	</reporting>
-
 	<build>
 		<plugins>
 			<plugin>
@@ -935,38 +534,6 @@ under the License.
 				</configuration>
 			</plugin>
 			
-			<!-- Relocate references to Google Guava classes into a different namespace -->
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-shade-plugin</artifactId>
-				<version>2.3</version>
-				<executions>
-					<execution>
-						<phase>package</phase>
-						<goals>
-							<goal>shade</goal>
-						</goals>
-						<configuration>
-							<shadedArtifactAttached>false</shadedArtifactAttached>
-							<createDependencyReducedPom>false</createDependencyReducedPom>
-							<artifactSet>
-								<includes>
-									<include>org.apache.flink:${project.artifact}</include>
-								</includes>
-							</artifactSet>
-							<relocations>
-								<relocation>
-									<pattern>com.google</pattern>
-									<shadedPattern>org.apache.flink.shaded.com.google</shadedPattern>
-									<excludes>
-										<exclude>com.google.protobuf.**</exclude>
-									</excludes>
-								</relocation>
-							</relocations>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
 			<plugin>
 				<groupId>org.apache.rat</groupId>
 				<artifactId>apache-rat-plugin</artifactId>
@@ -1159,6 +726,54 @@ under the License.
 					</execution>
 				</executions>
 			</plugin>
+
+			<!-- We use shading in all packages to relocate some classes, such as
+				Guava and ASM.
+				By doing so, users adding Flink as a dependency won't run into conflicts.
+				(For example, users can use whatever Guava version they want, because we
+				do not expose our own Guava dependency.)
+			-->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<version>2.3</version>
+				<executions>
+					<execution>
+						<id>shade-flink</id>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<shadeTestJar>true</shadeTestJar>
+							<shadedArtifactAttached>false</shadedArtifactAttached>
+							<createDependencyReducedPom>true</createDependencyReducedPom>
+							<dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation>
+							<artifactSet>
+								<includes>
+									<include>com.google.guava:*</include>
+									<include>org.ow2.asm:*</include>
+								</includes>
+							</artifactSet>
+							<relocations>
+								<relocation>
+									<pattern>com.google</pattern>
+									<shadedPattern>org.apache.flink.shaded.com.google</shadedPattern>
+									<excludes>
+										<exclude>com.google.protobuf.**</exclude>
+										<exclude>com.google.inject.**</exclude>
+									</excludes>
+								</relocation>
+								<relocation>
+									<pattern>org.objectweb.asm</pattern>
+									<shadedPattern>org.apache.flink.shaded.org.objectweb.asm</shadedPattern>
+								</relocation>
+							</relocations>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
 		</plugins>
 
 		<!--

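To make the relocation described above concrete, here is a hedged sketch of a user program (hypothetical, not part of this commit): because Flink's own Guava classes are rewritten into the org.apache.flink.shaded.com.google namespace at package time, imports of com.google.* in user code resolve to whatever Guava version the user puts on the classpath, with no conflict against the version Flink was built with.

	// Hypothetical user program; it only assumes some Guava version on the classpath.
	import com.google.common.base.Joiner;

	public class UserGuavaExample {
		public static void main(String[] args) {
			// This resolves to the user's Guava, not Flink's relocated copy.
			System.out.println(Joiner.on(", ").join("no", "version", "conflicts"));
		}
	}
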
http://git-wip-us.apache.org/repos/asf/flink/blob/84e76f4d/tools/travis_mvn_watchdog.sh
----------------------------------------------------------------------
diff --git a/tools/travis_mvn_watchdog.sh b/tools/travis_mvn_watchdog.sh
index d94f977..0875a7e 100755
--- a/tools/travis_mvn_watchdog.sh
+++ b/tools/travis_mvn_watchdog.sh
@@ -45,7 +45,7 @@ LOG4J_PROPERTIES=${HERE}/log4j-travis.properties
 
 # Maven command to run. We set the forkCount manually, because otherwise Maven sees too many cores
 # on the Travis VMs.
-MVN="mvn -Dflink.forkCount=2 -B $PROFILE -Dlog.dir=${ARTIFACTS_DIR} -Dlog4j.configuration=file://$LOG4J_PROPERTIES clean install verify"
+MVN="mvn -Dflink.forkCount=2 -B $PROFILE -Dlog.dir=${ARTIFACTS_DIR} -Dlog4j.configuration=file://$LOG4J_PROPERTIES clean install"
 
 MVN_PID="${ARTIFACTS_DIR}/watchdog.mvn.pid"
 MVN_EXIT="${ARTIFACTS_DIR}/watchdog.mvn.exit"

