flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [3/9] flink git commit: [streaming] [storm] Clean up instantiation of mini clusters and test environments.
Date Thu, 01 Oct 2015 09:54:14 GMT
[streaming] [storm] Clean up instantiation of mini clusters and test environments.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/82d62361
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/82d62361
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/82d62361

Branch: refs/heads/master
Commit: 82d6236173093b7e035a21360c7b69c67fd6ae62
Parents: 891db5e
Author: Stephan Ewen <sewen@apache.org>
Authored: Wed Sep 30 00:12:00 2015 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Oct 1 11:02:39 2015 +0200

----------------------------------------------------------------------
 .../OperatorStatsAccumulatorTest.java           |  11 +-
 .../api/FlinkLocalCluster.java                  |  88 +++++--
 .../excamation/StormExclamationLocal.java       |   9 +-
 .../api/FlinkTestCluster.java                   | 107 --------
 .../stormcompatibility/api/StormTestBase.java   | 117 +++++++++
 .../ExclamationWithStormBoltITCase.java         |   4 +-
 .../ExclamationWithStormSpoutITCase.java        |   5 +-
 .../StormExclamationLocalITCase.java            |   7 +-
 .../wordcount/BoltTokenizerWordCountITCase.java |   4 +-
 .../BoltTokenizerWordCountPojoITCase.java       |   4 +-
 .../BoltTokenizerWordCountWithNamesITCase.java  |   4 +-
 .../wordcount/SpoutSourceWordCountITCase.java   |   4 +-
 .../wordcount/StormWordCountLocalITCase.java    |   7 +-
 .../StormWordCountLocalNamedITCase.java         |   7 +-
 .../flink/api/java/ExecutionEnvironment.java    |   2 +-
 .../runtime/minicluster/FlinkMiniCluster.scala  |   7 +
 .../java/org/apache/flink/tachyon/HDFSTest.java |  26 +-
 .../api/java/ScalaShellRemoteEnvironment.java   |  10 +-
 .../org.apache.flink/api/scala/FlinkILoop.scala |   9 +-
 .../api/environment/LocalStreamEnvironment.java |  78 ++++--
 .../environment/RemoteStreamEnvironment.java    |  52 +++-
 .../environment/StreamExecutionEnvironment.java | 249 +++++++++++--------
 .../flink/streaming/util/ClusterUtil.java       |  96 -------
 .../util/StreamingMultipleProgramsTestBase.java |  29 +--
 .../util/StreamingProgramTestBase.java          |  58 ++---
 .../streaming/util/TestStreamEnvironment.java   | 150 +++--------
 .../TopSpeedWindowingExampleITCase.java         |   2 +-
 .../flink/tez/client/RemoteTezEnvironment.java  |   2 +-
 .../flink/tez/test/TezProgramTestBase.java      |   3 +-
 .../flink/test/util/AbstractTestBase.java       |  33 ++-
 .../flink/test/util/JavaProgramTestBase.java    |   3 +-
 .../flink/test/util/RecordAPITestBase.java      |   8 +-
 .../apache/flink/test/util/TestBaseUtils.java   |   9 +-
 .../ExecutionEnvironmentITCase.java             |  25 +-
 .../flink/test/runtime/IPv6HostnamesITCase.java |   2 +-
 .../BatchScalaAPICompletenessTest.scala         |   3 +-
 36 files changed, 620 insertions(+), 614 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-contrib/flink-operator-stats/src/test/java/org/apache/flink/contrib/operatorstatistics/OperatorStatsAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-operator-stats/src/test/java/org/apache/flink/contrib/operatorstatistics/OperatorStatsAccumulatorTest.java b/flink-contrib/flink-operator-stats/src/test/java/org/apache/flink/contrib/operatorstatistics/OperatorStatsAccumulatorTest.java
index b9a8dc2..887c745 100644
--- a/flink-contrib/flink-operator-stats/src/test/java/org/apache/flink/contrib/operatorstatistics/OperatorStatsAccumulatorTest.java
+++ b/flink-contrib/flink-operator-stats/src/test/java/org/apache/flink/contrib/operatorstatistics/OperatorStatsAccumulatorTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.util.Collector;
 import org.junit.Assert;
@@ -36,6 +37,7 @@ import java.io.Serializable;
 import java.util.Map;
 import java.util.Random;
 
+@SuppressWarnings("serial")
 public class OperatorStatsAccumulatorTest extends AbstractTestBase {
 
 	private static final Logger LOG = LoggerFactory.getLogger(OperatorStatsAccumulatorTest.class);
@@ -43,9 +45,9 @@ public class OperatorStatsAccumulatorTest extends AbstractTestBase {
 	private static final String ACCUMULATOR_NAME = "op-stats";
 
 	public OperatorStatsAccumulatorTest(){
-		super(new Configuration());
+		super(new Configuration(), StreamingMode.BATCH_ONLY);
 	}
-
+	
 	public static class StringToInt extends RichFlatMapFunction<String, Tuple1<Integer>> {
 
 		// Is instantiated later since the runtime context is not yet initialized
@@ -81,9 +83,8 @@ public class OperatorStatsAccumulatorTest extends AbstractTestBase {
 			try {
 				intValue = Integer.parseInt(value);
 				localAccumulator.add(intValue);
-				out.collect(new Tuple1(intValue));
-			} catch (NumberFormatException ex) {
-			}
+				out.collect(new Tuple1<>(intValue));
+			} catch (NumberFormatException ignored) {}
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
index b5eda8b..c139201 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
@@ -25,15 +25,40 @@ import backtype.storm.generated.RebalanceOptions;
 import backtype.storm.generated.StormTopology;
 import backtype.storm.generated.SubmitOptions;
 import backtype.storm.generated.TopologyInfo;
-import org.apache.flink.streaming.util.ClusterUtil;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * {@link FlinkLocalCluster} mimics a Storm {@link LocalCluster}.
  */
 public class FlinkLocalCluster {
 
+	/** The log used by this mini cluster */
+	private static final Logger LOG = LoggerFactory.getLogger(FlinkLocalCluster.class);
+	
+	/** The flink mini cluster on which to execute the programs */
+	private final FlinkMiniCluster flink;
+
+	
+	public FlinkLocalCluster() {
+		this.flink = new LocalFlinkMiniCluster(new Configuration(), true, StreamingMode.STREAMING);
+		this.flink.start();
+	}
+
+	public FlinkLocalCluster(FlinkMiniCluster flink) {
+		this.flink = Objects.requireNonNull(flink);
+	}
+
 	public void submitTopology(final String topologyName, final Map<?, ?> conf, final FlinkTopology topology)
 			throws Exception {
 		this.submitTopologyWithOpts(topologyName, conf, topology, null);
@@ -41,7 +66,10 @@ public class FlinkLocalCluster {
 
 	public void submitTopologyWithOpts(final String topologyName, final Map<?, ?> conf, final FlinkTopology topology,
 			final SubmitOptions submitOpts) throws Exception {
-		ClusterUtil.startOnMiniCluster(topology.getStreamGraph().getJobGraph(topologyName), topology.getNumberOfTasks());
+		
+		LOG.info("Running Storm topology on FlinkLocalCluster");
+		JobGraph jobGraph = topology.getStreamGraph().getJobGraph(topologyName);
+		flink.submitJobDetached(jobGraph);
 	}
 
 	public void killTopology(final String topologyName) {
@@ -60,7 +88,9 @@ public class FlinkLocalCluster {
 	public void rebalance(final String name, final RebalanceOptions options) {
 	}
 
-	public void shutdown() {}
+	public void shutdown() {
+		flink.stop();
+	}
 
 	public String getTopologyConf(final String id) {
 		return null;
@@ -82,31 +112,57 @@ public class FlinkLocalCluster {
 		return null;
 	}
 
+	// ------------------------------------------------------------------------
+	//  Access to default local cluster
+	// ------------------------------------------------------------------------
+	
 	// A different {@link FlinkLocalCluster} to be used for execution of ITCases
-	private static FlinkLocalCluster currentCluster = null;
+	private static LocalClusterFactory currentFactory = new DefaultLocalClusterFactory();
 
 	/**
-	 * Returns a {@link FlinkLocalCluster} that should be used for execution. If no cluster was set by {@link
-	 * #initialize(FlinkLocalCluster)} in advance, a new {@link FlinkLocalCluster} is returned.
+	 * Returns a {@link FlinkLocalCluster} that should be used for execution. If no cluster was set by
+	 * {@link #initialize(LocalClusterFactory)} in advance, a new {@link FlinkLocalCluster} is returned.
 	 *
 	 * @return a {@link FlinkLocalCluster} to be used for execution
 	 */
 	public static FlinkLocalCluster getLocalCluster() {
-		if (currentCluster == null) {
-			currentCluster = new FlinkLocalCluster();
-		}
-
-		return currentCluster;
+		return currentFactory.createLocalCluster();
 	}
 
 	/**
-	 * Sets a different {@link FlinkLocalCluster} to be used for execution.
+	 * Sets a different factory for FlinkLocalClusters to be used for execution.
 	 *
-	 * @param cluster
-	 * 		the {@link FlinkLocalCluster} to be used for execution
+	 * @param clusterFactory
+	 * 		The LocalClusterFactory to create the local clusters for execution.
 	 */
-	public static void initialize(final FlinkLocalCluster cluster) {
-		currentCluster = cluster;
+	public static void initialize(LocalClusterFactory clusterFactory) {
+		currentFactory = Objects.requireNonNull(clusterFactory);
 	}
+	
+	// ------------------------------------------------------------------------
+	//  Cluster factory
+	// ------------------------------------------------------------------------
 
+	/**
+	 * A factory that creates local clusters.
+	 */
+	public static interface LocalClusterFactory {
+
+		/**
+		 * Creates a local flink cluster.
+		 * @return A local flink cluster.
+		 */
+		FlinkLocalCluster createLocalCluster();
+	}
+
+	/**
+	 * A factory that instantiates a FlinkLocalCluster.
+	 */
+	public static class DefaultLocalClusterFactory implements LocalClusterFactory {
+		
+		@Override
+		public FlinkLocalCluster createLocalCluster() {
+			return new FlinkLocalCluster();
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationLocal.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationLocal.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationLocal.java
index 5941ff0..bd1220c 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationLocal.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationLocal.java
@@ -23,18 +23,19 @@ import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
 
 /**
  * Implements the "Exclamation" program that attaches five exclamation mark to every line of a text
- * files in a streaming fashion. The program is constructed as a regular {@link StormTopology} and
- * submitted to Flink for execution in the same way as to a Storm {@link LocalCluster}.
+ * files in a streaming fashion. The program is constructed as a regular {@link backtype.storm.generated.StormTopology}
+ * and submitted to Flink for execution in the same way as to a Storm {@link backtype.storm.LocalCluster}.
  * <p/>
  * This example shows how to run program directly within Java, thus it cannot be used to submit a
- * {@link StormTopology} via Flink command line clients (ie, bin/flink).
+ * {@link backtype.storm.generated.StormTopology} via Flink command line clients (ie, bin/flink).
  * <p/>
  * <p/>
  * The input is a plain text file with lines separated by newline characters.
  * <p/>
  * <p/>
  * Usage: <code>StormExclamationLocal &lt;text path&gt; &lt;result path&gt;</code><br/>
- * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.examples.java.wordcount.util.WordCountData}.
  * <p/>
  * <p/>
  * This example shows how to:

http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTestCluster.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTestCluster.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTestCluster.java
deleted file mode 100644
index 68f1216..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTestCluster.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.api;
-
-import backtype.storm.LocalCluster;
-import backtype.storm.generated.ClusterSummary;
-import backtype.storm.generated.KillOptions;
-import backtype.storm.generated.RebalanceOptions;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.generated.SubmitOptions;
-import backtype.storm.generated.TopologyInfo;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-
-import java.util.Map;
-
-/**
- * {@link FlinkTestCluster} mimics a Storm {@link LocalCluster} for ITCases via a {@link TestStreamEnvironment}.
- */
-public class FlinkTestCluster extends FlinkLocalCluster {
-
-	@Override
-	public void submitTopology(final String topologyName, final Map<?, ?> conf, final FlinkTopology topology)
-			throws Exception {
-		this.submitTopologyWithOpts(topologyName, conf, topology, null);
-	}
-
-	@Override
-	public void submitTopologyWithOpts(final String topologyName, final Map<?, ?> conf, final FlinkTopology topology,
-			final SubmitOptions submitOpts)
-			throws Exception {
-		final TestStreamEnvironment env = (TestStreamEnvironment) StreamExecutionEnvironment.getExecutionEnvironment();
-		env.start(topology.getStreamGraph().getJobGraph(topologyName));
-	}
-
-	@Override
-	public void killTopology(final String topologyName) {
-	}
-
-	@Override
-	public void killTopologyWithOpts(final String name, final KillOptions options) {
-	}
-
-	@Override
-	public void activate(final String topologyName) {
-	}
-
-	@Override
-	public void deactivate(final String topologyName) {
-	}
-
-	@Override
-	public void rebalance(final String name, final RebalanceOptions options) {
-	}
-
-	@Override
-	public void shutdown() {
-		final TestStreamEnvironment env = (TestStreamEnvironment) StreamExecutionEnvironment.getExecutionEnvironment();
-		try {
-			env.shutdown();
-		} catch (final InterruptedException e) {
-			e.printStackTrace();
-		}
-	}
-
-	@Override
-	public String getTopologyConf(final String id) {
-		return null;
-	}
-
-	@Override
-	public StormTopology getTopology(final String id) {
-		return null;
-	}
-
-	@Override
-	public ClusterSummary getClusterInfo() {
-		return null;
-	}
-
-	@Override
-	public TopologyInfo getTopologyInfo(final String id) {
-		return null;
-	}
-
-	@Override
-	public Map<?, ?> getState() {
-		return null;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/api/StormTestBase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/api/StormTestBase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/api/StormTestBase.java
new file mode 100644
index 0000000..dd6d0d9
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/api/StormTestBase.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.api;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.junit.Test;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Base class for Storm tests.
+ */
+public abstract class StormTestBase extends AbstractTestBase {
+	
+	public static final int DEFAULT_PARALLELISM = 4;
+	
+	public StormTestBase() {
+		this(new Configuration());
+	}
+	
+	public StormTestBase(Configuration config) {
+		super(config, StreamingMode.STREAMING);
+		setTaskManagerNumSlots(DEFAULT_PARALLELISM);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Methods to create the test program and for pre- and post- test work
+	// ------------------------------------------------------------------------
+
+	protected abstract void testProgram() throws Exception;
+
+	protected void preSubmit() throws Exception {}
+
+	protected void postSubmit() throws Exception {}
+
+	// ------------------------------------------------------------------------
+	//  Test entry point
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testJob() throws Exception {
+		try {
+			// pre-submit
+			try {
+				preSubmit();
+			}
+			catch (Exception e) {
+				System.err.println(e.getMessage());
+				e.printStackTrace();
+				fail("Pre-submit work caused an error: " + e.getMessage());
+			}
+
+			// prepare the test environment
+			startCluster();
+
+			// we need to initialize the stream test environment, and the storm local cluster
+			TestStreamEnvironment.setAsContext(this.executor, DEFAULT_PARALLELISM);
+			
+			FlinkLocalCluster.initialize(new FlinkLocalCluster.LocalClusterFactory() {
+				@Override
+				public FlinkLocalCluster createLocalCluster() {
+					return new FlinkLocalCluster(executor);
+				}
+			});
+
+			// call the test program
+			try {
+				testProgram();
+			}
+			catch (Exception e) {
+				System.err.println(e.getMessage());
+				e.printStackTrace();
+				fail("Error while calling the test program: " + e.getMessage());
+			}
+
+			// post-submit
+			try {
+				postSubmit();
+			}
+			catch (Exception e) {
+				System.err.println(e.getMessage());
+				e.printStackTrace();
+				fail("Post-submit work caused an error: " + e.getMessage());
+			}
+		}
+		finally {
+			// reset the FlinkLocalCluster to its default behavior
+			FlinkLocalCluster.initialize(new FlinkLocalCluster.DefaultLocalClusterFactory());
+			
+			// reset the StreamExecutionEnvironment to its default behavior
+			TestStreamEnvironment.unsetAsContext();
+			
+			// clean up all resources
+			stopCluster();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormBoltITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormBoltITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormBoltITCase.java
index 930f87b..f47a58f 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormBoltITCase.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormBoltITCase.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.stormcompatibility.exclamation;
 
+import org.apache.flink.stormcompatibility.api.StormTestBase;
 import org.apache.flink.stormcompatibility.excamation.ExclamationWithStormBolt;
 import org.apache.flink.stormcompatibility.exclamation.util.ExclamationData;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
 import org.apache.flink.test.testdata.WordCountData;
 
-public class ExclamationWithStormBoltITCase extends StreamingProgramTestBase {
+public class ExclamationWithStormBoltITCase extends StormTestBase {
 
 	protected String textPath;
 	protected String resultPath;

http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormSpoutITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormSpoutITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormSpoutITCase.java
index 4c515ce..2a8ac24 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormSpoutITCase.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormSpoutITCase.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.stormcompatibility.exclamation;
 
+import org.apache.flink.stormcompatibility.api.StormTestBase;
 import org.apache.flink.stormcompatibility.excamation.ExclamationWithStormSpout;
 import org.apache.flink.stormcompatibility.exclamation.util.ExclamationData;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
 import org.apache.flink.test.testdata.WordCountData;
 
-public class ExclamationWithStormSpoutITCase extends StreamingProgramTestBase {
+public class ExclamationWithStormSpoutITCase extends StormTestBase {
 
 	protected String textPath;
 	protected String resultPath;
@@ -43,5 +43,4 @@ public class ExclamationWithStormSpoutITCase extends StreamingProgramTestBase {
 	protected void testProgram() throws Exception {
 		ExclamationWithStormSpout.main(new String[]{this.textPath, this.resultPath});
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormExclamationLocalITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormExclamationLocalITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormExclamationLocalITCase.java
index d6bcf30..6cba39a 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormExclamationLocalITCase.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormExclamationLocalITCase.java
@@ -18,21 +18,18 @@
 
 package org.apache.flink.stormcompatibility.exclamation;
 
-import org.apache.flink.stormcompatibility.api.FlinkLocalCluster;
-import org.apache.flink.stormcompatibility.api.FlinkTestCluster;
+import org.apache.flink.stormcompatibility.api.StormTestBase;
 import org.apache.flink.stormcompatibility.excamation.StormExclamationLocal;
 import org.apache.flink.stormcompatibility.exclamation.util.ExclamationData;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
 import org.apache.flink.test.testdata.WordCountData;
 
-public class StormExclamationLocalITCase extends StreamingProgramTestBase {
+public class StormExclamationLocalITCase extends StormTestBase {
 
 	protected String textPath;
 	protected String resultPath;
 
 	@Override
 	protected void preSubmit() throws Exception {
-		FlinkLocalCluster.initialize(new FlinkTestCluster());
 		this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
 		this.resultPath = this.getTempDirPath("result");
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountITCase.java
index 9228474..c9516ff 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountITCase.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountITCase.java
@@ -18,10 +18,10 @@
 
 package org.apache.flink.stormcompatibility.wordcount;
 
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.stormcompatibility.api.StormTestBase;
 import org.apache.flink.test.testdata.WordCountData;
 
-public class BoltTokenizerWordCountITCase extends StreamingProgramTestBase {
+public class BoltTokenizerWordCountITCase extends StormTestBase {
 
 	protected String textPath;
 	protected String resultPath;

http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojoITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojoITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojoITCase.java
index dc75c25..351014e 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojoITCase.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojoITCase.java
@@ -18,10 +18,10 @@
 
 package org.apache.flink.stormcompatibility.wordcount;
 
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.stormcompatibility.api.StormTestBase;
 import org.apache.flink.test.testdata.WordCountData;
 
-public class BoltTokenizerWordCountPojoITCase extends StreamingProgramTestBase {
+public class BoltTokenizerWordCountPojoITCase extends StormTestBase {
 
 	protected String textPath;
 	protected String resultPath;

http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNamesITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNamesITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNamesITCase.java
index e147f53..c2ed088 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNamesITCase.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNamesITCase.java
@@ -18,10 +18,10 @@
 
 package org.apache.flink.stormcompatibility.wordcount;
 
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.stormcompatibility.api.StormTestBase;
 import org.apache.flink.test.testdata.WordCountData;
 
-public class BoltTokenizerWordCountWithNamesITCase extends StreamingProgramTestBase {
+public class BoltTokenizerWordCountWithNamesITCase extends StormTestBase {
 
 	protected String textPath;
 	protected String resultPath;

http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCountITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCountITCase.java
index 9d7b869..93361c5 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCountITCase.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCountITCase.java
@@ -18,10 +18,10 @@
 
 package org.apache.flink.stormcompatibility.wordcount;
 
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.stormcompatibility.api.StormTestBase;
 import org.apache.flink.test.testdata.WordCountData;
 
-public class SpoutSourceWordCountITCase extends StreamingProgramTestBase {
+public class SpoutSourceWordCountITCase extends StormTestBase {
 
 	protected String textPath;
 	protected String resultPath;

http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalITCase.java
index 2427818..6b51cbd 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalITCase.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalITCase.java
@@ -18,19 +18,16 @@
 
 package org.apache.flink.stormcompatibility.wordcount;
 
-import org.apache.flink.stormcompatibility.api.FlinkLocalCluster;
-import org.apache.flink.stormcompatibility.api.FlinkTestCluster;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.stormcompatibility.api.StormTestBase;
 import org.apache.flink.test.testdata.WordCountData;
 
-public class StormWordCountLocalITCase extends StreamingProgramTestBase {
+public class StormWordCountLocalITCase extends StormTestBase {
 
 	protected String textPath;
 	protected String resultPath;
 
 	@Override
 	protected void preSubmit() throws Exception {
-		FlinkLocalCluster.initialize(new FlinkTestCluster());
 		this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
 		this.resultPath = this.getTempDirPath("result");
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalNamedITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalNamedITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalNamedITCase.java
index 8b9a729..a9e9884 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalNamedITCase.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalNamedITCase.java
@@ -18,19 +18,16 @@
 
 package org.apache.flink.stormcompatibility.wordcount;
 
-import org.apache.flink.stormcompatibility.api.FlinkLocalCluster;
-import org.apache.flink.stormcompatibility.api.FlinkTestCluster;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.stormcompatibility.api.StormTestBase;
 import org.apache.flink.test.testdata.WordCountData;
 
-public class StormWordCountLocalNamedITCase extends StreamingProgramTestBase {
+public class StormWordCountLocalNamedITCase extends StormTestBase {
 
 	protected String textPath;
 	protected String resultPath;
 
 	@Override
 	protected void preSubmit() throws Exception {
-		FlinkLocalCluster.initialize(new FlinkTestCluster());
 		this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
 		this.resultPath = this.getTempDirPath("result");
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 084e608..c69294d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -75,7 +75,7 @@ import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 
 /**
- * The ExecutionEnviroment is the context in which a program is executed. A
+ * The ExecutionEnvironment is the context in which a program is executed. A
  * {@link LocalEnvironment} will cause execution in the current JVM, a
  * {@link RemoteEnvironment} will cause execution on a remote setup.
  * <p>

http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index b3cff51..77e977f 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -112,6 +112,8 @@ abstract class FlinkMiniCluster(
   var taskManagerActors: Option[Seq[ActorRef]] = None
 
   protected var leaderRetrievalService: Option[LeaderRetrievalService] = None
+  
+  private var isRunning = false
 
   // --------------------------------------------------------------------------
   //                           Abstract Methods
@@ -271,6 +273,8 @@ abstract class FlinkMiniCluster(
     if(waitForTaskManagerRegistration) {
       waitForTaskManagersToBeRegistered()
     }
+
+    isRunning = true
   }
 
   def startWebServer(
@@ -314,6 +318,7 @@ abstract class FlinkMiniCluster(
     awaitTermination()
 
     leaderRetrievalService.foreach(_.stop())
+    isRunning = false
   }
 
   protected def shutdown(): Unit = {
@@ -354,6 +359,8 @@ abstract class FlinkMiniCluster(
       _ foreach(_.awaitTermination())
     }
   }
+  
+  def running = isRunning
 
   // --------------------------------------------------------------------------
   //                          Utility Methods

http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/tachyon/HDFSTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/tachyon/HDFSTest.java b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/tachyon/HDFSTest.java
index 633d022..5ec0add 100644
--- a/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/tachyon/HDFSTest.java
+++ b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/tachyon/HDFSTest.java
@@ -99,14 +99,21 @@ public class HDFSTest {
 		try {
 			FileSystem fs = file.getFileSystem();
 			Assert.assertTrue("Must be HadoopFileSystem", fs instanceof HadoopFileSystem);
-			new DopOneTestEnvironment();
+			
+			DopOneTestEnvironment.setAsContext();
 			try {
 				WordCount.main(new String[]{file.toString(), result.toString()});
-			} catch(Throwable t) {
+			}
+			catch(Throwable t) {
 				t.printStackTrace();
 				Assert.fail("Test failed with " + t.getMessage());
 			}
+			finally {
+				DopOneTestEnvironment.unsetAsContext();
+			}
+			
 			Assert.assertTrue("No result file present", hdfs.exists(result));
+			
 			// validate output:
 			org.apache.hadoop.fs.FSDataInputStream inStream = hdfs.open(result);
 			StringWriter writer = new StringWriter();
@@ -159,16 +166,23 @@ public class HDFSTest {
 	}
 
 	// package visible
-	static final class DopOneTestEnvironment extends LocalEnvironment {
-		static {
+	static abstract class DopOneTestEnvironment extends ExecutionEnvironment {
+		
+		public static void setAsContext() {
+			final LocalEnvironment le = new LocalEnvironment();
+			le.setParallelism(1);
+
 			initializeContextEnvironment(new ExecutionEnvironmentFactory() {
+
 				@Override
 				public ExecutionEnvironment createExecutionEnvironment() {
-					LocalEnvironment le = new LocalEnvironment();
-					le.setParallelism(1);
 					return le;
 				}
 			});
 		}
+		
+		public static void unsetAsContext() {
+			resetContextEnvironment();
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java b/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
index 859c686..cb37fd4 100644
--- a/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
+++ b/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
@@ -85,7 +85,11 @@ public class ScalaShellRemoteEnvironment extends RemoteEnvironment {
 		return executor.executePlan(p);
 	}
 
-	public void setAsContext() {
+	public static void disableAllContextAndOtherEnvironments() {
+		
+		// we create a context environment that prevents the instantiation of further
+		// context environments. at the same time, setting the context environment prevents manual
+		// creation of local and remote environments
 		ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
 			@Override
 			public ExecutionEnvironment createExecutionEnvironment() {
@@ -95,4 +99,8 @@ public class ScalaShellRemoteEnvironment extends RemoteEnvironment {
 		};
 		initializeContextEnvironment(factory);
 	}
+	
+	public static void resetContextEnvironments() {
+		ExecutionEnvironment.resetContextEnvironment();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
index 1e96ba3..cd8a846 100644
--- a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
+++ b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
@@ -53,8 +53,15 @@ class FlinkILoop(
   }
   // remote environment
   private val remoteEnv: ScalaShellRemoteEnvironment = {
+    // allow creation of environments
+    ScalaShellRemoteEnvironment.resetContextEnvironments()
+    
+    // create our environment that submits against the cluster (local or remote)
     val remoteEnv = new ScalaShellRemoteEnvironment(host, port, this)
-    remoteEnv.setAsContext()
+    
+    // prevent further instantiation of environments
+    ScalaShellRemoteEnvironment.disableAllContextAndOtherEnvironments()
+    
     remoteEnv
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index 4c002d1..f0bd174 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -17,30 +17,57 @@
 
 package org.apache.flink.streaming.api.environment;
 
+import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.util.ClusterUtil;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The LocalStreamEnvironment is a StreamExecutionEnvironment that runs the program locally,
+ * multi-threaded, in the JVM where the environment is instantiated. It spawns an embedded
+ * Flink cluster in the background and executes the program on that cluster.
+ *
+ * <p>When this environment is instantiated, it uses a default parallelism of {@code 1}. The default
+ * parallelism can be set via {@link #setParallelism(int)}.
+ *
+ * <p>Local environments can also be instantiated through {@link StreamExecutionEnvironment#createLocalEnvironment()}
+ * and {@link StreamExecutionEnvironment#createLocalEnvironment(int)}. The former version will pick a
+ * default parallelism equal to the number of hardware contexts in the local machine.
+ */
 public class LocalStreamEnvironment extends StreamExecutionEnvironment {
 
+	private static final Logger LOG = LoggerFactory.getLogger(LocalStreamEnvironment.class);
+	
+	/** The configuration to use for the local cluster */
 	private final Configuration conf;
 
+	/**
+	 * Creates a new local stream environment that uses the default configuration.
+	 */
 	public LocalStreamEnvironment() {
-		conf = null;
-	}
-	public LocalStreamEnvironment(Configuration conf) {
-		this.conf = conf;
+		this(null);
 	}
 
 	/**
-	 * Executes the JobGraph of the on a mini cluster of CLusterUtil with a
-	 * default name.
+	 * Creates a new local stream environment that configures its local executor with the given configuration.
 	 *
-	 * @return The result of the job execution, containing elapsed time and accumulators.
+	 * @param config The configuration used to configure the local executor.
 	 */
-	@Override
-	public JobExecutionResult execute() throws Exception {
-		return execute(DEFAULT_JOB_NAME);
+	public LocalStreamEnvironment(Configuration config) {
+		if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
+			throw new InvalidProgramException(
+					"The LocalStreamEnvironment cannot be used when submitting a program through a client, " +
+							"or running in a TestEnvironment context.");
+		}
+		
+		this.conf = config == null ? new Configuration() : config;
 	}
 
 	/**
@@ -53,9 +80,30 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
 	 */
 	@Override
 	public JobExecutionResult execute(String jobName) throws Exception {
-		JobExecutionResult result = ClusterUtil.runOnMiniCluster(getStreamGraph().getJobGraph(),
-				getParallelism(), -1, getConfig().isSysoutLoggingEnabled(), false, this.conf);
-		transformations.clear();
-		return result;
+		// transform the streaming program into a JobGraph
+		JobGraph jobGraph = getStreamGraph().getJobGraph(jobName);
+		
+		Configuration configuration = new Configuration();
+		configuration.addAll(jobGraph.getJobConfiguration());
+
+		configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
+		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, getParallelism());
+		
+		// add (and override) the settings with what the user defined
+		configuration.addAll(this.conf);
+		
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Running job on local embedded Flink mini cluster");
+		}
+
+		LocalFlinkMiniCluster exec = new LocalFlinkMiniCluster(configuration, true, StreamingMode.STREAMING);
+		try {
+			exec.start();
+			return exec.submitJobAndWait(jobGraph, getConfig().isSysoutLoggingEnabled());
+		}
+		finally {
+			transformations.clear();
+			exec.stop();
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 29439f6..ccf51d2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -22,7 +22,9 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.JobWithJars;
 import org.apache.flink.client.program.ProgramInvocationException;
@@ -41,6 +43,9 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 	private final String host;
 	private final int port;
 	private final List<File> jarFiles;
+	
+	/** The configuration used to parametrize the client that connects to the remote cluster */
+	private final Configuration config;
 
 	/**
 	 * Creates a new RemoteStreamEnvironment that points to the master
@@ -59,17 +64,46 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 	 *            provided in the JAR files.
 	 */
 	public RemoteStreamEnvironment(String host, int port, String... jarFiles) {
+		this(host, port, null, jarFiles);
+	}
+
+	/**
+	 * Creates a new RemoteStreamEnvironment that points to the master
+	 * (JobManager) described by the given host name and port.
+	 *
+	 * @param host
+	 *            The host name or address of the master (JobManager), where the
+	 *            program should be executed.
+	 * @param port
+	 *            The port of the master (JobManager), where the program should
+	 *            be executed.
+	 * @param config
+	 *            The configuration used to parametrize the client that connects to the
+	 *            remote cluster.
+	 * @param jarFiles
+	 *            The JAR files with code that needs to be shipped to the
+	 *            cluster. If the program uses user-defined functions,
+	 *            user-defined input formats, or any libraries, those must be
+	 *            provided in the JAR files.
+	 */
+	public RemoteStreamEnvironment(String host, int port, Configuration config, String... jarFiles) {
+		if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
+			throw new InvalidProgramException(
+					"The RemoteEnvironment cannot be used when submitting a program through a client, " +
+							"or running in a TestEnvironment context.");
+		}
+		
 		if (host == null) {
 			throw new NullPointerException("Host must not be null.");
 		}
-
 		if (port < 1 || port >= 0xffff) {
 			throw new IllegalArgumentException("Port out of range");
 		}
 
 		this.host = host;
 		this.port = port;
-		this.jarFiles = new ArrayList<File>();
+		this.config = config == null ? new Configuration() : config;
+		this.jarFiles = new ArrayList<File>(jarFiles.length);
 		for (String jarFile : jarFiles) {
 			File file = new File(jarFile);
 			try {
@@ -83,13 +117,6 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 	}
 
 	@Override
-	public JobExecutionResult execute() throws ProgramInvocationException {
-		JobGraph jobGraph = getStreamGraph().getJobGraph();
-		transformations.clear();
-		return executeRemotely(jobGraph);
-	}
-
-	@Override
 	public JobExecutionResult execute(String jobName) throws ProgramInvocationException {
 		JobGraph jobGraph = getStreamGraph().getJobGraph(jobName);
 		transformations.clear();
@@ -112,9 +139,12 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 			jobGraph.addJar(new Path(file.getAbsolutePath()));
 		}
 
-		Configuration configuration = jobGraph.getJobConfiguration();
 		ClassLoader usercodeClassLoader = JobWithJars.buildUserCodeClassLoader(jarFiles, getClass().getClassLoader());
-
+		
+		Configuration configuration = new Configuration();
+		configuration.addAll(jobGraph.getJobConfiguration());
+		configuration.addAll(this.config);
+		
 		configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, host);
 		configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 2a31390..5537fd4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -80,26 +80,46 @@ import java.util.List;
 import java.util.Objects;
 
 /**
- * {@link org.apache.flink.api.java.ExecutionEnvironment} for streaming jobs. An instance of it is
+ * An ExecutionEnvironment for streaming jobs. An instance of it is
  * necessary to construct streaming topologies.
  */
+/**
+ * The StreamExecutionEnvironment is the context in which a streaming program is executed. A
+ * {@link LocalStreamEnvironment} will cause execution in the current JVM, a
+ * {@link RemoteStreamEnvironment} will cause execution on a remote setup.
+ * 
+ * <p>The environment provides methods to control the job execution (such as setting the parallelism
+ * or the fault tolerance/checkpointing parameters) and to interact with the outside world (data access).
+ *
+ * @see org.apache.flink.streaming.api.environment.LocalStreamEnvironment
+ * @see org.apache.flink.streaming.api.environment.RemoteStreamEnvironment
+ */
 public abstract class StreamExecutionEnvironment {
 
+	/** The default name to use for a streaming job if no other name has been specified */
 	public static final String DEFAULT_JOB_NAME = "Flink Streaming Job";
-
-	private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();
 	
 	/** The time characteristic that is used if none other is set */
-	private static TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime;
+	private static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime;
+
+	/** The default buffer timeout (max delay of records in the network stack) */
+	private static final long DEFAULT_NETWORK_BUFFER_TIMEOUT = 100L;
+
+	/** The environment of the context (local by default, cluster if invoked through command line) */
+	private static StreamExecutionEnvironmentFactory contextEnvironmentFactory;
+
+	/** The default parallelism used when creating a local environment */
+	private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();
 	
 	// ------------------------------------------------------------------------
-	
-	private long bufferTimeout = 100;
 
+	/** The execution configuration for this environment */
 	private final ExecutionConfig config = new ExecutionConfig();
-
+	
 	protected final List<StreamTransformation<?>> transformations = new ArrayList<>();
-
+	
+	private long bufferTimeout = DEFAULT_NETWORK_BUFFER_TIMEOUT;
+	
 	protected boolean isChainingEnabled = true;
 
 	protected long checkpointInterval = -1; // disabled
@@ -113,9 +133,7 @@ public abstract class StreamExecutionEnvironment {
 	/** The time characteristic used by the data streams */
 	private TimeCharacteristic timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC;
 
-	/** The environment of the context (local by default, cluster if invoked through command line) */
-	private static StreamExecutionEnvironmentFactory contextEnvironmentFactory;
-
+	
 	// --------------------------------------------------------------------------------------------
 	// Constructor and Properties
 	// --------------------------------------------------------------------------------------------
@@ -1157,8 +1175,90 @@ public abstract class StreamExecutionEnvironment {
 		return new DataStreamSource<OUT>(this, typeInfo, sourceOperator, isParallel, sourceName);
 	}
 
+	/**
+	 * Triggers the program execution. The environment will execute all parts of
+	 * the program that have resulted in a "sink" operation. Sink operations are
+	 * for example printing results or forwarding them to a message queue.
+	 * <p/>
+	 * The program execution will be logged and displayed with a generated
+	 * default name.
+	 *
+	 * @return The result of the job execution, containing elapsed time and accumulators.
+	 * @throws Exception which occurs during job execution.
+	 */
+	public JobExecutionResult execute() throws Exception {
+		return execute(DEFAULT_JOB_NAME);
+	}
+
+	/**
+	 * Triggers the program execution. The environment will execute all parts of
+	 * the program that have resulted in a "sink" operation. Sink operations are
+	 * for example printing results or forwarding them to a message queue.
+	 * <p/>
+	 * The program execution will be logged and displayed with the provided name
+	 *
+	 * @param jobName
+	 * 		Desired name of the job
+	 * @return The result of the job execution, containing elapsed time and accumulators.
+	 * @throws Exception which occurs during job execution.
+	 */
+	public abstract JobExecutionResult execute(String jobName) throws Exception;
+
+	/**
+	 * Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph} of the streaming job.
+	 *
+	 * @return The streamgraph representing the transformations
+	 */
+	public StreamGraph getStreamGraph() {
+		if (transformations.size() <= 0) {
+			throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
+		}
+		return StreamGraphGenerator.generate(this, transformations);
+	}
+
+	/**
+	 * Creates the plan with which the system will execute the program, and
+	 * returns it as a String using a JSON representation of the execution data
+	 * flow graph. Note that this needs to be called, before the plan is
+	 * executed.
+	 *
+	 * @return The execution plan of the program, as a JSON String.
+	 */
+	public String getExecutionPlan() {
+		return getStreamGraph().getStreamingPlanAsJSON();
+	}
+
+	/**
+	 * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
+	 * is not disabled in the {@link org.apache.flink.api.common.ExecutionConfig}
+	 */
+	public <F> F clean(F f) {
+		if (getConfig().isClosureCleanerEnabled()) {
+			ClosureCleaner.clean(f, true);
+		}
+		ClosureCleaner.ensureSerializable(f);
+		return f;
+	}
+
+	/**
+	 * Adds an operator to the list of operators that should be executed when calling
+	 * {@link #execute}.
+	 *
+	 * <p>
+	 * When calling {@link #execute()} only the operators that where previously added to the list
+	 * are executed.
+	 *
+	 * <p>
+	 * This is not meant to be used by users. The API methods that create operators must call
+	 * this method.
+	 */
+	public void addOperator(StreamTransformation<?> transformation) {
+		Preconditions.checkNotNull(transformation, "transformation must not be null.");
+		this.transformations.add(transformation);
+	}
+
 	// --------------------------------------------------------------------------------------------
-	// Instantiation of Execution Contexts
+	//  Factory methods for ExecutionEnvironments
 	// --------------------------------------------------------------------------------------------
 
 	/**
@@ -1175,6 +1275,10 @@ public abstract class StreamExecutionEnvironment {
 			return contextEnvironmentFactory.createExecutionEnvironment();
 		}
 
+		// because the streaming project depends on "flink-clients" (and not the other way around)
+		// we currently need to intercept the data set environment and create a dependent stream env.
+		// this should be fixed once we rework the project dependencies
+		
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		if (env instanceof ContextEnvironment) {
 			ContextEnvironment ctx = (ContextEnvironment) env;
@@ -1187,8 +1291,9 @@ public abstract class StreamExecutionEnvironment {
 		}
 	}
 
-	private static StreamExecutionEnvironment createContextEnvironment(Client client,
-			List<File> jars, int parallelism, boolean wait) {
+	private static StreamExecutionEnvironment createContextEnvironment(
+			Client client, List<File> jars, int parallelism, boolean wait)
+	{
 		return new StreamContextEnvironment(client, jars, parallelism, wait);
 	}
 
@@ -1236,11 +1341,9 @@ public abstract class StreamExecutionEnvironment {
 	public static LocalStreamEnvironment createLocalEnvironment(int parallelism, Configuration configuration) {
 		LocalStreamEnvironment currentEnvironment = new LocalStreamEnvironment(configuration);
 		currentEnvironment.setParallelism(parallelism);
-		return (LocalStreamEnvironment) currentEnvironment;
+		return currentEnvironment;
 	}
 
-	// TODO:fix cluster default parallelism
-
 	/**
 	 * Creates a {@link RemoteStreamEnvironment}. The remote environment sends
 	 * (parts of) the program to a cluster for execution. Note that all file
@@ -1261,8 +1364,8 @@ public abstract class StreamExecutionEnvironment {
 	 * 		provided in the JAR files.
 	 * @return A remote environment that executes the program on a cluster.
 	 */
-	public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port,
-			String... jarFiles) {
+	public static StreamExecutionEnvironment createRemoteEnvironment(
+			String host, int port, String... jarFiles) {
 		return new RemoteStreamEnvironment(host, port, jarFiles);
 	}
 
@@ -1287,93 +1390,41 @@ public abstract class StreamExecutionEnvironment {
 	 * 		provided in the JAR files.
 	 * @return A remote environment that executes the program on a cluster.
 	 */
-	public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port,
-			int parallelism, String... jarFiles) {
+	public static StreamExecutionEnvironment createRemoteEnvironment(
+			String host, int port, int parallelism, String... jarFiles)
+	{
 		RemoteStreamEnvironment env = new RemoteStreamEnvironment(host, port, jarFiles);
 		env.setParallelism(parallelism);
 		return env;
 	}
 
 	/**
-	 * Triggers the program execution. The environment will execute all parts of
-	 * the program that have resulted in a "sink" operation. Sink operations are
-	 * for example printing results or forwarding them to a message queue.
-	 * <p/>
-	 * The program execution will be logged and displayed with a generated
-	 * default name.
-	 *
-	 * @return The result of the job execution, containing elapsed time and accumulators.
-	 * @throws Exception which occurs during job execution.
-	 */
-	public abstract JobExecutionResult execute() throws Exception;
-
-	/**
-	 * Triggers the program execution. The environment will execute all parts of
-	 * the program that have resulted in a "sink" operation. Sink operations are
-	 * for example printing results or forwarding them to a message queue.
-	 * <p/>
-	 * The program execution will be logged and displayed with the provided name
-	 *
-	 * @param jobName
-	 * 		Desired name of the job
-	 * @return The result of the job execution, containing elapsed time and accumulators.
-	 * @throws Exception which occurs during job execution.
-	 */
-	public abstract JobExecutionResult execute(String jobName) throws Exception;
-
-	/**
-	 * Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph} of the streaming job.
-	 *
-	 * @return The streamgraph representing the transformations
-	 */
-	public StreamGraph getStreamGraph() {
-		if (transformations.size() <= 0) {
-			throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
-		}
-		return StreamGraphGenerator.generate(this, transformations);
-	}
-
-	/**
-	 * Creates the plan with which the system will execute the program, and
-	 * returns it as a String using a JSON representation of the execution data
-	 * flow graph. Note that this needs to be called, before the plan is
-	 * executed.
-	 *
-	 * @return The execution plan of the program, as a JSON String.
-	 */
-	public String getExecutionPlan() {
-		return getStreamGraph().getStreamingPlanAsJSON();
-	}
-
-	/**
-	 * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
-	 * is not disabled in the {@link org.apache.flink.api.common.ExecutionConfig}
-	 */
-	public <F> F clean(F f) {
-		if (getConfig().isClosureCleanerEnabled()) {
-			ClosureCleaner.clean(f, true);
-		}
-		ClosureCleaner.ensureSerializable(f);
-		return f;
-	}
-
-	/**
-	 * Adds an operator to the list of operators that should be executed when calling
-	 * {@link #execute}.
-	 *
-	 * <p>
-	 * When calling {@link #execute()} only the operators that where previously added to the list
-	 * are executed.
+	 * Creates a {@link RemoteStreamEnvironment}. The remote environment sends
+	 * (parts of) the program to a cluster for execution. Note that all file
+	 * paths used in the program must be accessible from the cluster. The
+	 * execution will use the specified parallelism.
 	 *
-	 * <p>
-	 * This is not meant to be used by users. The API methods that create operators must call
-	 * this method.
+	 * @param host
+	 * 		The host name or address of the master (JobManager), where the
+	 * 		program should be executed.
+	 * @param port
+	 * 		The port of the master (JobManager), where the program should
+	 * 		be executed.
+	 * @param clientConfig
+	 * 		The configuration used by the client that connects to the remote cluster.
+	 * @param jarFiles
+	 * 		The JAR files with code that needs to be shipped to the
+	 * 		cluster. If the program uses user-defined functions,
+	 * 		user-defined input formats, or any libraries, those must be
+	 * 		provided in the JAR files.
+	 * @return A remote environment that executes the program on a cluster.
 	 */
-	public void addOperator(StreamTransformation<?> transformation) {
-		Preconditions.checkNotNull(transformation, "Sinks must not be null.");
-		this.transformations.add(transformation);
+	public static StreamExecutionEnvironment createRemoteEnvironment(
+			String host, int port, Configuration clientConfig, String... jarFiles)
+	{
+		return new RemoteStreamEnvironment(host, port, clientConfig, jarFiles);
 	}
-
+	
 	// --------------------------------------------------------------------------------------------
 	//  Methods to control the context and local environments for execution from packaged programs
 	// --------------------------------------------------------------------------------------------
@@ -1381,4 +1432,8 @@ public abstract class StreamExecutionEnvironment {
 	protected static void initializeContextEnvironment(StreamExecutionEnvironmentFactory ctx) {
 		contextEnvironmentFactory = ctx;
 	}
+	
+	protected static void resetContextEnvironment() {
+		contextEnvironmentFactory = null;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
deleted file mode 100644
index d4569fe..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Utility class to manage mini cluster for Apache Flink.
- */
-public final class ClusterUtil {
-	
-	private static final Logger LOG = LoggerFactory.getLogger(ClusterUtil.class);
-
-	/**
-	 * Executes the given JobGraph locally, on a FlinkMiniCluster
-	 * 
-	 * @param jobGraph
-	 *            jobGraph
-	 * @param parallelism
-	 *            numberOfTaskTrackers
-	 * @param memorySize
-	 *            memorySize
-	 * @param customConf
-	 * 		Custom configuration for the LocalExecutor. Can be null.
-	 * @return The result of the job execution, containing elapsed time and accumulators.
-	 */
-	public static JobExecutionResult runOnMiniCluster(JobGraph jobGraph, int parallelism, long memorySize,
-													boolean printDuringExecution, boolean detached, Configuration customConf)
-			throws Exception {
-
-		Configuration configuration = jobGraph.getJobConfiguration();
-
-		LocalFlinkMiniCluster exec = null;
-
-		configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memorySize);
-		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
-		if(customConf != null) {
-			configuration.addAll(customConf);
-		}
-		if (LOG.isInfoEnabled()) {
-			LOG.info("Running on mini cluster");
-		}
-
-		try {
-			exec = new LocalFlinkMiniCluster(configuration, true);
-			exec.start();
-			
-			if (detached) {
-				exec.submitJobDetached(jobGraph);
-				return null;
-			} else {
-				return exec.submitJobAndWait(jobGraph, printDuringExecution);
-			}
-		} finally {
-			if (exec != null && !detached) {
-				exec.stop();
-			}
-		}
-	}
-
-	/**
-	 * Start a job in a detached mode on a local mini cluster.
-	 */
-	public static void startOnMiniCluster(JobGraph jobGraph, int parallelism) throws Exception {
-		runOnMiniCluster(jobGraph, parallelism, -1, true, true, null);
-	}
-
-	/**
-	 * Private constructor to prevent instantiation.
-	 */
-	private ClusterUtil() {
-		throw new RuntimeException();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
index d251a5d..4e02f2c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
@@ -35,23 +35,22 @@ import org.junit.BeforeClass;
  * one or more regular test methods and retrieve the StreamExecutionEnvironment from
  * the context:
  *
- * <pre>{@code
- *
- *   @Test
+ * <pre>
+ *   {@literal @}Test
  *   public void someTest() {
  *       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  *       // test code
  *       env.execute();
  *   }
  *
- *   @Test
+ *   {@literal @}Test
  *   public void anotherTest() {
  *       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  *       // test code
  *       env.execute();
  *   }
  *
- * }</pre>
+ * </pre>
  */
 public class StreamingMultipleProgramsTestBase extends TestBaseUtils {
 
@@ -61,32 +60,22 @@ public class StreamingMultipleProgramsTestBase extends TestBaseUtils {
 
 	protected static final int DEFAULT_PARALLELISM = 4;
 
-	protected static ForkableFlinkMiniCluster cluster = null;
-	
-	// ------------------------------------------------------------------------
+	protected static ForkableFlinkMiniCluster cluster;
 	
-	public StreamingMultipleProgramsTestBase() {
-		TestStreamEnvironment clusterEnv = new TestStreamEnvironment(cluster, DEFAULT_PARALLELISM);
-		clusterEnv.setAsContext();
-	}
 
 	// ------------------------------------------------------------------------
 	//  Cluster setup & teardown
 	// ------------------------------------------------------------------------
 
 	@BeforeClass
-	public static void setup() throws Exception{
-		cluster = TestBaseUtils.startCluster(
-			1,
-			DEFAULT_PARALLELISM,
-			StreamingMode.STREAMING,
-			false,
-			false,
-			true);
+	public static void setup() throws Exception {
+		cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, StreamingMode.STREAMING, false, false, true);
+		TestStreamEnvironment.setAsContext(cluster, DEFAULT_PARALLELISM);
 	}
 
 	@AfterClass
 	public static void teardown() throws Exception {
+		TestStreamEnvironment.unsetAsContext();
 		stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
index 92f8301..ce3aa86 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
@@ -18,33 +18,27 @@
 
 package org.apache.flink.streaming.util;
 
-import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.test.util.AbstractTestBase;
 
-import org.junit.Assert;
 import org.junit.Test;
 
-public abstract class StreamingProgramTestBase extends AbstractTestBase {
-
-	private static final int DEFAULT_PARALLELISM = 4;
-
-	private TestStreamEnvironment env;
+import static org.junit.Assert.fail;
 
-	private JobExecutionResult latestExecutionResult;
-
-	private int parallelism = DEFAULT_PARALLELISM;
+public abstract class StreamingProgramTestBase extends AbstractTestBase {
 
+	protected static final int DEFAULT_PARALLELISM = 4;
 
+	private int parallelism;
+	
+	
 	public StreamingProgramTestBase() {
-		this(new Configuration());
+		super(new Configuration(), StreamingMode.STREAMING);
+		setParallelism(DEFAULT_PARALLELISM);
 	}
 
-	public StreamingProgramTestBase(Configuration config) {
-		super(config);
-		setTaskManagerNumSlots(parallelism);
-	}
-	
+
 	public void setParallelism(int parallelism) {
 		this.parallelism = parallelism;
 		setTaskManagerNumSlots(parallelism);
@@ -54,10 +48,6 @@ public abstract class StreamingProgramTestBase extends AbstractTestBase {
 		return parallelism;
 	}
 	
-	public JobExecutionResult getLatestExecutionResult() {
-		return this.latestExecutionResult;
-	}
-	
 
 	// --------------------------------------------------------------------------------------------
 	//  Methods to create the test program and for pre- and post- test work
@@ -74,8 +64,7 @@ public abstract class StreamingProgramTestBase extends AbstractTestBase {
 	// --------------------------------------------------------------------------------------------
 
 	@Test
-	public void testJobWithoutObjectReuse() throws Exception {
-		startCluster();
+	public void testJob() throws Exception {
 		try {
 			// pre-submit
 			try {
@@ -84,25 +73,26 @@ public abstract class StreamingProgramTestBase extends AbstractTestBase {
 			catch (Exception e) {
 				System.err.println(e.getMessage());
 				e.printStackTrace();
-				Assert.fail("Pre-submit work caused an error: " + e.getMessage());
+				fail("Pre-submit work caused an error: " + e.getMessage());
 			}
 
 			// prepare the test environment
-			env = new TestStreamEnvironment(this.executor, this.parallelism);
-			env.setAsContext();
+			startCluster();
+
+			TestStreamEnvironment.setAsContext(this.executor, getParallelism());
 
 			// call the test program
 			try {
 				testProgram();
-				this.latestExecutionResult = env.latestResult;
 			}
 			catch (Exception e) {
 				System.err.println(e.getMessage());
 				e.printStackTrace();
-				Assert.fail("Error while calling the test program: " + e.getMessage());
+				fail("Error while calling the test program: " + e.getMessage());
+			}
+			finally {
+				TestStreamEnvironment.unsetAsContext();
 			}
-
-			Assert.assertNotNull("The test program never triggered an execution.", this.latestExecutionResult);
 
 			// post-submit
 			try {
@@ -111,13 +101,11 @@ public abstract class StreamingProgramTestBase extends AbstractTestBase {
 			catch (Exception e) {
 				System.err.println(e.getMessage());
 				e.printStackTrace();
-				Assert.fail("Post-submit work caused an error: " + e.getMessage());
-			}
-		} finally {
-			if(env.clusterRunsSynchronous()) {
-				stopCluster();
+				fail("Post-submit work caused an error: " + e.getMessage());
 			}
 		}
+		finally {
+			stopCluster();
+		}
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index 91082d8..8cd1e4a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -20,150 +20,56 @@ package org.apache.flink.streaming.util;
 
 import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.client.SerializedJobExecutionResult;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 
+/**
+ * A StreamExecutionEnvironment that executes its jobs on a test cluster.
+ */
 public class TestStreamEnvironment extends StreamExecutionEnvironment {
-	private static final String DEFAULT_JOBNAME = "TestStreamingJob";
-	private static final String CANNOT_EXECUTE_EMPTY_JOB = "Cannot execute empty job";
-
-	private long memorySize;
-	protected JobExecutionResult latestResult;
+	
+	/** The mini cluster in which this environment executes its jobs */
 	private ForkableFlinkMiniCluster executor;
-	private boolean internalExecutor;
-
-	public TestStreamEnvironment(int parallelism, long memorySize){
-		setParallelism(parallelism);
-		this.memorySize = memorySize;
-		internalExecutor = true;
-	}
+	
 
-	public TestStreamEnvironment(ForkableFlinkMiniCluster executor, int parallelism){
+	public TestStreamEnvironment(ForkableFlinkMiniCluster executor, int parallelism) {
 		this.executor = Preconditions.checkNotNull(executor);
-		setDefaultLocalParallelism(parallelism);
 		setParallelism(parallelism);
 	}
-
-	@Override
-	public JobExecutionResult execute() throws Exception {
-		return execute(DEFAULT_JOBNAME);
-	}
-
+	
 	@Override
 	public JobExecutionResult execute(String jobName) throws Exception {
-		JobExecutionResult result = execute(getStreamGraph().getJobGraph(jobName));
-		return result;
-	}
-	
-	public JobExecutionResult execute(JobGraph jobGraph) throws Exception {
-		if (internalExecutor) {
-			Configuration configuration = jobGraph.getJobConfiguration();
-
-			configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
-					getParallelism());
-			configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memorySize);
-
-			executor = new ForkableFlinkMiniCluster(configuration);
-			executor.start();
-		}
-		try {
-			sync = true;
-			latestResult = executor.submitJobAndWait(jobGraph, false);
-			return latestResult;
-		} catch (JobExecutionException e) {
-			if (e.getMessage().contains("GraphConversionException")) {
-				throw new Exception(CANNOT_EXECUTE_EMPTY_JOB, e);
-			} else {
-				throw e;
-			}
-		} finally {
-			transformations.clear();
-			if (internalExecutor){
-				executor.shutdown();
-			}
-		}
-	}
-
-	private ForkableFlinkMiniCluster cluster = null;
-	private Thread jobRunner = null;
-	private boolean sync = true;
-
-	public void start(final JobGraph jobGraph) throws Exception {
-		if(cluster != null) {
-			throw new IllegalStateException("The cluster is already running");
-		}
-
-		if (internalExecutor) {
-			Configuration configuration = jobGraph.getJobConfiguration();
-
-			configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
-					getParallelism());
-			configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memorySize);
-
-			cluster = new ForkableFlinkMiniCluster(configuration);
-		} else {
-			cluster = executor;
-		}
-		try {
-			sync = false;
-
-			jobRunner = new Thread() {
-				public void run() {
-					try {
-						latestResult = cluster.submitJobAndWait(jobGraph, false);
-					} catch (JobExecutionException e) {
-						// TODO remove: hack to make ITCase succeed because .submitJobAndWait() throws exception on .stop() (see this.shutdown())
-						latestResult = new JobExecutionResult(null, 0, null);
-						e.printStackTrace();
-						//throw new RuntimeException(e);
-					} catch (Exception e) {
-						new RuntimeException(e);
-					}
-				}
-			};
-			jobRunner.start();
-		} catch(RuntimeException e) {
-			if (e.getCause().getMessage().contains("GraphConversionException")) {
-				throw new Exception(CANNOT_EXECUTE_EMPTY_JOB, e);
-			} else {
-				throw e;
-			}
-		}
-	}
-
-	public JobExecutionResult shutdown() throws InterruptedException {
-		if(!sync) {
-			cluster.stop();
-			cluster = null;
-
-			jobRunner.join();
-			jobRunner = null;
-
-			return latestResult;
-		}
-
-		throw new IllegalStateException("Cluster was not started via .start(...)");
-	}
-
-	public boolean clusterRunsSynchronous() {
-		return sync;
+		final JobGraph jobGraph = getStreamGraph().getJobGraph(jobName);
+		return executor.submitJobAndWait(jobGraph, false);
 	}
 
-	protected void setAsContext() {
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Sets the streaming context environment to a TestStreamEnvironment that runs its programs on
+	 * the given cluster with the given default parallelism.
+	 * 
+	 * @param cluster The test cluster to run the test program on.
+	 * @param parallelism The default parallelism for the test programs.
+	 */
+	public static void setAsContext(final ForkableFlinkMiniCluster cluster, final int parallelism) {
+		
 		StreamExecutionEnvironmentFactory factory = new StreamExecutionEnvironmentFactory() {
 			@Override
 			public StreamExecutionEnvironment createExecutionEnvironment() {
-				return TestStreamEnvironment.this;
+				return new TestStreamEnvironment(cluster, parallelism);
 			}
 		};
 
 		initializeContextEnvironment(factory);
 	}
 
+	/**
+	 * Resets the streaming context environment to null.
+	 */
+	public static void unsetAsContext() {
+		resetContextEnvironment();
+	} 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/82d62361/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/TopSpeedWindowingExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/TopSpeedWindowingExampleITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/TopSpeedWindowingExampleITCase.java
index fb0ab0a..37812c9 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/TopSpeedWindowingExampleITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/TopSpeedWindowingExampleITCase.java
@@ -22,6 +22,7 @@ import org.apache.flink.streaming.examples.windowing.util.TopSpeedWindowingExamp
 import org.apache.flink.streaming.util.StreamingProgramTestBase;
 
 public class TopSpeedWindowingExampleITCase extends StreamingProgramTestBase {
+	
 	protected String textPath;
 	protected String resultPath;
 
@@ -40,6 +41,5 @@ public class TopSpeedWindowingExampleITCase extends StreamingProgramTestBase {
 	@Override
 	protected void testProgram() throws Exception {
 		TopSpeedWindowing.main(new String[]{textPath, resultPath});
-
 	}
 }


Mime
View raw message