flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [1/3] flink git commit: [FLINK-8698] [flip6] Use Flip6LocalStreamEnvironment instead of LocalStreamEnvironment
Date Tue, 20 Feb 2018 10:17:10 GMT
Repository: flink
Updated Branches:
  refs/heads/master 9da315234 -> d87531294


[FLINK-8698] [flip6] Use Flip6LocalStreamEnvironment instead of LocalStreamEnvironment


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5c18fa5b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5c18fa5b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5c18fa5b

Branch: refs/heads/master
Commit: 5c18fa5b7214a0be73d208046533477927a36532
Parents: 9da3152
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Mon Feb 19 12:00:08 2018 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Tue Feb 20 11:16:57 2018 +0100

----------------------------------------------------------------------
 .../environment/Flip6LocalStreamEnvironment.java | 15 ++-------------
 .../api/environment/LocalStreamEnvironment.java  |  2 +-
 .../environment/StreamExecutionEnvironment.java  | 19 +++++++++++--------
 3 files changed, 14 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5c18fa5b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
index 9d4f2a7..c5952fb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
@@ -18,9 +18,7 @@
 package org.apache.flink.streaming.api.environment;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -40,13 +38,10 @@ import org.slf4j.LoggerFactory;
  * parallelism can be set via {@link #setParallelism(int)}.
  */
 @Internal
-public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment {
+public class Flip6LocalStreamEnvironment extends LocalStreamEnvironment {
 
 	private static final Logger LOG = LoggerFactory.getLogger(Flip6LocalStreamEnvironment.class);
 
-	/** The configuration to use for the mini cluster. */
-	private final Configuration conf;
-
 	/**
 	 * Creates a new mini cluster stream environment that uses the default configuration.
 	 */
@@ -60,13 +55,7 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment
{
 	 * @param config The configuration used to configure the local executor.
 	 */
 	public Flip6LocalStreamEnvironment(Configuration config) {
-		if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
-			throw new InvalidProgramException(
-					"The Flip6LocalStreamEnvironment cannot be used when submitting a program through a
client, " +
-							"or running in a TestEnvironment context.");
-		}
-
-		this.conf = config == null ? new Configuration() : config;
+		super(config);
 		setParallelism(1);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5c18fa5b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index 6b31ff8..a53e2a3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -49,7 +49,7 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
 	private static final Logger LOG = LoggerFactory.getLogger(LocalStreamEnvironment.class);
 
 	/** The configuration to use for the local cluster. */
-	private final Configuration conf;
+	protected final Configuration conf;
 
 	/**
 	 * Creates a new local stream environment that uses the default configuration.

http://git-wip-us.apache.org/repos/asf/flink/blob/5c18fa5b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 56a7e29..c39201c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -47,6 +47,7 @@ import org.apache.flink.client.program.OptimizerPlanEnvironment;
 import org.apache.flink.client.program.PreviewPlanEnvironment;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
@@ -1633,9 +1634,7 @@ public abstract class StreamExecutionEnvironment {
 	 * @return A local execution environment with the specified parallelism.
 	 */
 	public static LocalStreamEnvironment createLocalEnvironment(int parallelism) {
-		LocalStreamEnvironment env = new LocalStreamEnvironment();
-		env.setParallelism(parallelism);
-		return env;
+		return createLocalEnvironment(parallelism, new Configuration());
 	}
 
 	/**
@@ -1651,7 +1650,14 @@ public abstract class StreamExecutionEnvironment {
 	 * @return A local execution environment with the specified parallelism.
 	 */
 	public static LocalStreamEnvironment createLocalEnvironment(int parallelism, Configuration
configuration) {
-		LocalStreamEnvironment currentEnvironment = new LocalStreamEnvironment(configuration);
+		final LocalStreamEnvironment currentEnvironment;
+
+		if (CoreOptions.FLIP6_MODE.equals(configuration.getString(CoreOptions.MODE))) {
+			currentEnvironment = new Flip6LocalStreamEnvironment(configuration);
+		} else {
+			currentEnvironment = new LocalStreamEnvironment(configuration);
+		}
+
 		currentEnvironment.setParallelism(parallelism);
 		return currentEnvironment;
 	}
@@ -1673,10 +1679,7 @@ public abstract class StreamExecutionEnvironment {
 
 		conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
 
-		LocalStreamEnvironment localEnv = new LocalStreamEnvironment(conf);
-		localEnv.setParallelism(defaultLocalParallelism);
-
-		return localEnv;
+		return createLocalEnvironment(defaultLocalParallelism, conf);
 	}
 
 	/**


Mime
View raw message