beam-commits mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Work logged] (BEAM-3089) Issue with setting the parallelism at client level using Flink runner
Date Wed, 19 Sep 2018 17:20:00 GMT

     [ https://issues.apache.org/jira/browse/BEAM-3089?focusedWorklogId=145731&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-145731 ]

ASF GitHub Bot logged work on BEAM-3089:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Sep/18 17:19
            Start Date: 19/Sep/18 17:19
    Worklog Time Spent: 10m 
      Work Description: tweise closed pull request #6426: [BEAM-3089] Fix default values in FlinkPipelineOptions / Add tests
URL: https://github.com/apache/beam/pull/6426
 
 
   

This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is displayed below for the sake of provenance:

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java
deleted file mode 100644
index d448bed2333..00000000000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.sdk.options.DefaultValueFactory;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
-
-/**
- * {@link DefaultValueFactory} for getting a default value for the parallelism option on {@link
- * FlinkPipelineOptions}.
- *
- * <p>This will return either the default value from {@link GlobalConfiguration} or {@code 1}. A
- * valid {@link GlobalConfiguration} is only available if the program is executed by the Flink run
- * scripts.
- */
-public class DefaultParallelismFactory implements DefaultValueFactory<Integer> {
-  @Override
-  public Integer create(PipelineOptions options) {
-    return GlobalConfiguration.loadConfiguration()
-        .getInteger(ConfigConstants.DEFAULT_PARALLELISM_KEY, 1);
-  }
-}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
index 4ace1eccc37..40a8d51ee40 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
@@ -17,12 +17,16 @@
  */
 package org.apache.beam.runners.flink;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Splitter;
 import java.util.List;
+import javax.annotation.Nullable;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.java.CollectionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -42,6 +46,12 @@
    */
   public static ExecutionEnvironment createBatchExecutionEnvironment(
       FlinkPipelineOptions options, List<String> filesToStage) {
+    return createBatchExecutionEnvironment(options, filesToStage, null);
+  }
+
+  @VisibleForTesting
+  static ExecutionEnvironment createBatchExecutionEnvironment(
+      FlinkPipelineOptions options, List<String> filesToStage, @Nullable String confDir) {
 
     LOG.info("Creating a Batch Execution Environment.");
 
@@ -71,9 +81,18 @@ public static ExecutionEnvironment createBatchExecutionEnvironment(
     if (options.getParallelism() != -1 && !(flinkBatchEnv instanceof CollectionEnvironment)) {
       flinkBatchEnv.setParallelism(options.getParallelism());
     }
+    // Set the correct parallelism, required by UnboundedSourceWrapper to generate consistent splits.
+    final int parallelism;
+    if (flinkBatchEnv instanceof CollectionEnvironment) {
+      parallelism = 1;
+    } else {
+      parallelism =
+          determineParallelism(options.getParallelism(), flinkBatchEnv.getParallelism(), confDir);
+    }
 
+    flinkBatchEnv.setParallelism(parallelism);
     // set parallelism in the options (required by some execution code)
-    options.setParallelism(flinkBatchEnv.getParallelism());
+    options.setParallelism(parallelism);
 
     if (options.getObjectReuse()) {
       flinkBatchEnv.getConfig().enableObjectReuse();
@@ -93,6 +112,12 @@ public static ExecutionEnvironment createBatchExecutionEnvironment(
    */
   public static StreamExecutionEnvironment createStreamExecutionEnvironment(
       FlinkPipelineOptions options, List<String> filesToStage) {
+    return createStreamExecutionEnvironment(options, filesToStage, null);
+  }
+
+  @VisibleForTesting
+  static StreamExecutionEnvironment createStreamExecutionEnvironment(
+      FlinkPipelineOptions options, List<String> filesToStage, @Nullable String flinkConfigDir) {
 
     LOG.info("Creating a Streaming Environment.");
 
@@ -119,13 +144,13 @@ public static StreamExecutionEnvironment createStreamExecutionEnvironment(
       flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
     }
 
-    // set the correct parallelism.
-    if (options.getParallelism() != -1) {
-      flinkStreamEnv.setParallelism(options.getParallelism());
-    }
-
+    // Set the parallelism, required by UnboundedSourceWrapper to generate consistent splits.
+    final int parallelism =
+        determineParallelism(
+            options.getParallelism(), flinkStreamEnv.getParallelism(), flinkConfigDir);
+    flinkStreamEnv.setParallelism(parallelism);
     // set parallelism in the options (required by some execution code)
-    options.setParallelism(flinkStreamEnv.getParallelism());
+    options.setParallelism(parallelism);
 
     if (options.getObjectReuse()) {
       flinkStreamEnv.getConfig().enableObjectReuse();
@@ -156,9 +181,11 @@ public static StreamExecutionEnvironment createStreamExecutionEnvironment(
         throw new IllegalArgumentException("The checkpoint interval must be positive");
       }
       flinkStreamEnv.enableCheckpointing(checkpointInterval, options.getCheckpointingMode());
-      flinkStreamEnv
-          .getCheckpointConfig()
-          .setCheckpointTimeout(options.getCheckpointTimeoutMillis());
+      if (options.getCheckpointTimeoutMillis() != -1) {
+        flinkStreamEnv
+            .getCheckpointConfig()
+            .setCheckpointTimeout(options.getCheckpointTimeoutMillis());
+      }
       boolean externalizedCheckpoint = options.isExternalizedCheckpointsEnabled();
       boolean retainOnCancellation = options.getRetainExternalizedCheckpointsOnCancellation();
       if (externalizedCheckpoint) {
@@ -189,6 +216,35 @@ public static StreamExecutionEnvironment createStreamExecutionEnvironment(
     return flinkStreamEnv;
   }
 
+  private static int determineParallelism(
+      final int pipelineOptionsParallelism,
+      final int envParallelism,
+      @Nullable String flinkConfDir) {
+    if (pipelineOptionsParallelism > 0) {
+      return pipelineOptionsParallelism;
+    }
+    if (envParallelism > 0) {
+      // If the user supplies a parallelism on the command-line, this is set on the execution environment during creation
+      return envParallelism;
+    }
+
+    final Configuration configuration;
+    if (flinkConfDir == null) {
+      configuration = GlobalConfiguration.loadConfiguration();
+    } else {
+      configuration = GlobalConfiguration.loadConfiguration(flinkConfDir);
+    }
+    final int flinkConfigParallelism =
+        configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM.key(), -1);
+    if (flinkConfigParallelism > 0) {
+      return flinkConfigParallelism;
+    }
+    LOG.warn(
+        "No default parallelism could be found. Defaulting to parallelism 1. "
+            + "Please set an explicit parallelism with --parallelism");
+    return 1;
+  }
+
   private static void applyLatencyTrackingInterval(
       ExecutionConfig config, FlinkPipelineOptions options) {
     long latencyTrackingInterval = options.getLatencyTrackingInterval();
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
index 5989a6675b4..d004a67588f 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.runners.flink;
 
-import static com.google.common.base.MoreObjects.firstNonNull;
 import static org.apache.beam.runners.core.construction.PipelineResources.detectClassPathResourcesToStage;
 
 import com.google.common.util.concurrent.ListeningExecutorService;
@@ -38,7 +37,7 @@
 
   public static FlinkJobInvoker create(
       ListeningExecutorService executorService, String flinkMasterUrl) {
-    return new FlinkJobInvoker(executorService, firstNonNull(flinkMasterUrl, "[auto]"));
+    return new FlinkJobInvoker(executorService, flinkMasterUrl);
   }
 
   private final ListeningExecutorService executorService;
@@ -62,7 +61,7 @@ public JobInvocation invoke(
         String.format("%s_%s", flinkOptions.getJobName(), UUID.randomUUID().toString());
     LOG.info("Invoking job {}", invocationId);
 
-    flinkOptions.setFlinkMaster(firstNonNull(flinkOptions.getFlinkMaster(), flinkMasterUrl));
+    flinkOptions.setFlinkMaster(flinkMasterUrl);
 
     flinkOptions.setRunner(null);
 
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index 0db01448c4d..81b0e41bb5c 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -56,12 +56,15 @@
       "Address of the Flink Master where the Pipeline should be executed. Can"
           + " either be of the form \"host:port\" or one of the special values [local], "
           + "[collection] or [auto].")
+  @Default.String("[auto]")
   String getFlinkMaster();
 
   void setFlinkMaster(String value);
 
-  @Description("The degree of parallelism to be used when distributing operations onto workers.")
-  @Default.InstanceFactory(DefaultParallelismFactory.class)
+  @Description(
+      "The degree of parallelism to be used when distributing operations onto workers. "
+          + "If the parallelism is not set, the configured Flink default is used, or 1 if none can be found.")
+  @Default.Integer(-1)
   Integer getParallelism();
 
   void setParallelism(Integer value);
@@ -75,13 +78,13 @@
   void setCheckpointingInterval(Long interval);
 
   @Description("The checkpointing mode that defines consistency guarantee.")
-  @Default.Enum("AT_LEAST_ONCE")
+  @Default.Enum("EXACTLY_ONCE")
   CheckpointingMode getCheckpointingMode();
 
   void setCheckpointingMode(CheckpointingMode mode);
 
   @Description("The maximum time that a checkpoint may take before being discarded.")
-  @Default.Long(20 * 60 * 1000)
+  @Default.Long(-1L)
   Long getCheckpointTimeoutMillis();
 
   void setCheckpointTimeoutMillis(Long checkpointTimeoutMillis);
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
index 34f6479bbcb..8af87b23430 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
@@ -83,11 +83,6 @@ public static FlinkRunner fromOptions(PipelineOptions options) {
       LOG.debug("Classpath elements: {}", flinkOptions.getFilesToStage());
     }
 
-    // Set Flink Master to [auto] if no option was specified.
-    if (flinkOptions.getFlinkMaster() == null) {
-      flinkOptions.setFlinkMaster("[auto]");
-    }
-
     return new FlinkRunner(flinkOptions);
   }
 
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
new file mode 100644
index 00000000000..2e4c06cdfec
--- /dev/null
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.flink;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.util.Collections;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/** Tests for {@link FlinkExecutionEnvironments}. */
+public class FlinkExecutionEnvironmentsTest {
+
+  @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  @Test
+  public void shouldSetParallelismBatch() {
+    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setRunner(TestFlinkRunner.class);
+    options.setParallelism(42);
+
+    ExecutionEnvironment bev =
+        FlinkExecutionEnvironments.createBatchExecutionEnvironment(
+            options, Collections.emptyList());
+
+    assertThat(options.getParallelism(), is(42));
+    assertThat(bev.getParallelism(), is(42));
+  }
+
+  @Test
+  public void shouldSetParallelismStreaming() {
+    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setRunner(TestFlinkRunner.class);
+    options.setParallelism(42);
+
+    StreamExecutionEnvironment sev =
+        FlinkExecutionEnvironments.createStreamExecutionEnvironment(
+            options, Collections.emptyList());
+
+    assertThat(options.getParallelism(), is(42));
+    assertThat(sev.getParallelism(), is(42));
+  }
+
+  @Test
+  public void shouldInferParallelismFromEnvironmentBatch() throws IOException {
+    String flinkConfDir = extractFlinkConfig();
+
+    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setRunner(TestFlinkRunner.class);
+    options.setFlinkMaster("host:80");
+
+    ExecutionEnvironment bev =
+        FlinkExecutionEnvironments.createBatchExecutionEnvironment(
+            options, Collections.emptyList(), flinkConfDir);
+
+    assertThat(options.getParallelism(), is(23));
+    assertThat(bev.getParallelism(), is(23));
+  }
+
+  @Test
+  public void shouldInferParallelismFromEnvironmentStreaming() throws IOException {
+    String flinkConfDir = extractFlinkConfig();
+
+    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setRunner(TestFlinkRunner.class);
+    options.setFlinkMaster("host:80");
+
+    StreamExecutionEnvironment sev =
+        FlinkExecutionEnvironments.createStreamExecutionEnvironment(
+            options, Collections.emptyList(), flinkConfDir);
+
+    assertThat(options.getParallelism(), is(23));
+    assertThat(sev.getParallelism(), is(23));
+  }
+
+  @Test
+  public void shouldFallbackToDefaultParallelismBatch() {
+    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setRunner(TestFlinkRunner.class);
+    options.setFlinkMaster("host:80");
+
+    ExecutionEnvironment bev =
+        FlinkExecutionEnvironments.createBatchExecutionEnvironment(
+            options, Collections.emptyList());
+
+    assertThat(options.getParallelism(), is(1));
+    assertThat(bev.getParallelism(), is(1));
+  }
+
+  @Test
+  public void shouldFallbackToDefaultParallelismStreaming() {
+    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setRunner(TestFlinkRunner.class);
+    options.setFlinkMaster("host:80");
+
+    StreamExecutionEnvironment sev =
+        FlinkExecutionEnvironments.createStreamExecutionEnvironment(
+            options, Collections.emptyList());
+
+    assertThat(options.getParallelism(), is(1));
+    assertThat(sev.getParallelism(), is(1));
+  }
+
+  @Test
+  public void useDefaultParallelismFromContextBatch() {
+    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setRunner(TestFlinkRunner.class);
+
+    ExecutionEnvironment bev =
+        FlinkExecutionEnvironments.createBatchExecutionEnvironment(
+            options, Collections.emptyList());
+
+    assertThat(options.getParallelism(), is(LocalStreamEnvironment.getDefaultLocalParallelism()));
+    assertThat(bev.getParallelism(), is(LocalStreamEnvironment.getDefaultLocalParallelism()));
+  }
+
+  @Test
+  public void useDefaultParallelismFromContextStreaming() {
+    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setRunner(TestFlinkRunner.class);
+
+    StreamExecutionEnvironment sev =
+        FlinkExecutionEnvironments.createStreamExecutionEnvironment(
+            options, Collections.emptyList());
+
+    assertThat(options.getParallelism(), is(LocalStreamEnvironment.getDefaultLocalParallelism()));
+    assertThat(sev.getParallelism(), is(LocalStreamEnvironment.getDefaultLocalParallelism()));
+  }
+
+  private String extractFlinkConfig() throws IOException {
+    InputStream inputStream = getClass().getResourceAsStream("/flink-conf.yaml");
+    File root = temporaryFolder.getRoot();
+    Files.copy(inputStream, new File(root, "flink-conf.yaml").toPath());
+    return root.getAbsolutePath();
+  }
+}
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
index 2edf3c73d1e..49cdbe83f93 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
@@ -17,6 +17,10 @@
  */
 package org.apache.beam.runners.flink;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsNull.nullValue;
+
 import java.util.Collections;
 import java.util.HashMap;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
@@ -36,6 +40,7 @@
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.joda.time.Instant;
@@ -59,6 +64,28 @@
   private static MyOptions options =
       PipelineOptionsFactory.fromArgs("--testOption=nothing").as(MyOptions.class);
 
+  /** These defaults should only be changed with a very good reason. */
+  @Test
+  public void testDefaults() {
+    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    assertThat(options.getParallelism(), is(-1));
+    assertThat(options.getFlinkMaster(), is("[auto]"));
+    assertThat(options.getFilesToStage(), is(nullValue()));
+    assertThat(options.getLatencyTrackingInterval(), is(0L));
+    assertThat(options.isShutdownSourcesOnFinalWatermark(), is(false));
+    assertThat(options.getObjectReuse(), is(false));
+    assertThat(options.getCheckpointingMode(), is(CheckpointingMode.EXACTLY_ONCE));
+    assertThat(options.getMinPauseBetweenCheckpoints(), is(-1L));
+    assertThat(options.getCheckpointingInterval(), is(-1L));
+    assertThat(options.getCheckpointTimeoutMillis(), is(-1L));
+    assertThat(options.getNumberOfExecutionRetries(), is(-1));
+    assertThat(options.getExecutionRetryDelay(), is(-1L));
+    assertThat(options.getRetainExternalizedCheckpointsOnCancellation(), is(false));
+    assertThat(options.getStateBackend(), is(nullValue()));
+    assertThat(options.getMaxBundleSize(), is(1000L));
+    assertThat(options.getMaxBundleTimeMills(), is(1000L));
+  }
+
   @Test(expected = Exception.class)
   public void parDoBaseClassPipelineOptionsNullTest() {
     TupleTag<String> mainTag = new TupleTag<>("main-output");
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
index 66600193f0b..17d803d193a 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
@@ -21,6 +21,7 @@
 import java.io.File;
 import java.io.Serializable;
 import java.util.Arrays;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
 import org.apache.beam.runners.flink.FlinkTestPipeline;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
@@ -85,6 +86,7 @@ public void processElement(ProcessContext c) {
   public void testProgram() throws Exception {
 
     Pipeline p = FlinkTestPipeline.createForStreaming();
+    p.getOptions().as(FlinkPipelineOptions.class).setParallelism(1);
 
     PCollection<String> output =
         p.apply(
diff --git a/runners/flink/src/test/resources/flink-conf.yaml b/runners/flink/src/test/resources/flink-conf.yaml
new file mode 100644
index 00000000000..54a03c99a7f
--- /dev/null
+++ b/runners/flink/src/test/resources/flink-conf.yaml
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+parallelism.default: 23


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 145731)
    Time Spent: 2h 50m  (was: 2h 40m)

> Issue with setting the parallelism at client level using Flink runner
> ---------------------------------------------------------------------
>
>                 Key: BEAM-3089
>                 URL: https://issues.apache.org/jira/browse/BEAM-3089
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>    Affects Versions: 2.0.0
>         Environment: I am using Flink 1.2.1 running on Docker, with Task Managers distributed across different VMs as part of a Docker Swarm.
>            Reporter: Thalita Vergilio
>            Assignee: Grzegorz KoĊ‚akowski
>            Priority: Major
>              Labels: docker, flink, parallel-deployment
>             Fix For: 2.8.0
>
>         Attachments: flink-ui-parallelism.png
>
>          Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> When uploading an Apache Beam application using the Flink Web UI, the parallelism set at job submission doesn't get picked up. The same happens when submitting a job using the Flink CLI.
> In both cases, the parallelism ends up defaulting to 1.
> When I set the parallelism programmatically within the Apache Beam code, it works: {{flinkPipelineOptions.setParallelism(4);}}
> I suspect the root of the problem may be in the org.apache.beam.runners.flink.DefaultParallelismFactory class, as it checks for Flink's GlobalConfiguration, which may not pick up runtime values passed to Flink, then defaults to 1 if it doesn't find anything.
> Any ideas on how this could be fixed or worked around? I need to be able to change the parallelism dynamically, so the programmatic approach won't really work for me, nor will setting the Flink configuration at system level.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
