flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [3/4] flink git commit: Revert "[FLINK-5808] Move default parallelism to StreamingJobGraphGenerator"
Date Tue, 18 Apr 2017 16:09:32 GMT
Revert "[FLINK-5808] Move default parallelism to StreamingJobGraphGenerator"

This reverts commit 9cfae899358e0694c3ecedae1fad20e428a1d359.

The fixes around FLINK-5808 introduced follow-up issues.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0182141d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0182141d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0182141d

Branch: refs/heads/master
Commit: 0182141d41be52ae0cf4a3563d4b8c6f3daca02e
Parents: a13750c
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Tue Apr 4 14:04:40 2017 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Tue Apr 18 17:42:10 2017 +0200

----------------------------------------------------------------------
 .../Flip6LocalStreamEnvironment.java            |   4 -
 .../api/environment/LocalStreamEnvironment.java |  26 +-
 .../environment/RemoteStreamEnvironment.java    |   5 -
 .../environment/StreamContextEnvironment.java   |  13 +-
 .../environment/StreamExecutionEnvironment.java |  65 ++--
 .../api/environment/StreamPlanEnvironment.java  |  15 +-
 .../flink/streaming/api/graph/StreamGraph.java  |   6 +-
 .../api/graph/StreamGraphGenerator.java         |  12 +-
 .../api/graph/StreamingJobGraphGenerator.java   |  14 +-
 .../api/StreamExecutionEnvironmentTest.java     | 289 +++++++++++++++++
 .../StreamExecutionEnvironmentTest.java         | 317 -------------------
 .../graph/StreamingJobGraphGeneratorTest.java   |  10 +-
 .../FoldApplyProcessWindowFunctionTest.java     |   8 +-
 .../operators/FoldApplyWindowFunctionTest.java  |   6 +-
 .../api/scala/StreamExecutionEnvironment.scala  |  28 +-
 .../streaming/api/scala/DataStreamTest.scala    |  11 +-
 .../streaming/util/TestStreamEnvironment.java   |   1 -
 .../accumulators/AccumulatorLiveITCase.java     |   4 -
 18 files changed, 397 insertions(+), 437 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
index 244660a..61ca55f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
@@ -46,9 +46,6 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment {
 
 	private static final Logger LOG = LoggerFactory.getLogger(Flip6LocalStreamEnvironment.class);
 
-	/** The default parallelism used when creating a local environment */
-	private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();
-
 	/** The configuration to use for the mini cluster */
 	private final Configuration conf;
 
@@ -65,7 +62,6 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment {
 	 * @param config The configuration used to configure the local executor.
 	 */
 	public Flip6LocalStreamEnvironment(Configuration config) {
-		super(defaultLocalParallelism);
 		if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
 			throw new InvalidProgramException(
 					"The Flip6LocalStreamEnvironment cannot be used when submitting a program through a client, " +

http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index 117f6d8..566beba 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -46,9 +46,6 @@ import org.slf4j.LoggerFactory;
 @Public
 public class LocalStreamEnvironment extends StreamExecutionEnvironment {
 
-	/** The default parallelism used when creating a local environment */
-	private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();
-
 	private static final Logger LOG = LoggerFactory.getLogger(LocalStreamEnvironment.class);
 	
 	/** The configuration to use for the local cluster */
@@ -58,43 +55,24 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
 	 * Creates a new local stream environment that uses the default configuration.
 	 */
 	public LocalStreamEnvironment() {
-		this(defaultLocalParallelism);
+		this(null);
 	}
 
 	/**
-	 * Creates a new local stream environment that uses the default configuration.
-	 */
-	public LocalStreamEnvironment(int parallelism) {
-		this(null, parallelism);
-	}
-
-
-	/**
 	 * Creates a new local stream environment that configures its local executor with the given configuration.
 	 *
 	 * @param config The configuration used to configure the local executor.
 	 */
 	public LocalStreamEnvironment(Configuration config) {
-		this(config, defaultLocalParallelism);
-	}
-
-	/**
-	 * Creates a new local stream environment that configures its local executor with the given configuration.
-	 *
-	 * @param config The configuration used to configure the local executor.
-	 */
-	public LocalStreamEnvironment(Configuration config, int parallelism) {
-		super(parallelism);
 		if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
 			throw new InvalidProgramException(
 					"The LocalStreamEnvironment cannot be used when submitting a program through a client, " +
 							"or running in a TestEnvironment context.");
 		}
-
+		
 		this.conf = config == null ? new Configuration() : config;
 	}
 
-
 	/**
 	 * Executes the JobGraph of the on a mini cluster of CLusterUtil with a user
 	 * specified name.

http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 5684e28..333f9c0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -37,7 +37,6 @@ import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -130,10 +129,6 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 	 *            The protocol must be supported by the {@link java.net.URLClassLoader}.
 	 */
 	public RemoteStreamEnvironment(String host, int port, Configuration clientConfiguration, String[] jarFiles, URL[] globalClasspaths) {
-		super(GlobalConfiguration.loadConfiguration().getInteger(
-				ConfigConstants.DEFAULT_PARALLELISM_KEY,
-				ConfigConstants.DEFAULT_PARALLELISM));
-		
 		if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
 			throw new InvalidProgramException(
 					"The RemoteEnvironment cannot be used when submitting a program through a client, " +

http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index 51078f2..49c5347 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -38,13 +38,14 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
 	private final ContextEnvironment ctx;
 
 	protected StreamContextEnvironment(ContextEnvironment ctx) {
-		// if the batch ContextEnvironment has a parallelism this must have come from
-		// the CLI Client. We should set that as our default parallelism
-		super(ctx.getParallelism() > 0 ? ctx.getParallelism() :
-				GlobalConfiguration.loadConfiguration().getInteger(
-						ConfigConstants.DEFAULT_PARALLELISM_KEY,
-						ConfigConstants.DEFAULT_PARALLELISM));
 		this.ctx = ctx;
+		if (ctx.getParallelism() > 0) {
+			setParallelism(ctx.getParallelism());
+		} else {
+			setParallelism(GlobalConfiguration.loadConfiguration().getInteger(
+					ConfigConstants.DEFAULT_PARALLELISM_KEY,
+					ConfigConstants.DEFAULT_PARALLELISM));
+		}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index b6540dc..e836616 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -112,6 +112,9 @@ public abstract class StreamExecutionEnvironment {
 	/** The environment of the context (local by default, cluster if invoked through command line) */
 	private static StreamExecutionEnvironmentFactory contextEnvironmentFactory;
 
+	/** The default parallelism used when creating a local environment */
+	private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();
+
 	// ------------------------------------------------------------------------
 
 	/** The execution configuration for this environment */
@@ -132,23 +135,11 @@ public abstract class StreamExecutionEnvironment {
 	/** The time characteristic used by the data streams */
 	private TimeCharacteristic timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC;
 
-	/** The parallelism to use when no parallelism is set on an operation. */
-	private final int defaultParallelism;
-
 
 	// --------------------------------------------------------------------------------------------
 	// Constructor and Properties
 	// --------------------------------------------------------------------------------------------
 
-
-	public StreamExecutionEnvironment() {
-		this(ConfigConstants.DEFAULT_PARALLELISM);
-	}
-
-	public StreamExecutionEnvironment(int defaultParallelism) {
-		this.defaultParallelism = defaultParallelism;
-	}
-
 	/**
 	 * Gets the config object.
 	 */
@@ -1527,7 +1518,7 @@ public abstract class StreamExecutionEnvironment {
 		if (transformations.size() <= 0) {
 			throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
 		}
-		return StreamGraphGenerator.generate(this, transformations, defaultParallelism);
+		return StreamGraphGenerator.generate(this, transformations);
 	}
 
 	/**
@@ -1615,7 +1606,7 @@ public abstract class StreamExecutionEnvironment {
 	 * @return A local execution environment.
 	 */
 	public static LocalStreamEnvironment createLocalEnvironment() {
-		return new LocalStreamEnvironment();
+		return createLocalEnvironment(defaultLocalParallelism);
 	}
 
 	/**
@@ -1624,12 +1615,14 @@ public abstract class StreamExecutionEnvironment {
 	 * environment was created in. It will use the parallelism specified in the
 	 * parameter.
 	 *
-	 * @param defaultParallelism The default parallelism for the local environment.
-	 * 
+	 * @param parallelism
+	 * 		The parallelism for the local environment.
 	 * @return A local execution environment with the specified parallelism.
 	 */
-	public static LocalStreamEnvironment createLocalEnvironment(int defaultParallelism) {
-		return new LocalStreamEnvironment(defaultParallelism);
+	public static LocalStreamEnvironment createLocalEnvironment(int parallelism) {
+		LocalStreamEnvironment env = new LocalStreamEnvironment();
+		env.setParallelism(parallelism);
+		return env;
 	}
 
 	/**
@@ -1638,13 +1631,16 @@ public abstract class StreamExecutionEnvironment {
 	 * environment was created in. It will use the parallelism specified in the
 	 * parameter.
 	 *
-	 * @param defaultParallelism The parallelism for the local environment.
-	 * @param configuration Pass a custom configuration into the cluster
-	 *
+	 * @param parallelism
+	 * 		The parallelism for the local environment.
+	 * @param configuration
+	 * 		Pass a custom configuration into the cluster
 	 * @return A local execution environment with the specified parallelism.
 	 */
-	public static LocalStreamEnvironment createLocalEnvironment(int defaultParallelism, Configuration configuration) {
-		return new LocalStreamEnvironment(configuration, defaultParallelism);
+	public static LocalStreamEnvironment createLocalEnvironment(int parallelism, Configuration configuration) {
+		LocalStreamEnvironment currentEnvironment = new LocalStreamEnvironment(configuration);
+		currentEnvironment.setParallelism(parallelism);
+		return currentEnvironment;
 	}
 
 	/**
@@ -1669,6 +1665,7 @@ public abstract class StreamExecutionEnvironment {
 		conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
 
 		LocalStreamEnvironment localEnv = new LocalStreamEnvironment(conf);
+		localEnv.setParallelism(defaultLocalParallelism);
 
 		return localEnv;
 	}
@@ -1754,6 +1751,28 @@ public abstract class StreamExecutionEnvironment {
 		return new RemoteStreamEnvironment(host, port, clientConfig, jarFiles);
 	}
 
+	/**
+	 * Gets the default parallelism that will be used for the local execution environment created by
+	 * {@link #createLocalEnvironment()}.
+	 *
+	 * @return The default local parallelism
+	 */
+	@PublicEvolving
+	public static int getDefaultLocalParallelism() {
+		return defaultLocalParallelism;
+	}
+
+	/**
+	 * Sets the default parallelism that will be used for the local execution
+	 * environment created by {@link #createLocalEnvironment()}.
+	 *
+	 * @param parallelism The parallelism to use as the default local parallelism.
+	 */
+	@PublicEvolving
+	public static void setDefaultLocalParallelism(int parallelism) {
+		defaultLocalParallelism = parallelism;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Methods to control the context and local environments for execution from packaged programs
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
index 9c676c4..b1521f5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
@@ -32,11 +32,18 @@ public class StreamPlanEnvironment extends StreamExecutionEnvironment {
 	private ExecutionEnvironment env;
 
 	protected StreamPlanEnvironment(ExecutionEnvironment env) {
-		super(GlobalConfiguration.loadConfiguration().getInteger(
-				ConfigConstants.DEFAULT_PARALLELISM_KEY,
-				ConfigConstants.DEFAULT_PARALLELISM));
-
+		super();
 		this.env = env;
+
+		int parallelism = env.getParallelism();
+		if (parallelism > 0) {
+			setParallelism(parallelism);
+		} else {
+			// determine parallelism
+			setParallelism(GlobalConfiguration.loadConfiguration().getInteger(
+					ConfigConstants.DEFAULT_PARALLELISM_KEY,
+					ConfigConstants.DEFAULT_PARALLELISM));
+		}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index a99efb1..cc3ca9e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -94,14 +94,12 @@ public class StreamGraph extends StreamingPlan {
 	private AbstractStateBackend stateBackend;
 	private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs;
 
-	private final int defaultParallelism;
 
-	public StreamGraph(StreamExecutionEnvironment environment, int defaultParallelism) {
+	public StreamGraph(StreamExecutionEnvironment environment) {
 		this.environment = environment;
 		this.executionConfig = environment.getConfig();
 		this.checkpointConfig = environment.getCheckpointConfig();
 
-		this.defaultParallelism = defaultParallelism;
 		// create an empty new stream graph.
 		clear();
 	}
@@ -657,7 +655,7 @@ public class StreamGraph extends StreamingPlan {
 							+ "\nThe user can force enable state checkpoints with the reduced guarantees by calling: env.enableCheckpointing(interval,true)");
 		}
 
-		StreamingJobGraphGenerator jobgraphGenerator = new StreamingJobGraphGenerator(this, defaultParallelism);
+		StreamingJobGraphGenerator jobgraphGenerator = new StreamingJobGraphGenerator(this);
 
 		return jobgraphGenerator.createJobGraph();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index 9fc8dd8..f9eec4f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -98,11 +98,12 @@ public class StreamGraphGenerator {
 	// we have loops, i.e. feedback edges.
 	private Map<StreamTransformation<?>, Collection<Integer>> alreadyTransformed;
 
+
 	/**
 	 * Private constructor. The generator should only be invoked using {@link #generate}.
 	 */
-	private StreamGraphGenerator(StreamExecutionEnvironment env, int defaultParallelism) {
-		this.streamGraph = new StreamGraph(env, defaultParallelism);
+	private StreamGraphGenerator(StreamExecutionEnvironment env) {
+		this.streamGraph = new StreamGraph(env);
 		this.streamGraph.setChaining(env.isChainingEnabled());
 		this.streamGraph.setStateBackend(env.getStateBackend());
 		this.env = env;
@@ -119,11 +120,8 @@ public class StreamGraphGenerator {
 	 *
 	 * @return The generated {@code StreamGraph}
 	 */
-	public static StreamGraph generate(
-			StreamExecutionEnvironment env,
-			List<StreamTransformation<?>> transformations,
-			int defaultParallelism) {
-		return new StreamGraphGenerator(env, defaultParallelism).generateInternal(transformations);
+	public static StreamGraph generate(StreamExecutionEnvironment env, List<StreamTransformation<?>> transformations) {
+		return new StreamGraphGenerator(env).generateInternal(transformations);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 0896eb7..794de5a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.graph;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -93,13 +92,10 @@ public class StreamingJobGraphGenerator {
 	private final StreamGraphHasher defaultStreamGraphHasher;
 	private final List<StreamGraphHasher> legacyStreamGraphHashers;
 
-	private final int defaultParallelism;
-
-	public StreamingJobGraphGenerator(StreamGraph streamGraph, int defaultParallelism) {
+	public StreamingJobGraphGenerator(StreamGraph streamGraph) {
 		this.streamGraph = streamGraph;
 		this.defaultStreamGraphHasher = new StreamGraphHasherV2();
 		this.legacyStreamGraphHashers = Arrays.asList(new StreamGraphHasherV1(), new StreamGraphUserHashHasher());
-		this.defaultParallelism = defaultParallelism;
 	}
 
 	private void init() {
@@ -342,12 +338,12 @@ public class StreamingJobGraphGenerator {
 
 		int parallelism = streamNode.getParallelism();
 
-		if (parallelism == ExecutionConfig.PARALLELISM_DEFAULT) {
-			parallelism = defaultParallelism;
+		if (parallelism > 0) {
+			jobVertex.setParallelism(parallelism);
+		} else {
+			parallelism = jobVertex.getParallelism();
 		}
 
-		jobVertex.setParallelism(parallelism);
-
 		jobVertex.setMaxParallelism(streamNode.getMaxParallelism());
 
 		if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
new file mode 100644
index 0000000..3fc1344
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.SplittableIterator;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class StreamExecutionEnvironmentTest {
+
+	@Test
+	public void fromElementsWithBaseTypeTest1() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.fromElements(ParentClass.class, new SubClass(1, "Java"), new ParentClass(1, "hello"));
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void fromElementsWithBaseTypeTest2() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.fromElements(SubClass.class, new SubClass(1, "Java"), new ParentClass(1, "hello"));
+	}
+
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testFromCollectionParallelism() {
+		try {
+			TypeInformation<Integer> typeInfo = BasicTypeInfo.INT_TYPE_INFO;
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+			DataStreamSource<Integer> dataStream1 = env.fromCollection(new DummySplittableIterator<Integer>(), typeInfo);
+
+			try {
+				dataStream1.setParallelism(4);
+				fail("should throw an exception");
+			}
+			catch (IllegalArgumentException e) {
+				// expected
+			}
+
+			dataStream1.addSink(new DiscardingSink<Integer>());
+	
+			DataStreamSource<Integer> dataStream2 = env.fromParallelCollection(new DummySplittableIterator<Integer>(),
+					typeInfo).setParallelism(4);
+
+			dataStream2.addSink(new DiscardingSink<Integer>());
+
+			env.getExecutionPlan();
+
+			assertEquals("Parallelism of collection source must be 1.", 1, env.getStreamGraph().getStreamNode(dataStream1.getId()).getParallelism());
+			assertEquals("Parallelism of parallel collection source must be 4.",
+					4, 
+					env.getStreamGraph().getStreamNode(dataStream2.getId()).getParallelism());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testSources() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		SourceFunction<Integer> srcFun = new SourceFunction<Integer>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public void run(SourceContext<Integer> ctx) throws Exception {
+			}
+
+			@Override
+			public void cancel() {
+			}
+		};
+		DataStreamSource<Integer> src1 = env.addSource(srcFun);
+		src1.addSink(new DiscardingSink<Integer>());
+		assertEquals(srcFun, getFunctionFromDataSource(src1));
+
+		List<Long> list = Arrays.asList(0L, 1L, 2L);
+
+		DataStreamSource<Long> src2 = env.generateSequence(0, 2);
+		assertTrue(getFunctionFromDataSource(src2) instanceof StatefulSequenceSource);
+
+		DataStreamSource<Long> src3 = env.fromElements(0L, 1L, 2L);
+		assertTrue(getFunctionFromDataSource(src3) instanceof FromElementsFunction);
+
+		DataStreamSource<Long> src4 = env.fromCollection(list);
+		assertTrue(getFunctionFromDataSource(src4) instanceof FromElementsFunction);
+	}
+
+	@Test
+	public void testParallelismBounds() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		SourceFunction<Integer> srcFun = new SourceFunction<Integer>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public void run(SourceContext<Integer> ctx) throws Exception {
+			}
+
+			@Override
+			public void cancel() {
+			}
+		};
+
+
+		SingleOutputStreamOperator<Object> operator =
+				env.addSource(srcFun).flatMap(new FlatMapFunction<Integer, Object>() {
+
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public void flatMap(Integer value, Collector<Object> out) throws Exception {
+
+			}
+		});
+
+		// default value for max parallelism
+		Assert.assertEquals(-1, operator.getTransformation().getMaxParallelism());
+
+		// bounds for parallelism 1
+		try {
+			operator.setParallelism(0);
+			Assert.fail();
+		} catch (IllegalArgumentException expected) {
+		}
+
+		// bounds for parallelism 2
+		operator.setParallelism(1);
+		Assert.assertEquals(1, operator.getParallelism());
+
+		// bounds for parallelism 3
+		operator.setParallelism(1 << 15);
+		Assert.assertEquals(1 << 15, operator.getParallelism());
+
+		// default value after generating
+		env.getStreamGraph().getJobGraph();
+		Assert.assertEquals(-1, operator.getTransformation().getMaxParallelism());
+
+		// configured value after generating
+		env.setMaxParallelism(42);
+		env.getStreamGraph().getJobGraph();
+		Assert.assertEquals(42, operator.getTransformation().getMaxParallelism());
+
+		// bounds configured parallelism 1
+		try {
+			env.setMaxParallelism(0);
+			Assert.fail();
+		} catch (IllegalArgumentException expected) {
+		}
+
+		// bounds configured parallelism 2
+		try {
+			env.setMaxParallelism(1 + (1 << 15));
+			Assert.fail();
+		} catch (IllegalArgumentException expected) {
+		}
+
+		// bounds for max parallelism 1
+		try {
+			operator.setMaxParallelism(0);
+			Assert.fail();
+		} catch (IllegalArgumentException expected) {
+		}
+
+		// bounds for max parallelism 2
+		try {
+			operator.setMaxParallelism(1 + (1 << 15));
+			Assert.fail();
+		} catch (IllegalArgumentException expected) {
+		}
+
+		// bounds for max parallelism 3
+		operator.setMaxParallelism(1);
+		Assert.assertEquals(1, operator.getTransformation().getMaxParallelism());
+
+		// bounds for max parallelism 4
+		operator.setMaxParallelism(1 << 15);
+		Assert.assertEquals(1 << 15, operator.getTransformation().getMaxParallelism());
+
+		// override config
+		env.getStreamGraph().getJobGraph();
+		Assert.assertEquals(1 << 15 , operator.getTransformation().getMaxParallelism());
+	}
+
+	/////////////////////////////////////////////////////////////
+	// Utilities
+	/////////////////////////////////////////////////////////////
+
+
+	private static StreamOperator<?> getOperatorFromDataStream(DataStream<?> dataStream) {
+		StreamExecutionEnvironment env = dataStream.getExecutionEnvironment();
+		StreamGraph streamGraph = env.getStreamGraph();
+		return streamGraph.getStreamNode(dataStream.getId()).getOperator();
+	}
+
+	@SuppressWarnings("unchecked")
+	private static <T> SourceFunction<T> getFunctionFromDataSource(DataStreamSource<T> dataStreamSource) {
+		dataStreamSource.addSink(new DiscardingSink<T>());
+		AbstractUdfStreamOperator<?, ?> operator =
+				(AbstractUdfStreamOperator<?, ?>) getOperatorFromDataStream(dataStreamSource);
+		return (SourceFunction<T>) operator.getUserFunction();
+	}
+
+	public static class DummySplittableIterator<T> extends SplittableIterator<T> {
+		private static final long serialVersionUID = 1312752876092210499L;
+
+		@SuppressWarnings("unchecked")
+		@Override
+		public Iterator<T>[] split(int numPartitions) {
+			return (Iterator<T>[]) new Iterator<?>[0];
+		}
+
+		@Override
+		public int getMaximumNumberOfSplits() {
+			return 0;
+		}
+
+		@Override
+		public boolean hasNext() {
+			return false;
+		}
+
+		@Override
+		public T next() {
+			throw new NoSuchElementException();
+		}
+
+		@Override
+		public void remove() {
+			throw new UnsupportedOperationException();
+		}
+	}
+
+	public static class ParentClass {
+		int num;
+		String string;
+		public ParentClass(int num, String string) {
+			this.num = num;
+			this.string = string;
+		}
+	}
+
+	public static class SubClass extends ParentClass{
+		public SubClass(int num, String string) {
+			super(num, string);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
deleted file mode 100644
index d29c833..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
+++ /dev/null
@@ -1,317 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.environment;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.ContextEnvironment;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
-import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
-import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.SplittableIterator;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.net.URL;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-
-public class StreamExecutionEnvironmentTest {
-
-	@Test
-	public void fromElementsWithBaseTypeTest1() {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.fromElements(ParentClass.class, new SubClass(1, "Java"), new ParentClass(1, "hello"));
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void fromElementsWithBaseTypeTest2() {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.fromElements(SubClass.class, new SubClass(1, "Java"), new ParentClass(1, "hello"));
-	}
-
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testFromCollectionParallelism() {
-		try {
-			TypeInformation<Integer> typeInfo = BasicTypeInfo.INT_TYPE_INFO;
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-			DataStreamSource<Integer> dataStream1 = env.fromCollection(new DummySplittableIterator<Integer>(), typeInfo);
-
-			try {
-				dataStream1.setParallelism(4);
-				fail("should throw an exception");
-			}
-			catch (IllegalArgumentException e) {
-				// expected
-			}
-
-			dataStream1.addSink(new DiscardingSink<Integer>());
-	
-			DataStreamSource<Integer> dataStream2 = env.fromParallelCollection(new DummySplittableIterator<Integer>(),
-					typeInfo).setParallelism(4);
-
-			dataStream2.addSink(new DiscardingSink<Integer>());
-
-			env.getExecutionPlan();
-
-			assertEquals("Parallelism of collection source must be 1.", 1, env.getStreamGraph().getStreamNode(dataStream1.getId()).getParallelism());
-			assertEquals("Parallelism of parallel collection source must be 4.",
-					4, 
-					env.getStreamGraph().getStreamNode(dataStream2.getId()).getParallelism());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testSources() {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		SourceFunction<Integer> srcFun = new SourceFunction<Integer>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public void run(SourceContext<Integer> ctx) throws Exception {
-			}
-
-			@Override
-			public void cancel() {
-			}
-		};
-		DataStreamSource<Integer> src1 = env.addSource(srcFun);
-		src1.addSink(new DiscardingSink<Integer>());
-		assertEquals(srcFun, getFunctionFromDataSource(src1));
-
-		List<Long> list = Arrays.asList(0L, 1L, 2L);
-
-		DataStreamSource<Long> src2 = env.generateSequence(0, 2);
-		assertTrue(getFunctionFromDataSource(src2) instanceof StatefulSequenceSource);
-
-		DataStreamSource<Long> src3 = env.fromElements(0L, 1L, 2L);
-		assertTrue(getFunctionFromDataSource(src3) instanceof FromElementsFunction);
-
-		DataStreamSource<Long> src4 = env.fromCollection(list);
-		assertTrue(getFunctionFromDataSource(src4) instanceof FromElementsFunction);
-	}
-
-	@Test
-	public void testDefaultParallelismIsDefault() {
-		assertEquals(
-				ExecutionConfig.PARALLELISM_DEFAULT,
-				StreamExecutionEnvironment.createLocalEnvironment().getParallelism());
-
-		assertEquals(
-				ExecutionConfig.PARALLELISM_DEFAULT,
-				StreamExecutionEnvironment.createRemoteEnvironment("dummy", 1234).getParallelism());
-
-		StreamExecutionEnvironment contextEnv = new StreamContextEnvironment(
-				new ContextEnvironment(
-						mock(ClusterClient.class),
-						Collections.<URL>emptyList(),
-						Collections.<URL>emptyList(),
-						this.getClass().getClassLoader(),
-						null));
-
-		assertEquals(
-				ExecutionConfig.PARALLELISM_DEFAULT,
-				contextEnv.getParallelism());
-	}
-
-	@Test
-	public void testParallelismBounds() {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		SourceFunction<Integer> srcFun = new SourceFunction<Integer>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public void run(SourceContext<Integer> ctx) throws Exception {
-			}
-
-			@Override
-			public void cancel() {
-			}
-		};
-
-
-		SingleOutputStreamOperator<Object> operator =
-				env.addSource(srcFun).flatMap(new FlatMapFunction<Integer, Object>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public void flatMap(Integer value, Collector<Object> out) throws Exception {
-
-			}
-		});
-
-		// default value for max parallelism
-		Assert.assertEquals(-1, operator.getTransformation().getMaxParallelism());
-
-		// bounds for parallelism 1
-		try {
-			operator.setParallelism(0);
-			Assert.fail();
-		} catch (IllegalArgumentException expected) {
-		}
-
-		// bounds for parallelism 2
-		operator.setParallelism(1);
-		Assert.assertEquals(1, operator.getParallelism());
-
-		// bounds for parallelism 3
-		operator.setParallelism(1 << 15);
-		Assert.assertEquals(1 << 15, operator.getParallelism());
-
-		// default value after generating
-		env.getStreamGraph().getJobGraph();
-		Assert.assertEquals(-1, operator.getTransformation().getMaxParallelism());
-
-		// configured value after generating
-		env.setMaxParallelism(42);
-		env.getStreamGraph().getJobGraph();
-		Assert.assertEquals(42, operator.getTransformation().getMaxParallelism());
-
-		// bounds configured parallelism 1
-		try {
-			env.setMaxParallelism(0);
-			Assert.fail();
-		} catch (IllegalArgumentException expected) {
-		}
-
-		// bounds configured parallelism 2
-		try {
-			env.setMaxParallelism(1 + (1 << 15));
-			Assert.fail();
-		} catch (IllegalArgumentException expected) {
-		}
-
-		// bounds for max parallelism 1
-		try {
-			operator.setMaxParallelism(0);
-			Assert.fail();
-		} catch (IllegalArgumentException expected) {
-		}
-
-		// bounds for max parallelism 2
-		try {
-			operator.setMaxParallelism(1 + (1 << 15));
-			Assert.fail();
-		} catch (IllegalArgumentException expected) {
-		}
-
-		// bounds for max parallelism 3
-		operator.setMaxParallelism(1);
-		Assert.assertEquals(1, operator.getTransformation().getMaxParallelism());
-
-		// bounds for max parallelism 4
-		operator.setMaxParallelism(1 << 15);
-		Assert.assertEquals(1 << 15, operator.getTransformation().getMaxParallelism());
-
-		// override config
-		env.getStreamGraph().getJobGraph();
-		Assert.assertEquals(1 << 15 , operator.getTransformation().getMaxParallelism());
-	}
-
-	/////////////////////////////////////////////////////////////
-	// Utilities
-	/////////////////////////////////////////////////////////////
-
-
-	private static StreamOperator<?> getOperatorFromDataStream(DataStream<?> dataStream) {
-		StreamExecutionEnvironment env = dataStream.getExecutionEnvironment();
-		StreamGraph streamGraph = env.getStreamGraph();
-		return streamGraph.getStreamNode(dataStream.getId()).getOperator();
-	}
-
-	@SuppressWarnings("unchecked")
-	private static <T> SourceFunction<T> getFunctionFromDataSource(DataStreamSource<T> dataStreamSource) {
-		dataStreamSource.addSink(new DiscardingSink<T>());
-		AbstractUdfStreamOperator<?, ?> operator =
-				(AbstractUdfStreamOperator<?, ?>) getOperatorFromDataStream(dataStreamSource);
-		return (SourceFunction<T>) operator.getUserFunction();
-	}
-
-	public static class DummySplittableIterator<T> extends SplittableIterator<T> {
-		private static final long serialVersionUID = 1312752876092210499L;
-
-		@SuppressWarnings("unchecked")
-		@Override
-		public Iterator<T>[] split(int numPartitions) {
-			return (Iterator<T>[]) new Iterator<?>[0];
-		}
-
-		@Override
-		public int getMaximumNumberOfSplits() {
-			return 0;
-		}
-
-		@Override
-		public boolean hasNext() {
-			return false;
-		}
-
-		@Override
-		public T next() {
-			throw new NoSuchElementException();
-		}
-
-		@Override
-		public void remove() {
-			throw new UnsupportedOperationException();
-		}
-	}
-
-	public static class ParentClass {
-		int num;
-		String string;
-		public ParentClass(int num, String string) {
-			this.num = num;
-			this.string = string;
-		}
-	}
-
-	public static class SubClass extends ParentClass{
-		public SubClass(int num, String string) {
-			super(num, string);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index abf51ab..6d2fcaa 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -112,10 +112,10 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 	@Test
 	public void testDisabledCheckpointing() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		StreamGraph streamGraph = new StreamGraph(env, 1 /* default parallelism */);
+		StreamGraph streamGraph = new StreamGraph(env);
 		assertFalse("Checkpointing enabled", streamGraph.getCheckpointConfig().isCheckpointingEnabled());
 
-		StreamingJobGraphGenerator jobGraphGenerator = new StreamingJobGraphGenerator(streamGraph, 1 /* default parallelism */);
+		StreamingJobGraphGenerator jobGraphGenerator = new StreamingJobGraphGenerator(streamGraph);
 		JobGraph jobGraph = jobGraphGenerator.createJobGraph();
 
 		JobSnapshottingSettings snapshottingSettings = jobGraph.getSnapshotSettings();
@@ -137,7 +137,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 				}
 			})
 			.print();
-		JobGraph jobGraph = new StreamingJobGraphGenerator(env.getStreamGraph(), 1 /* default parallelism */).createJobGraph();
+		JobGraph jobGraph = new StreamingJobGraphGenerator(env.getStreamGraph()).createJobGraph();
 
 		List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
 		JobVertex sourceVertex = verticesSorted.get(0);
@@ -224,7 +224,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 		});
 		sinkMethod.invoke(sink, resource5);
 
-		JobGraph jobGraph = new StreamingJobGraphGenerator(env.getStreamGraph(), 1 /* default parallelism */).createJobGraph();
+		JobGraph jobGraph = new StreamingJobGraphGenerator(env.getStreamGraph()).createJobGraph();
 
 		JobVertex sourceMapFilterVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(0);
 		JobVertex reduceSinkVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(1);
@@ -291,7 +291,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 		}).disableChaining().name("test_sink");
 		sinkMethod.invoke(sink, resource5);
 
-		JobGraph jobGraph = new StreamingJobGraphGenerator(env.getStreamGraph(), 1 /* default parallelism */).createJobGraph();
+		JobGraph jobGraph = new StreamingJobGraphGenerator(env.getStreamGraph()).createJobGraph();
 
 		for (JobVertex jobVertex : jobGraph.getVertices()) {
 			if (jobVertex.getName().contains("test_source")) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
index c4bed37..7cac1e1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
@@ -131,7 +131,7 @@ public class FoldApplyProcessWindowFunctionTest {
 
 		transformations.add(new OneInputTransformation<>(source, "test", windowOperator, BasicTypeInfo.INT_TYPE_INFO, 1));
 
-		StreamGraph streamGraph = StreamGraphGenerator.generate(env, transformations, 1 /* default parallelism */);
+		StreamGraph streamGraph = StreamGraphGenerator.generate(env, transformations);
 
 		List<Integer> result = new ArrayList<>();
 		List<Integer> input = new ArrayList<>();
@@ -240,7 +240,7 @@ public class FoldApplyProcessWindowFunctionTest {
 
 		transformations.add(new OneInputTransformation<>(source, "test", windowOperator, BasicTypeInfo.INT_TYPE_INFO, 1));
 
-		StreamGraph streamGraph = StreamGraphGenerator.generate(env, transformations, 1 /* default parallelism */);
+		StreamGraph streamGraph = StreamGraphGenerator.generate(env, transformations);
 
 		List<Integer> result = new ArrayList<>();
 		List<Integer> input = new ArrayList<>();
@@ -310,10 +310,6 @@ public class FoldApplyProcessWindowFunctionTest {
 
 	public static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
 
-		public DummyStreamExecutionEnvironment() {
-			super(1);
-		}
-
 		@Override
 		public JobExecutionResult execute(String jobName) throws Exception {
 			return null;

http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
index 6ddca34..fecd440 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
@@ -117,7 +117,7 @@ public class FoldApplyWindowFunctionTest {
 
 		transformations.add(new OneInputTransformation<>(source, "test", windowOperator, BasicTypeInfo.INT_TYPE_INFO, 1));
 
-		StreamGraph streamGraph = StreamGraphGenerator.generate(env, transformations, 1 /* default parallelism */);
+		StreamGraph streamGraph = StreamGraphGenerator.generate(env, transformations);
 
 		List<Integer> result = new ArrayList<>();
 		List<Integer> input = new ArrayList<>();
@@ -140,10 +140,6 @@ public class FoldApplyWindowFunctionTest {
 
 	public static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
 
-		public DummyStreamExecutionEnvironment() {
-			super(1);
-		}
-
 		@Override
 		public JobExecutionResult execute(String jobName) throws Exception {
 			return null;

http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 927c94e..0b2587e 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -672,6 +672,23 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
 
 object StreamExecutionEnvironment {
 
+  /**
+   * Sets the default parallelism that will be used for the local execution
+   * environment created by [[createLocalEnvironment()]].
+   *
+   * @param parallelism The default parallelism to use for local execution.
+   */
+  @PublicEvolving
+  def setDefaultLocalParallelism(parallelism: Int) : Unit =
+    JavaEnv.setDefaultLocalParallelism(parallelism)
+
+  /**
+   * Gets the default parallelism that will be used for the local execution environment created by
+   * [[createLocalEnvironment()]].
+   */
+  @PublicEvolving
+  def getDefaultLocalParallelism: Int = JavaEnv.getDefaultLocalParallelism
+  
   // --------------------------------------------------------------------------
   //  context environment
   // --------------------------------------------------------------------------
@@ -693,14 +710,13 @@ object StreamExecutionEnvironment {
   /**
    * Creates a local execution environment. The local execution environment will run the
    * program in a multi-threaded fashion in the same JVM as the environment was created in.
+   *
+   * This method sets the environment's default parallelism to the given parameter, which
+   * defaults to the value set via [[setDefaultLocalParallelism(Int)]].
    */
-  def createLocalEnvironment(parallelism: Int = -1):
+  def createLocalEnvironment(parallelism: Int = JavaEnv.getDefaultLocalParallelism):
       StreamExecutionEnvironment = {
-    if (parallelism == -1) {
-      new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment())
-    } else {
-      new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(parallelism))
-    }
+    new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(parallelism))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index 08153be..60c609d 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -255,10 +255,9 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
     val sink = map.addSink(x => {})
 
     assert(1 == env.getStreamGraph.getStreamNode(src.getId).getParallelism)
-    // default parallelism is only actualized when transforming to JobGraph
-    assert(-1 == env.getStreamGraph.getStreamNode(map.getId).getParallelism)
+    assert(10 == env.getStreamGraph.getStreamNode(map.getId).getParallelism)
     assert(1 == env.getStreamGraph.getStreamNode(windowed.getId).getParallelism)
-    assert(-1 == env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism)
+    assert(10 == env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism)
 
     try {
       src.setParallelism(3)
@@ -273,11 +272,9 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
     // the parallelism does not change since some windowing code takes the parallelism from
     // input operations and that cannot change dynamically
     assert(1 == env.getStreamGraph.getStreamNode(src.getId).getParallelism)
-    // setting a parallelism on the env/in the ExecutionConfig means that operators
-    // pick it up when being instantiated
-    assert(7 == env.getStreamGraph.getStreamNode(map.getId).getParallelism)
+    assert(10 == env.getStreamGraph.getStreamNode(map.getId).getParallelism)
     assert(1 == env.getStreamGraph.getStreamNode(windowed.getId).getParallelism)
-    assert(7 == env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism)
+    assert(10 == env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism)
 
     val parallelSource = env.generateSequence(0, 0)
     parallelSource.print()

http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index 90d8790..64c68dc 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -36,7 +36,6 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
 	
 
 	public TestStreamEnvironment(LocalFlinkMiniCluster executor, int parallelism) {
-		super(parallelism);
 		this.executor = Preconditions.checkNotNull(executor);
 		setParallelism(parallelism);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/0182141d/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
index 0a9f686..49ff744 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -382,10 +382,6 @@ public class AccumulatorLiveITCase extends TestLogger {
 	 */
 	private static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
 
-		public DummyStreamExecutionEnvironment() {
-			super(1 /* default parallelism */);
-		}
-
 		@Override
 		public JobExecutionResult execute() throws Exception {
 			return execute("default");


Mime
View raw message