flink-user mailing list archives

From Piotr Nowojski <pi...@data-artisans.com>
Subject Re: How to submit flink job on yarn by java code
Date Thu, 16 Aug 2018 14:31:33 GMT
Hi,

Is this path accessible from the container? If not, use a distributed file system, NFS, or
the -yt/--yarnship option of the CLI.
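
To make the two points above concrete, here is a minimal Java sketch (not code from this thread). It assumes the problem is that the staged file sits under a local file: URI, which a YARN container on another host cannot read, and it shows where -yt would go in the argument list passed to CliFrontend. The class name YarnShipSketch, both helper methods, and the /home/demo/ship directory are hypothetical; only the -yt flag, the staging path, and the jar path come from the messages here.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class YarnShipSketch {

    /** True when the path has no scheme or the "file" scheme, i.e. it names a local disk. */
    static boolean isLocalPath(String path) {
        String scheme = URI.create(path).getScheme();
        return scheme == null || "file".equals(scheme);
    }

    /**
     * Returns a copy of the CLI arguments with "-yt <shipDir>" placed right after "run".
     * If "run" is missing, the option is placed at the front of the list.
     */
    static List<String> withYarnShip(List<String> runArgs, String shipDir) {
        List<String> result = new ArrayList<>(runArgs);
        int runIndex = result.indexOf("run");
        result.addAll(runIndex + 1, Arrays.asList("-yt", shipDir));
        return result;
    }

    public static void main(String[] args) {
        // Staging directory from the error message: the "file" scheme means local disk.
        String staged = "file:/home/demo/.flink/application_1526888270443_0090";
        System.out.println("local only: " + isLocalPath(staged));

        // Same style of arguments as in the quoted SubmitDemo; the ship directory is made up.
        List<String> runArgs = Arrays.asList(
                "run", "-d", "-m", "yarn-cluster",
                "/home/demo/lz/flink1.5_demo-1.16-SNAPSHOT-jar-with-dependencies.jar");
        System.out.println(withYarnShip(runArgs, "/home/demo/ship"));
    }
}
```

If isLocalPath returns true for the staging directory, the containers need either a shared mount at that path or a staging location on a distributed file system.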

Please also take a look at https://lists.apache.org/thread.html/%3CCAF=1nJ8GONoqux7czxpUxAf7L3p=-E_ePSTHk0uWa=GRyG=2nw@mail.gmail.com%3E

Piotrek

> On 16 Aug 2018, at 11:05, spoon_lz <971066723@qq.com> wrote:
> 
> Sorry, I don't know why the code and error are not visible.
> The error is:
> The program finished with the following exception:
> 
> org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy Yarn job cluster.
> 	at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:82)
> 	at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:239)
> 	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:214)
> 	at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025)
> 	at flink.SubmitDemo.submit(SubmitDemo.java:75)
> 	at flink.SubmitDemo.main(SubmitDemo.java:50)
> Caused by: org.apache.flink.yarn.AbstractYarnClusterDescriptor$YarnDeploymentException: The YARN application unexpectedly switched to state FAILED during deployment.
> Diagnostics from YARN: Application application_1526888270443_0090 failed 2 times due to AM Container for appattempt_1526888270443_0090_000002 exited with  exitCode: -1000
> For more detailed output, check application tracking page:http://cluster1:8088/cluster/app/application_1526888270443_0090Then, click on links to logs of each attempt.
> Diagnostics: File file:/home/demo/.flink/application_1526888270443_0090/application_1526888270443_0090-flink-conf.yaml9192370388166197716.tmp does not exist
> java.io.FileNotFoundException: File file:/home/demo/.flink/application_1526888270443_0090/application_1526888270443_0090-flink-conf.yaml9192370388166197716.tmp does not exist
> 	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
> 	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
> 	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601)
> 	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
> 	at org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:253)
> 	at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:63)
> 	at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:361)
> 	at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
> 	at java.security.AccessController.doPrivileged(Native Method)
> 	at javax.security.auth.Subject.doAs(Subject.java:422)
> 	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
> 	at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:358)
> 	at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:62)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> 
> Failing this attempt. Failing the application.
> If log aggregation is enabled on your cluster, use this command to further investigate the issue:
> yarn logs -applicationId application_1526888270443_0090
> 	at org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:1059)
> 	at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:532)
> 	at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:75)
> 	... 5 more
> 
> and my code looks like this:
> 
> // Imports inferred from the class names used below; Maps is assumed to be Guava's.
> package flink;
> 
> import java.io.File;
> import java.lang.reflect.Field;
> import java.util.ArrayList;
> import java.util.Collections;
> import java.util.List;
> import java.util.Map;
> 
> import com.google.common.collect.Maps;
> import org.apache.commons.cli.CommandLine;
> import org.apache.flink.client.cli.CliFrontend;
> import org.apache.flink.client.cli.CliFrontendParser;
> import org.apache.flink.client.cli.CustomCommandLine;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.configuration.GlobalConfiguration;
> import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
> import org.apache.hadoop.yarn.api.records.ApplicationId;
> 
> public class SubmitDemo {
> 
> 
>    private final String ENV_CONF = "/usr/ndp/current/yarn_client/conf";
>    private final String FLINK_CONF = "/home/demo/flink-1.5.1/conf";
>    private static final String JAR_FILE =
> "/home/demo/lz/flink1.5_demo-1.16-SNAPSHOT-jar-with-dependencies.jar";
> 
> 
>    public static void main(String[] args) {
> 
>        SubmitDemo demo = new SubmitDemo();
>        demo.before();
>        List<String> parameters = new ArrayList<>();
>        parameters.add("run");
>        parameters.add("-d");
>        parameters.add("-m");
>        parameters.add("yarn-cluster");
>        parameters.add("-ynm");
>        parameters.add("lz_test_alone");
>        parameters.add("-yn");
>        parameters.add("4");
>        parameters.add("-ytm");
>        parameters.add("4096");
>        parameters.add("-yjm");
>        parameters.add("1024");
>        parameters.add("-c");
>        parameters.add("flink.Demo");
>        parameters.add(JAR_FILE);
> 
>        try {
>            demo.submit(parameters.toArray(new String[parameters.size()]));
>        } catch (Exception e) {
>            e.printStackTrace();
>        }
>    }
> 
>    public void submit(String[] args) throws Exception {
> 
>        final String configurationDirectory = ENV_CONF;
> 
>        File configFile = new File(FLINK_CONF);
> 
>        final Configuration flinkConfiguration =
> GlobalConfiguration.loadConfiguration(configFile.getAbsolutePath());
> 
>        FlinkYarnSessionCli cli = new
> FlinkYarnSessionCli(flinkConfiguration, configurationDirectory, "y",
> "yarn");
> 
>        final List<CustomCommandLine<?>> customCommandLines =
> CliFrontend.loadCustomCommandLines(
>                flinkConfiguration,
>                configurationDirectory);
> 
>        CliFrontend testFrontend = new CliFrontend(flinkConfiguration,
> customCommandLines);
>        //submit
>        testFrontend.parseParameters(args);
>        CommandLine commandLine = CliFrontendParser.parse(
>                CliFrontendParser.getRunCommandOptions(),
>                args,
>                true);
>        final ApplicationId clusterId = cli.getClusterId(commandLine);
>        System.out.println("ApplicationId=" + clusterId.toString());
>    }
> 
>    // SET HADOOP ENV 
>    private void before() {
>        Map<String, String> newenv = Maps.newHashMap();
>        newenv.put("HADOOP_CONF_DIR", ENV_CONF);
>        newenv.put("YARN_CONF_DIR", ENV_CONF);
>        try {
>            Class<?> processEnvironmentClass =
> Class.forName("java.lang.ProcessEnvironment");
>            Field theEnvironmentField =
> processEnvironmentClass.getDeclaredField("theEnvironment");
>            theEnvironmentField.setAccessible(true);
>            Map<String, String> env = (Map<String, String>)
> theEnvironmentField.get(null);
>            env.putAll(newenv);
>            Field theCaseInsensitiveEnvironmentField =
> processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment");
>            theCaseInsensitiveEnvironmentField.setAccessible(true);
>            Map<String, String> cienv = (Map<String, String>)
> theCaseInsensitiveEnvironmentField.get(null);
>            cienv.putAll(newenv);
>        } catch (NoSuchFieldException e) {
>            // Fallback when those fields are missing: patch the map behind System.getenv().
>            try {
>                Map<String, String> env = System.getenv();
>                for (Class<?> cl : Collections.class.getDeclaredClasses()) {
>                    if ("java.util.Collections$UnmodifiableMap".equals(cl.getName())) {
>                        Field field = cl.getDeclaredField("m");
>                        field.setAccessible(true);
>                        Map<String, String> map = (Map<String, String>) field.get(env);
>                        // Note: clear() drops every other variable from this view.
>                        map.clear();
>                        map.putAll(newenv);
>                    }
>                }
>            } catch (ReflectiveOperationException ex) {
>                throw new IllegalStateException("Could not set environment variables", ex);
>            }
>        } catch (ReflectiveOperationException e) {
>            // Class.forName and Field.get throw checked exceptions that were not handled before.
>            throw new IllegalStateException("Could not set environment variables", e);
>        }
>    }
> 
> 
> }
> 
> 
> The error is "file not found" for
> "/home/demo/.flink/application_1526888270443_0090/application_1526888270443_0090-flink-conf.yaml9192370388166197716.tmp",
> but I can find this file.
> Previously, I thought it was an environment variable problem and added
> "before()", but the same error is still reported.

