flink-user mailing list archives

From Piotr Nowojski <pi...@ververica.com>
Subject Re: Flink in EMR configuration problem
Date Wed, 01 Apr 2020 11:31:42 GMT
Hey,

Isn't the explanation of the problem in the logs that you posted? Not enough memory? You have
2 EMR nodes with 8GB of memory each, while trying to allocate TaskManager(s) AND a JobManager
with a 6GB heap each?
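
Something along these lines should fit on two m4.large nodes. This is an untested sketch: with withInstanceCount(2) only the core node runs a YARN NodeManager (your log shows "NodeManagers available: [6144]"), so the JobManager and TaskManager containers both have to fit into roughly 6144MB on that single node, and the exact values may still need tuning for YARN container overhead.

// Untested sketch, reusing the EMR SDK Configuration snippet from your mail with smaller heaps.
// 1024m (JobManager) + 4096m (TaskManager) stays below the ~6144MB YARN offers on one m4.large node.
// The concrete values are assumptions, not recommendations.
Configuration flinkConfiguration = new Configuration()
    .withClassification("flink-conf")
    .addPropertiesEntry("jobmanager.heap.size", "1024m")
    .addPropertiesEntry("taskmanager.heap.size", "4096m")
    .addPropertiesEntry("taskmanager.numberOfTaskSlots", "2");

If I remember correctly, you can also override the memory per job with the -yjm and -ytm options of flink run in yarn-cluster mode, instead of changing flink-conf.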

Piotrek

> On 31 Mar 2020, at 17:01, Antonio Martínez Carratalá <amartinez@alto-analytics.com> wrote:
> 
> Hi, I'm trying to run a job in a Flink cluster in Amazon EMR from Java code, but I'm having some problems.
> 
> This is how I create the cluster:
> ------------------------------------------------------------------------------------------------------------
> StepConfig copyJarStep = new StepConfig()
>     .withName("copy-jar-step")
>     .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
>     .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
>         .withArgs("bash", "-c", "aws s3 cp s3://" + bucketName + "/lib/trendit-flink-jobs.jar /home/hadoop/trendit-flink-jobs.jar"));
> 
> List<StepConfig> stepConfigs = new ArrayList<>();
> stepConfigs.add(copyJarStep);
> 
> Application flink = new Application().withName("Flink");
> 
> Configuration flinkConfiguration = new Configuration()
>     .withClassification("flink-conf")
>     .addPropertiesEntry("jobmanager.heap.size", "6g")
>     .addPropertiesEntry("taskmanager.heap.size", "6g")
>     .addPropertiesEntry("taskmanager.numberOfTaskSlots", "2");
> 
> RunJobFlowRequest request = new RunJobFlowRequest()
>     .withName("cluster-" + executionKey)
>     .withReleaseLabel("emr-5.26.0")
>     .withApplications(flink)
>     .withConfigurations(flinkConfiguration)
>     .withServiceRole("EMR_DefaultRole")
>     .withJobFlowRole("EMR_EC2_DefaultRole")
>     .withLogUri(getWorkPath() + "logs")
>     .withInstances(new JobFlowInstancesConfig()
>         .withEc2SubnetId("mysubnetid")
>         .withInstanceCount(2)
>         .withKeepJobFlowAliveWhenNoSteps(true)
>         .withMasterInstanceType("m4.large")
>         .withSlaveInstanceType("m4.large"))
>     .withSteps(stepConfigs);
> 
> RunJobFlowResult result = amazonClient.getEmrClient().runJobFlow(request);
> ---------------------------------------------------------------------------------------------------------
> 
> And this is how I add the job when the cluster is ready:
> ------------------------------------------------------------------------------------------
> StepConfig runJobStep = new StepConfig()
>     .withName("run-job-step")
>     .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
>     .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
>         .withArgs("bash", "-c", "flink run -m yarn-cluster --parallelism 2 --class es.trendit.flink.job.centrality.CentralityJob /home/hadoop/trendit-flink-jobs.jar <args...>"));
> 
> AddJobFlowStepsRequest request = new AddJobFlowStepsRequest()
>     .withJobFlowId(clusterId)
>     .withSteps(runJobStep);
> 
> AddJobFlowStepsResult result = amazonClient.getEmrClient().addJobFlowSteps(request);
> -----------------------------------------------------------------------------------------------
> 
> In summary:
> - I'm using 2 instances of EMR m4.large machines (2 vCPU, 8GB each)
> - jobmanager.heap.size and taskmanager.heap.size: 6g
> - taskmanager.numberOfTaskSlots: 2
> - run flink with --parallelism 2
> - so 1 EMR instance should be running the jobmanager and the other the taskmanager with 2 slots available
> 
> But it fails after some time and I see this warning in the step stdout file:
> ----------------------------------------------------------------------------------------------------------------------
> 2020-03-31 14:37:47,288 WARN  org.apache.flink.yarn.AbstractYarnClusterDescriptor  - This YARN session requires 12288MB of memory in the cluster. There are currently only 6144MB available.
> The Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are connecting from the beginning because the resources are currently not available in the cluster. The allocation might take more time than usual because the Flink YARN client needs to wait until the resources become available.
> 2020-03-31 14:37:47,294 WARN  org.apache.flink.yarn.AbstractYarnClusterDescriptor  - There is not enough memory available in the YARN cluster. The TaskManager(s) require 6144MB each. NodeManagers available: [6144]
> After allocating the JobManager (6144MB) and (0/1) TaskManagers, the following NodeManagers are available: [0]
> The Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are connecting from the beginning because the resources are currently not available in the cluster. The allocation might take more time than usual because the Flink YARN client needs to wait until the resources become available.
> 2020-03-31 14:37:47,296 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor  - Cluster specification: ClusterSpecification{masterMemoryMB=6144, taskManagerMemoryMB=6144, numberTaskManagers=1, slotsPerTaskManager=2}
> ----------------------------------------------------------------------------------------------------------------------
> 
> And this error in the step stderr file:
> ----------------------------------------------------------------------------------------------------------------------
> org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 1f0a651302d5fd48d35ff5b5d0880f99)
> at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483)
> ...
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
> at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
> ... 23 more
> Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not allocate enough slots within timeout of 300000 ms to run the job. Please make sure that the cluster has enough resources.
> at org.apache.flink.runtime.executiongraph.Execution.lambda$scheduleForExecution$0(Execution.java:449)
> at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> ...
> ----------------------------------------------------------------------------------------------------------------------
> 
> It looks to me like the TaskManager is not created at the beginning. Any idea why this is happening and how to solve it? I could not find any relevant information in the Flink docs.
> 
> Thanks
> 
> 

