Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 2FB82200C4C for ; Tue, 4 Apr 2017 13:56:50 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 2DF17160B81; Tue, 4 Apr 2017 11:56:50 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 2FC91160BA1 for ; Tue, 4 Apr 2017 13:56:48 +0200 (CEST) Received: (qmail 70801 invoked by uid 500); 4 Apr 2017 11:56:46 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 70788 invoked by uid 99); 4 Apr 2017 11:56:46 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 04 Apr 2017 11:56:46 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 82875DFF0F; Tue, 4 Apr 2017 11:56:46 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aljoscha@apache.org To: commits@flink.apache.org Date: Tue, 04 Apr 2017 11:56:46 -0000 Message-Id: <7e1916beeb7f4dfab3f5590b7337647b@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/3] flink git commit: Revert "[FLINK-5808] Move default parallelism to StreamingJobGraphGenerator" archived-at: Tue, 04 Apr 2017 11:56:50 -0000 Repository: flink Updated Branches: refs/heads/release-1.2 3c63c9e01 -> 3e41ed1b1 Revert "[FLINK-5808] Move default parallelism to StreamingJobGraphGenerator" This reverts commit b563f0ae2e7b7233e29e03fbb2cf18b0d853c0ca. This fix was causing more problems than it was solving. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3e41ed1b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3e41ed1b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3e41ed1b Branch: refs/heads/release-1.2 Commit: 3e41ed1b1e9ee30f546374206d4759e009ced39b Parents: fd98e8b Author: Aljoscha Krettek Authored: Mon Apr 3 18:40:14 2017 +0200 Committer: Aljoscha Krettek Committed: Tue Apr 4 13:42:17 2017 +0200 ---------------------------------------------------------------------- .../api/environment/LocalStreamEnvironment.java | 26 +- .../environment/RemoteStreamEnvironment.java | 5 - .../environment/StreamContextEnvironment.java | 13 +- .../environment/StreamExecutionEnvironment.java | 65 ++-- .../api/environment/StreamPlanEnvironment.java | 15 +- .../flink/streaming/api/graph/StreamGraph.java | 6 +- .../api/graph/StreamGraphGenerator.java | 12 +- .../api/graph/StreamingJobGraphGenerator.java | 14 +- .../api/StreamExecutionEnvironmentTest.java | 289 +++++++++++++++++ .../StreamExecutionEnvironmentTest.java | 317 ------------------- .../graph/StreamingJobGraphGeneratorTest.java | 20 +- .../operators/FoldApplyWindowFunctionTest.java | 6 +- .../api/scala/StreamExecutionEnvironment.scala | 28 +- .../streaming/api/scala/DataStreamTest.scala | 11 +- .../streaming/util/TestStreamEnvironment.java | 1 - .../accumulators/AccumulatorLiveITCase.java | 4 - 16 files changed, 400 insertions(+), 432 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3e41ed1b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java index cb60552..f8c9c42 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java @@ -45,9 +45,6 @@ import org.slf4j.LoggerFactory; @Public public class LocalStreamEnvironment extends StreamExecutionEnvironment { - /** The default parallelism used when creating a local environment */ - private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors(); - private static final Logger LOG = LoggerFactory.getLogger(LocalStreamEnvironment.class); /** The configuration to use for the local cluster */ @@ -57,43 +54,24 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment { * Creates a new local stream environment that uses the default configuration. */ public LocalStreamEnvironment() { - this(defaultLocalParallelism); + this(null); } /** - * Creates a new local stream environment that uses the default configuration. - */ - public LocalStreamEnvironment(int parallelism) { - this(null, parallelism); - } - - - /** * Creates a new local stream environment that configures its local executor with the given configuration. * * @param config The configuration used to configure the local executor. */ public LocalStreamEnvironment(Configuration config) { - this(config, defaultLocalParallelism); - } - - /** - * Creates a new local stream environment that configures its local executor with the given configuration. - * - * @param config The configuration used to configure the local executor. - */ - public LocalStreamEnvironment(Configuration config, int parallelism) { - super(parallelism); if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) { throw new InvalidProgramException( "The LocalStreamEnvironment cannot be used when submitting a program through a client, " + "or running in a TestEnvironment context."); } - + this.conf = config == null ? new Configuration() : config; } - /** * Executes the JobGraph of the on a mini cluster of CLusterUtil with a user * specified name. http://git-wip-us.apache.org/repos/asf/flink/blob/3e41ed1b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java index 5684e28..333f9c0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java @@ -37,7 +37,6 @@ import org.apache.flink.client.program.StandaloneClusterClient; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.streaming.api.graph.StreamGraph; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -130,10 +129,6 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment { * The protocol must be supported by the {@link java.net.URLClassLoader}. */ public RemoteStreamEnvironment(String host, int port, Configuration clientConfiguration, String[] jarFiles, URL[] globalClasspaths) { - super(GlobalConfiguration.loadConfiguration().getInteger( - ConfigConstants.DEFAULT_PARALLELISM_KEY, - ConfigConstants.DEFAULT_PARALLELISM)); - if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) { throw new InvalidProgramException( "The RemoteEnvironment cannot be used when submitting a program through a client, " + http://git-wip-us.apache.org/repos/asf/flink/blob/3e41ed1b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java index 51078f2..49c5347 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java @@ -38,13 +38,14 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment { private final ContextEnvironment ctx; protected StreamContextEnvironment(ContextEnvironment ctx) { - // if the batch ContextEnvironment has a parallelism this must have come from - // the CLI Client. We should set that as our default parallelism - super(ctx.getParallelism() > 0 ? ctx.getParallelism() : - GlobalConfiguration.loadConfiguration().getInteger( - ConfigConstants.DEFAULT_PARALLELISM_KEY, - ConfigConstants.DEFAULT_PARALLELISM)); this.ctx = ctx; + if (ctx.getParallelism() > 0) { + setParallelism(ctx.getParallelism()); + } else { + setParallelism(GlobalConfiguration.loadConfiguration().getInteger( + ConfigConstants.DEFAULT_PARALLELISM_KEY, + ConfigConstants.DEFAULT_PARALLELISM)); + } } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/3e41ed1b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 6ac3622..dab0a06 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -111,6 +111,9 @@ public abstract class StreamExecutionEnvironment { /** The environment of the context (local by default, cluster if invoked through command line) */ private static StreamExecutionEnvironmentFactory contextEnvironmentFactory; + /** The default parallelism used when creating a local environment */ + private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors(); + // ------------------------------------------------------------------------ /** The execution configuration for this environment */ @@ -131,23 +134,11 @@ public abstract class StreamExecutionEnvironment { /** The time characteristic used by the data streams */ private TimeCharacteristic timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC; - /** The parallelism to use when no parallelism is set on an operation. */ - private final int defaultParallelism; - // -------------------------------------------------------------------------------------------- // Constructor and Properties // -------------------------------------------------------------------------------------------- - - public StreamExecutionEnvironment() { - this(ConfigConstants.DEFAULT_PARALLELISM); - } - - public StreamExecutionEnvironment(int defaultParallelism) { - this.defaultParallelism = defaultParallelism; - } - /** * Gets the config object. */ @@ -1523,7 +1514,7 @@ public abstract class StreamExecutionEnvironment { if (transformations.size() <= 0) { throw new IllegalStateException("No operators defined in streaming topology. Cannot execute."); } - return StreamGraphGenerator.generate(this, transformations, defaultParallelism); + return StreamGraphGenerator.generate(this, transformations); } /** @@ -1611,7 +1602,7 @@ public abstract class StreamExecutionEnvironment { * @return A local execution environment. */ public static LocalStreamEnvironment createLocalEnvironment() { - return new LocalStreamEnvironment(); + return createLocalEnvironment(defaultLocalParallelism); } /** @@ -1620,12 +1611,14 @@ public abstract class StreamExecutionEnvironment { * environment was created in. It will use the parallelism specified in the * parameter. * - * @param defaultParallelism The default parallelism for the local environment. - * + * @param parallelism + * The parallelism for the local environment. * @return A local execution environment with the specified parallelism. */ - public static LocalStreamEnvironment createLocalEnvironment(int defaultParallelism) { - return new LocalStreamEnvironment(defaultParallelism); + public static LocalStreamEnvironment createLocalEnvironment(int parallelism) { + LocalStreamEnvironment env = new LocalStreamEnvironment(); + env.setParallelism(parallelism); + return env; } /** @@ -1634,13 +1627,16 @@ public abstract class StreamExecutionEnvironment { * environment was created in. It will use the parallelism specified in the * parameter. * - * @param defaultParallelism The parallelism for the local environment. - * @param configuration Pass a custom configuration into the cluster - * + * @param parallelism + * The parallelism for the local environment. + * @param configuration + * Pass a custom configuration into the cluster * @return A local execution environment with the specified parallelism. */ - public static LocalStreamEnvironment createLocalEnvironment(int defaultParallelism, Configuration configuration) { - return new LocalStreamEnvironment(configuration, defaultParallelism); + public static LocalStreamEnvironment createLocalEnvironment(int parallelism, Configuration configuration) { + LocalStreamEnvironment currentEnvironment = new LocalStreamEnvironment(configuration); + currentEnvironment.setParallelism(parallelism); + return currentEnvironment; } /** @@ -1665,6 +1661,7 @@ public abstract class StreamExecutionEnvironment { conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true); LocalStreamEnvironment localEnv = new LocalStreamEnvironment(conf); + localEnv.setParallelism(defaultLocalParallelism); return localEnv; } @@ -1750,6 +1747,28 @@ public abstract class StreamExecutionEnvironment { return new RemoteStreamEnvironment(host, port, clientConfig, jarFiles); } + /** + * Gets the default parallelism that will be used for the local execution environment created by + * {@link #createLocalEnvironment()}. + * + * @return The default local parallelism + */ + @PublicEvolving + public static int getDefaultLocalParallelism() { + return defaultLocalParallelism; + } + + /** + * Sets the default parallelism that will be used for the local execution + * environment created by {@link #createLocalEnvironment()}. + * + * @param parallelism The parallelism to use as the default local parallelism. + */ + @PublicEvolving + public static void setDefaultLocalParallelism(int parallelism) { + defaultLocalParallelism = parallelism; + } + // -------------------------------------------------------------------------------------------- // Methods to control the context and local environments for execution from packaged programs // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3e41ed1b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java index 9c676c4..b1521f5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java @@ -32,11 +32,18 @@ public class StreamPlanEnvironment extends StreamExecutionEnvironment { private ExecutionEnvironment env; protected StreamPlanEnvironment(ExecutionEnvironment env) { - super(GlobalConfiguration.loadConfiguration().getInteger( - ConfigConstants.DEFAULT_PARALLELISM_KEY, - ConfigConstants.DEFAULT_PARALLELISM)); - + super(); this.env = env; + + int parallelism = env.getParallelism(); + if (parallelism > 0) { + setParallelism(parallelism); + } else { + // determine parallelism + setParallelism(GlobalConfiguration.loadConfiguration().getInteger( + ConfigConstants.DEFAULT_PARALLELISM_KEY, + ConfigConstants.DEFAULT_PARALLELISM)); + } } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/3e41ed1b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index 1b4acd0..2f80764 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -91,14 +91,12 @@ public class StreamGraph extends StreamingPlan { private AbstractStateBackend stateBackend; private Set> iterationSourceSinkPairs; - private final int defaultParallelism; - public StreamGraph(StreamExecutionEnvironment environment, int defaultParallelism) { + public StreamGraph(StreamExecutionEnvironment environment) { this.environment = environment; this.executionConfig = environment.getConfig(); this.checkpointConfig = environment.getCheckpointConfig(); - this.defaultParallelism = defaultParallelism; // create an empty new stream graph. clear(); } @@ -598,7 +596,7 @@ public class StreamGraph extends StreamingPlan { + "\nThe user can force enable state checkpoints with the reduced guarantees by calling: env.enableCheckpointing(interval,true)"); } - StreamingJobGraphGenerator jobgraphGenerator = new StreamingJobGraphGenerator(this, defaultParallelism); + StreamingJobGraphGenerator jobgraphGenerator = new StreamingJobGraphGenerator(this); return jobgraphGenerator.createJobGraph(); } http://git-wip-us.apache.org/repos/asf/flink/blob/3e41ed1b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java index 333e4f9..ddd0515 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java @@ -97,11 +97,12 @@ public class StreamGraphGenerator { // we have loops, i.e. feedback edges. private Map, Collection> alreadyTransformed; + /** * Private constructor. The generator should only be invoked using {@link #generate}. */ - private StreamGraphGenerator(StreamExecutionEnvironment env, int defaultParallelism) { - this.streamGraph = new StreamGraph(env, defaultParallelism); + private StreamGraphGenerator(StreamExecutionEnvironment env) { + this.streamGraph = new StreamGraph(env); this.streamGraph.setChaining(env.isChainingEnabled()); this.streamGraph.setStateBackend(env.getStateBackend()); this.env = env; @@ -118,11 +119,8 @@ public class StreamGraphGenerator { * * @return The generated {@code StreamGraph} */ - public static StreamGraph generate( - StreamExecutionEnvironment env, - List> transformations, - int defaultParallelism) { - return new StreamGraphGenerator(env, defaultParallelism).generateInternal(transformations); + public static StreamGraph generate(StreamExecutionEnvironment env, List> transformations) { + return new StreamGraphGenerator(env).generateInternal(transformations); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/3e41ed1b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index f87e51e..8877c80 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.graph; import org.apache.commons.lang3.StringUtils; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.java.tuple.Tuple2; @@ -87,13 +86,10 @@ public class StreamingJobGraphGenerator { private final StreamGraphHasher defaultStreamGraphHasher; private final List legacyStreamGraphHashers; - private final int defaultParallelism; - - public StreamingJobGraphGenerator(StreamGraph streamGraph, int defaultParallelism) { + public StreamingJobGraphGenerator(StreamGraph streamGraph) { this.streamGraph = streamGraph; this.defaultStreamGraphHasher = new StreamGraphHasherV2(); this.legacyStreamGraphHashers = Arrays.asList(new StreamGraphHasherV1(), new StreamGraphUserHashHasher()); - this.defaultParallelism = defaultParallelism; } private void init() { @@ -308,12 +304,12 @@ public class StreamingJobGraphGenerator { int parallelism = streamNode.getParallelism(); - if (parallelism == ExecutionConfig.PARALLELISM_DEFAULT) { - parallelism = defaultParallelism; + if (parallelism > 0) { + jobVertex.setParallelism(parallelism); + } else { + parallelism = jobVertex.getParallelism(); } - jobVertex.setParallelism(parallelism); - jobVertex.setMaxParallelism(streamNode.getMaxParallelism()); if (LOG.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/flink/blob/3e41ed1b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java new file mode 100644 index 0000000..3fc1344 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +import org.apache.flink.streaming.api.functions.source.FromElementsFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.util.Collector; +import org.apache.flink.util.SplittableIterator; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class StreamExecutionEnvironmentTest { + + @Test + public void fromElementsWithBaseTypeTest1() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.fromElements(ParentClass.class, new SubClass(1, "Java"), new ParentClass(1, "hello")); + } + + @Test(expected = IllegalArgumentException.class) + public void fromElementsWithBaseTypeTest2() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.fromElements(SubClass.class, new SubClass(1, "Java"), new ParentClass(1, "hello")); + } + + @Test + @SuppressWarnings("unchecked") + public void testFromCollectionParallelism() { + try { + TypeInformation typeInfo = BasicTypeInfo.INT_TYPE_INFO; + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStreamSource dataStream1 = env.fromCollection(new DummySplittableIterator(), typeInfo); + + try { + dataStream1.setParallelism(4); + fail("should throw an exception"); + } + catch (IllegalArgumentException e) { + // expected + } + + dataStream1.addSink(new DiscardingSink()); + + DataStreamSource dataStream2 = env.fromParallelCollection(new DummySplittableIterator(), + typeInfo).setParallelism(4); + + dataStream2.addSink(new DiscardingSink()); + + env.getExecutionPlan(); + + assertEquals("Parallelism of collection source must be 1.", 1, env.getStreamGraph().getStreamNode(dataStream1.getId()).getParallelism()); + assertEquals("Parallelism of parallel collection source must be 4.", + 4, + env.getStreamGraph().getStreamNode(dataStream2.getId()).getParallelism()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testSources() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + SourceFunction srcFun = new SourceFunction() { + private static final long serialVersionUID = 1L; + + @Override + public void run(SourceContext ctx) throws Exception { + } + + @Override + public void cancel() { + } + }; + DataStreamSource src1 = env.addSource(srcFun); + src1.addSink(new DiscardingSink()); + assertEquals(srcFun, getFunctionFromDataSource(src1)); + + List list = Arrays.asList(0L, 1L, 2L); + + DataStreamSource src2 = env.generateSequence(0, 2); + assertTrue(getFunctionFromDataSource(src2) instanceof StatefulSequenceSource); + + DataStreamSource src3 = env.fromElements(0L, 1L, 2L); + assertTrue(getFunctionFromDataSource(src3) instanceof FromElementsFunction); + + DataStreamSource src4 = env.fromCollection(list); + assertTrue(getFunctionFromDataSource(src4) instanceof FromElementsFunction); + } + + @Test + public void testParallelismBounds() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + SourceFunction srcFun = new SourceFunction() { + private static final long serialVersionUID = 1L; + + @Override + public void run(SourceContext ctx) throws Exception { + } + + @Override + public void cancel() { + } + }; + + + SingleOutputStreamOperator operator = + env.addSource(srcFun).flatMap(new FlatMapFunction() { + + private static final long serialVersionUID = 1L; + + @Override + public void flatMap(Integer value, Collector out) throws Exception { + + } + }); + + // default value for max parallelism + Assert.assertEquals(-1, operator.getTransformation().getMaxParallelism()); + + // bounds for parallelism 1 + try { + operator.setParallelism(0); + Assert.fail(); + } catch (IllegalArgumentException expected) { + } + + // bounds for parallelism 2 + operator.setParallelism(1); + Assert.assertEquals(1, operator.getParallelism()); + + // bounds for parallelism 3 + operator.setParallelism(1 << 15); + Assert.assertEquals(1 << 15, operator.getParallelism()); + + // default value after generating + env.getStreamGraph().getJobGraph(); + Assert.assertEquals(-1, operator.getTransformation().getMaxParallelism()); + + // configured value after generating + env.setMaxParallelism(42); + env.getStreamGraph().getJobGraph(); + Assert.assertEquals(42, operator.getTransformation().getMaxParallelism()); + + // bounds configured parallelism 1 + try { + env.setMaxParallelism(0); + Assert.fail(); + } catch (IllegalArgumentException expected) { + } + + // bounds configured parallelism 2 + try { + env.setMaxParallelism(1 + (1 << 15)); + Assert.fail(); + } catch (IllegalArgumentException expected) { + } + + // bounds for max parallelism 1 + try { + operator.setMaxParallelism(0); + Assert.fail(); + } catch (IllegalArgumentException expected) { + } + + // bounds for max parallelism 2 + try { + operator.setMaxParallelism(1 + (1 << 15)); + Assert.fail(); + } catch (IllegalArgumentException expected) { + } + + // bounds for max parallelism 3 + operator.setMaxParallelism(1); + Assert.assertEquals(1, operator.getTransformation().getMaxParallelism()); + + // bounds for max parallelism 4 + operator.setMaxParallelism(1 << 15); + Assert.assertEquals(1 << 15, operator.getTransformation().getMaxParallelism()); + + // override config + env.getStreamGraph().getJobGraph(); + Assert.assertEquals(1 << 15 , operator.getTransformation().getMaxParallelism()); + } + + ///////////////////////////////////////////////////////////// + // Utilities + ///////////////////////////////////////////////////////////// + + + private static StreamOperator getOperatorFromDataStream(DataStream dataStream) { + StreamExecutionEnvironment env = dataStream.getExecutionEnvironment(); + StreamGraph streamGraph = env.getStreamGraph(); + return streamGraph.getStreamNode(dataStream.getId()).getOperator(); + } + + @SuppressWarnings("unchecked") + private static SourceFunction getFunctionFromDataSource(DataStreamSource dataStreamSource) { + dataStreamSource.addSink(new DiscardingSink()); + AbstractUdfStreamOperator operator = + (AbstractUdfStreamOperator) getOperatorFromDataStream(dataStreamSource); + return (SourceFunction) operator.getUserFunction(); + } + + public static class DummySplittableIterator extends SplittableIterator { + private static final long serialVersionUID = 1312752876092210499L; + + @SuppressWarnings("unchecked") + @Override + public Iterator[] split(int numPartitions) { + return (Iterator[]) new Iterator[0]; + } + + @Override + public int getMaximumNumberOfSplits() { + return 0; + } + + @Override + public boolean hasNext() { + return false; + } + + @Override + public T next() { + throw new NoSuchElementException(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + } + + public static class ParentClass { + int num; + String string; + public ParentClass(int num, String string) { + this.num = num; + this.string = string; + } + } + + public static class SubClass extends ParentClass{ + public SubClass(int num, String string) { + super(num, string); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3e41ed1b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java deleted file mode 100644 index d29c833..0000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java +++ /dev/null @@ -1,317 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.environment; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.client.program.ClusterClient; -import org.apache.flink.client.program.ContextEnvironment; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.functions.sink.DiscardingSink; -import org.apache.flink.streaming.api.functions.source.FromElementsFunction; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource; -import org.apache.flink.streaming.api.graph.StreamGraph; -import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; -import org.apache.flink.streaming.api.operators.StreamOperator; -import org.apache.flink.util.Collector; -import org.apache.flink.util.SplittableIterator; -import org.junit.Assert; -import org.junit.Test; - -import java.net.URL; -import java.util.Arrays; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.NoSuchElementException; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; - -public class StreamExecutionEnvironmentTest { - - @Test - public void fromElementsWithBaseTypeTest1() { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.fromElements(ParentClass.class, new SubClass(1, "Java"), new ParentClass(1, "hello")); - } - - @Test(expected = IllegalArgumentException.class) - public void fromElementsWithBaseTypeTest2() { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.fromElements(SubClass.class, new SubClass(1, "Java"), new ParentClass(1, "hello")); - } - - @Test - @SuppressWarnings("unchecked") - public void testFromCollectionParallelism() { - try { - TypeInformation typeInfo = BasicTypeInfo.INT_TYPE_INFO; - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - DataStreamSource dataStream1 = env.fromCollection(new DummySplittableIterator(), typeInfo); - - try { - dataStream1.setParallelism(4); - fail("should throw an exception"); - } - catch (IllegalArgumentException e) { - // expected - } - - dataStream1.addSink(new DiscardingSink()); - - DataStreamSource dataStream2 = env.fromParallelCollection(new DummySplittableIterator(), - typeInfo).setParallelism(4); - - dataStream2.addSink(new DiscardingSink()); - - env.getExecutionPlan(); - - assertEquals("Parallelism of collection source must be 1.", 1, env.getStreamGraph().getStreamNode(dataStream1.getId()).getParallelism()); - assertEquals("Parallelism of parallel collection source must be 4.", - 4, - env.getStreamGraph().getStreamNode(dataStream2.getId()).getParallelism()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testSources() { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - SourceFunction srcFun = new SourceFunction() { - private static final long serialVersionUID = 1L; - - @Override - public void run(SourceContext ctx) throws Exception { - } - - @Override - public void cancel() { - } - }; - DataStreamSource src1 = env.addSource(srcFun); - src1.addSink(new DiscardingSink()); - assertEquals(srcFun, getFunctionFromDataSource(src1)); - - List list = Arrays.asList(0L, 1L, 2L); - - DataStreamSource src2 = env.generateSequence(0, 2); - assertTrue(getFunctionFromDataSource(src2) instanceof StatefulSequenceSource); - - DataStreamSource src3 = env.fromElements(0L, 1L, 2L); - assertTrue(getFunctionFromDataSource(src3) instanceof FromElementsFunction); - - DataStreamSource src4 = env.fromCollection(list); - assertTrue(getFunctionFromDataSource(src4) instanceof FromElementsFunction); - } - - @Test - public void testDefaultParallelismIsDefault() { - assertEquals( - ExecutionConfig.PARALLELISM_DEFAULT, - StreamExecutionEnvironment.createLocalEnvironment().getParallelism()); - - assertEquals( - ExecutionConfig.PARALLELISM_DEFAULT, - StreamExecutionEnvironment.createRemoteEnvironment("dummy", 1234).getParallelism()); - - StreamExecutionEnvironment contextEnv = new StreamContextEnvironment( - new ContextEnvironment( - mock(ClusterClient.class), - Collections.emptyList(), - Collections.emptyList(), - this.getClass().getClassLoader(), - null)); - - assertEquals( - ExecutionConfig.PARALLELISM_DEFAULT, - contextEnv.getParallelism()); - } - - @Test - public void testParallelismBounds() { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - SourceFunction srcFun = new SourceFunction() { - private static final long serialVersionUID = 1L; - - @Override - public void run(SourceContext ctx) throws Exception { - } - - @Override - public void cancel() { - } - }; - - - SingleOutputStreamOperator operator = - env.addSource(srcFun).flatMap(new FlatMapFunction() { - - private static final long serialVersionUID = 1L; - - @Override - public void flatMap(Integer value, Collector out) throws Exception { - - } - }); - - // default value for max parallelism - Assert.assertEquals(-1, operator.getTransformation().getMaxParallelism()); - - // bounds for parallelism 1 - try { - operator.setParallelism(0); - Assert.fail(); - } catch (IllegalArgumentException expected) { - } - - // bounds for parallelism 2 - operator.setParallelism(1); - Assert.assertEquals(1, operator.getParallelism()); - - // bounds for parallelism 3 - operator.setParallelism(1 << 15); - Assert.assertEquals(1 << 15, operator.getParallelism()); - - // default value after generating - env.getStreamGraph().getJobGraph(); - Assert.assertEquals(-1, operator.getTransformation().getMaxParallelism()); - - // configured value after generating - env.setMaxParallelism(42); - env.getStreamGraph().getJobGraph(); - Assert.assertEquals(42, operator.getTransformation().getMaxParallelism()); - - // bounds configured parallelism 1 - try { - env.setMaxParallelism(0); - Assert.fail(); - } catch (IllegalArgumentException expected) { - } - - // bounds configured parallelism 2 - try { - env.setMaxParallelism(1 + (1 << 15)); - Assert.fail(); - } catch (IllegalArgumentException expected) { - } - - // bounds for max parallelism 1 - try { - operator.setMaxParallelism(0); - Assert.fail(); - } catch (IllegalArgumentException expected) { - } - - // bounds for max parallelism 2 - try { - operator.setMaxParallelism(1 + (1 << 15)); - Assert.fail(); - } catch (IllegalArgumentException expected) { - } - - // bounds for max parallelism 3 - operator.setMaxParallelism(1); - Assert.assertEquals(1, operator.getTransformation().getMaxParallelism()); - - // bounds for max parallelism 4 - operator.setMaxParallelism(1 << 15); - Assert.assertEquals(1 << 15, operator.getTransformation().getMaxParallelism()); - - // override config - env.getStreamGraph().getJobGraph(); - Assert.assertEquals(1 << 15 , operator.getTransformation().getMaxParallelism()); - } - - ///////////////////////////////////////////////////////////// - // Utilities - ///////////////////////////////////////////////////////////// - - - private static StreamOperator getOperatorFromDataStream(DataStream dataStream) { - StreamExecutionEnvironment env = dataStream.getExecutionEnvironment(); - StreamGraph streamGraph = env.getStreamGraph(); - return streamGraph.getStreamNode(dataStream.getId()).getOperator(); - } - - @SuppressWarnings("unchecked") - private static SourceFunction getFunctionFromDataSource(DataStreamSource dataStreamSource) { - dataStreamSource.addSink(new DiscardingSink()); - AbstractUdfStreamOperator operator = - (AbstractUdfStreamOperator) getOperatorFromDataStream(dataStreamSource); - return (SourceFunction) operator.getUserFunction(); - } - - public static class DummySplittableIterator extends SplittableIterator { - private static final long serialVersionUID = 1312752876092210499L; - - @SuppressWarnings("unchecked") - @Override - public Iterator[] split(int numPartitions) { - return (Iterator[]) new Iterator[0]; - } - - @Override - public int getMaximumNumberOfSplits() { - return 0; - } - - @Override - public boolean hasNext() { - return false; - } - - @Override - public T next() { - throw new NoSuchElementException(); - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - } - - public static class ParentClass { - int num; - String string; - public ParentClass(int num, String string) { - this.num = num; - this.string = string; - } - } - - public static class SubClass extends ParentClass{ - public SubClass(int num, String string) { - super(num, string); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/3e41ed1b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java index 5b4dbf5..4d462d0 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java @@ -42,7 +42,7 @@ import static org.junit.Assert.assertTrue; @SuppressWarnings("serial") public class StreamingJobGraphGeneratorTest extends TestLogger { - + @Test public void testExecutionConfigSerialization() throws IOException, ClassNotFoundException { final long seed = System.currentTimeMillis(); @@ -50,12 +50,12 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - StreamGraph streamingJob = new StreamGraph(env, 1 /* default parallelism */); - StreamingJobGraphGenerator compiler = new StreamingJobGraphGenerator(streamingJob, 1 /* default parallelism */); - + StreamGraph streamingJob = new StreamGraph(env); + StreamingJobGraphGenerator compiler = new StreamingJobGraphGenerator(streamingJob); + boolean closureCleanerEnabled = r.nextBoolean(), forceAvroEnabled = r.nextBoolean(), forceKryoEnabled = r.nextBoolean(), objectReuseEnabled = r.nextBoolean(), sysoutLoggingEnabled = r.nextBoolean(); int dop = 1 + r.nextInt(10); - + ExecutionConfig config = streamingJob.getExecutionConfig(); if(closureCleanerEnabled) { config.enableClosureCleaner(); @@ -83,7 +83,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { config.disableSysoutLogging(); } config.setParallelism(dop); - + JobGraph jobGraph = compiler.createJobGraph(); final String EXEC_CONFIG_KEY = "runtime.config"; @@ -108,7 +108,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { assertEquals(sysoutLoggingEnabled, executionConfig.isSysoutLoggingEnabled()); assertEquals(dop, executionConfig.getParallelism()); } - + @Test public void testParallelismOneNotChained() { @@ -164,10 +164,10 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { @Test public void testDisabledCheckpointing() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - StreamGraph streamGraph = new StreamGraph(env, 1 /* default parallelism */); + StreamGraph streamGraph = new StreamGraph(env); assertFalse("Checkpointing enabled", streamGraph.getCheckpointConfig().isCheckpointingEnabled()); - StreamingJobGraphGenerator jobGraphGenerator = new StreamingJobGraphGenerator(streamGraph, 1 /* default parallelism */); + StreamingJobGraphGenerator jobGraphGenerator = new StreamingJobGraphGenerator(streamGraph); JobGraph jobGraph = jobGraphGenerator.createJobGraph(); JobSnapshottingSettings snapshottingSettings = jobGraph.getSnapshotSettings(); @@ -189,7 +189,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { } }) .print(); - JobGraph jobGraph = new StreamingJobGraphGenerator(env.getStreamGraph(), 1 /* default parallelism */).createJobGraph(); + JobGraph jobGraph = new StreamingJobGraphGenerator(env.getStreamGraph()).createJobGraph(); JobVertex sourceVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(0); JobVertex mapPrintVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(1); http://git-wip-us.apache.org/repos/asf/flink/blob/3e41ed1b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java index af413ad..91ec427 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java @@ -115,7 +115,7 @@ public class FoldApplyWindowFunctionTest { transformations.add(new OneInputTransformation<>(source, "test", windowOperator, BasicTypeInfo.INT_TYPE_INFO, 1)); - StreamGraph streamGraph = StreamGraphGenerator.generate(env, transformations, 1 /* default parallelism */); + StreamGraph streamGraph = StreamGraphGenerator.generate(env, transformations); List result = new ArrayList<>(); List input = new ArrayList<>(); @@ -138,10 +138,6 @@ public class FoldApplyWindowFunctionTest { public static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment { - public DummyStreamExecutionEnvironment() { - super(1); - } - @Override public JobExecutionResult execute(String jobName) throws Exception { return null; http://git-wip-us.apache.org/repos/asf/flink/blob/3e41ed1b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala index 60798e0..22f1264 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala @@ -673,6 +673,23 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { object StreamExecutionEnvironment { + /** + * Sets the default parallelism that will be used for the local execution + * environment created by [[createLocalEnvironment()]]. + * + * @param parallelism The default parallelism to use for local execution. + */ + @PublicEvolving + def setDefaultLocalParallelism(parallelism: Int) : Unit = + JavaEnv.setDefaultLocalParallelism(parallelism) + + /** + * Gets the default parallelism that will be used for the local execution environment created by + * [[createLocalEnvironment()]]. + */ + @PublicEvolving + def getDefaultLocalParallelism: Int = JavaEnv.getDefaultLocalParallelism + // -------------------------------------------------------------------------- // context environment // -------------------------------------------------------------------------- @@ -694,14 +711,13 @@ object StreamExecutionEnvironment { /** * Creates a local execution environment. The local execution environment will run the * program in a multi-threaded fashion in the same JVM as the environment was created in. + * + * This method sets the environment's default parallelism to given parameter, which + * defaults to the value set via [[setDefaultLocalParallelism(Int)]]. */ - def createLocalEnvironment(parallelism: Int = -1): + def createLocalEnvironment(parallelism: Int = JavaEnv.getDefaultLocalParallelism): StreamExecutionEnvironment = { - if (parallelism == -1) { - new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment()) - } else { - new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(parallelism)) - } + new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(parallelism)) } /** http://git-wip-us.apache.org/repos/asf/flink/blob/3e41ed1b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala index b498edc..adb59f2 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala @@ -255,10 +255,9 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase { val sink = map.addSink(x => {}) assert(1 == env.getStreamGraph.getStreamNode(src.getId).getParallelism) - // default parallelism is only actualized when transforming to JobGraph - assert(-1 == env.getStreamGraph.getStreamNode(map.getId).getParallelism) + assert(10 == env.getStreamGraph.getStreamNode(map.getId).getParallelism) assert(1 == env.getStreamGraph.getStreamNode(windowed.getId).getParallelism) - assert(-1 == env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism) + assert(10 == env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism) try { src.setParallelism(3) @@ -273,11 +272,9 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase { // the parallelism does not change since some windowing code takes the parallelism from // input operations and that cannot change dynamically assert(1 == env.getStreamGraph.getStreamNode(src.getId).getParallelism) - // setting a parallelism on the env/in the ExecutionConfig means that operators - // pick it up when being instantiated - assert(7 == env.getStreamGraph.getStreamNode(map.getId).getParallelism) + assert(10 == env.getStreamGraph.getStreamNode(map.getId).getParallelism) assert(1 == env.getStreamGraph.getStreamNode(windowed.getId).getParallelism) - assert(7 == env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism) + assert(10 == env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism) val parallelSource = env.generateSequence(0, 0) parallelSource.print() http://git-wip-us.apache.org/repos/asf/flink/blob/3e41ed1b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java index 90d8790..64c68dc 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java @@ -36,7 +36,6 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment { public TestStreamEnvironment(LocalFlinkMiniCluster executor, int parallelism) { - super(parallelism); this.executor = Preconditions.checkNotNull(executor); setParallelism(parallelism); } http://git-wip-us.apache.org/repos/asf/flink/blob/3e41ed1b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java index 883f4b4..c56fa91 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java @@ -381,10 +381,6 @@ public class AccumulatorLiveITCase { */ private static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment { - public DummyStreamExecutionEnvironment() { - super(1 /* default parallelism */); - } - @Override public JobExecutionResult execute() throws Exception { return execute("default");