flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-2373) Add configuration parameter to createRemoteEnvironment method
Date Wed, 02 Sep 2015 07:47:46 GMT

    [ https://issues.apache.org/jira/browse/FLINK-2373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14726940#comment-14726940
] 

ASF GitHub Bot commented on FLINK-2373:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1066#discussion_r38506169
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
---
    @@ -0,0 +1,166 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.test.javaApiOperators;
    +
    +import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    +import org.apache.flink.api.common.io.GenericInputFormat;
    +import org.apache.flink.api.common.operators.util.TestNonRichInputFormat;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
    +import org.apache.flink.client.program.ProgramInvocationException;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.io.GenericInputSplit;
    +import org.apache.flink.test.util.ForkableFlinkMiniCluster;
    +import org.apache.flink.test.util.MultipleProgramsTestBase;
    +import org.apache.flink.util.Collector;
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.Test;
    +import org.junit.runner.RunWith;
    +import org.junit.runners.Parameterized;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.fail;
    +
    +@SuppressWarnings("serial")
    +public class RemoteEnvironmentITCase {
    +
    +    private static final int TM_SLOTS = 4;
    +
    +    private static final int NUM_TM = 1;
    +
    +    private static final int USER_DOP = 2;
    +
    +    private static final String INVALID_STARTUP_TIMEOUT = "0.001 ms";
    +
    +    private static final String VALID_STARTUP_TIMEOUT = "100 s";
    +
    +    private static ForkableFlinkMiniCluster cluster;
    +
    +    @BeforeClass
    +    public static void setupCluster() {
    +        try {
    +            Configuration config = new Configuration();
    +            config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TM);
    +            config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS);
    +            cluster = new ForkableFlinkMiniCluster(config, false);
    +            cluster.start();
    +        }
    +        catch (Exception e) {
    +            e.printStackTrace();
    +            fail("Error starting test cluster: " + e.getMessage());
    +        }
    +    }
    +
    +    @AfterClass
    +    public static void tearDownCluster() {
    +        try {
    +            cluster.stop();
    +        }
    +        catch (Throwable t) {
    +            t.printStackTrace();
    +            fail("Cluster shutdown caused an exception: " + t.getMessage());
    +        }
    +    }
    +
    +    /**
    +     * Ensure that that Akka configuration parameters can be set.
    +     */
    +    @Test(expected=IllegalArgumentException.class)
    +    public void testInvalidAkkaConfiguration() throws Throwable {
    +        Configuration config = new Configuration();
    +        config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, INVALID_STARTUP_TIMEOUT);
    +
    +        final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
    +                cluster.hostname(),
    +                cluster.getLeaderRPCPort(),
    +                config
    +        );
    +        env.getConfig().disableSysoutLogging();
    +
    +        DataSet<String> result = env.createInput(new TestNonRichInputFormat());
    +        result.output(new LocalCollectionOutputFormat<String>(new ArrayList<String>()));
    +        try {
    +            env.execute();
    --- End diff --
    
    Shouldn't we fail after this statement?


> Add configuration parameter to createRemoteEnvironment method
> -------------------------------------------------------------
>
>                 Key: FLINK-2373
>                 URL: https://issues.apache.org/jira/browse/FLINK-2373
>             Project: Flink
>          Issue Type: Bug
>          Components: other
>            Reporter: Andreas Kunft
>            Priority: Minor
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Currently there is no way to provide a custom configuration upon creation of a remote
environment (via ExecutionEnvironment.createRemoteEnvironment(...)).
> This leads to errors when the submitted job exceeds the default value for the max. payload
size in Akka, as we can not increase the configuration value (akka.remote.OversizedPayloadException:
Discarding oversized payload...)
> Providing an overloaded method with a configuration parameter for the remote environment
fixes that.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message