Subject: Re: Flink in EMR configuration problem
From: Piotr Nowojski
Date: Wed, 1 Apr 2020 13:31:42 +0200
To: Antonio Martínez Carratalá
Cc: user
Message-Id: <9CA0D644-5C13-41F6-A700-4D96478553A3@ververica.com>

Hey,

Isn’t the explanation of the problem in the logs that you posted? Not enough memory? You have 2 EMR nodes with 8GB of memory each, while trying to allocate 2 TaskManagers AND 1 JobManager with 6GB heap size each?

Piotrek

> On 31 Mar 2020, at 17:01, Antonio Martínez Carratalá wrote:
>
> Hi, I'm trying to run a job in a Flink cluster in Amazon EMR from Java code, but I'm having some problems.
>
> This is how I create the cluster:
> ------------------------------------------------------------------------
> StepConfig copyJarStep = new StepConfig()
>     .withName("copy-jar-step")
>     .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
>     .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
>         .withArgs("bash", "-c", "aws s3 cp s3://" + bucketName + "/lib/trendit-flink-jobs.jar /home/hadoop/trendit-flink-jobs.jar"));
>
> List<StepConfig> stepConfigs = new ArrayList<>();
> stepConfigs.add(copyJarStep);
>
> Application flink = new Application().withName("Flink");
>
> Configuration flinkConfiguration = new Configuration()
>     .withClassification("flink-conf")
>     .addPropertiesEntry("jobmanager.heap.size", "6g")
>     .addPropertiesEntry("taskmanager.heap.size", "6g")
>     .addPropertiesEntry("taskmanager.numberOfTaskSlots", "2");
>
> RunJobFlowRequest request = new RunJobFlowRequest()
>     .withName("cluster-" + executionKey)
>     .withReleaseLabel("emr-5.26.0")
>     .withApplications(flink)
>     .withConfigurations(flinkConfiguration)
>     .withServiceRole("EMR_DefaultRole")
>     .withJobFlowRole("EMR_EC2_DefaultRole")
>     .withLogUri(getWorkPath() + "logs")
>     .withInstances(new JobFlowInstancesConfig()
>         .withEc2SubnetId("mysubnetid")
>         .withInstanceCount(2)
>         .withKeepJobFlowAliveWhenNoSteps(true)
>         .withMasterInstanceType("m4.large")
>         .withSlaveInstanceType("m4.large"))
>     .withSteps(stepConfigs);
>
> RunJobFlowResult result = amazonClient.getEmrClient().runJobFlow(request);
> ------------------------------------------------------------------------
>
> And this is how I add the job when the cluster is ready:
> ------------------------------------------------------------------------
> StepConfig runJobStep = new StepConfig()
>     .withName("run-job-step")
>     .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
>     .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
>         .withArgs("bash", "-c", "flink run -m yarn-cluster --parallelism 2 --class es.trendit.flink.job.centrality.CentralityJob /home/hadoop/trendit-flink-jobs.jar "));
>
> AddJobFlowStepsRequest request = new AddJobFlowStepsRequest()
>     .withJobFlowId(clusterId)
>     .withSteps(runJobStep);
>
> AddJobFlowStepsResult result = amazonClient.getEmrClient().addJobFlowSteps(request);
> ------------------------------------------------------------------------
>
> In summary:
> - I'm using 2 instances of EMR m4.large machines (2 vCPU, 8GB each)
> - jobmanager.heap.size and taskmanager.heap.size: 6g
> - taskmanager.numberOfTaskSlots: 2
> - run flink with --parallelism 2
> - so 1 EMR instance should be running the jobmanager and the other the taskmanager with 2 slots available
>
> But it fails after some time and I see this warning in the step stdout file:
> ------------------------------------------------------------------------
> 2020-03-31 14:37:47,288 WARN org.apache.flink.yarn.AbstractYarnClusterDescriptor - This YARN session requires 12288MB of memory in the cluster. There are currently only 6144MB available.
> The Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are connecting from the beginning because the resources are currently not available in the cluster. The allocation might take more time than usual because the Flink YARN client needs to wait until the resources become available.
> 2020-03-31 14:37:47,294 WARN org.apache.flink.yarn.AbstractYarnClusterDescriptor - There is not enough memory available in the YARN cluster. The TaskManager(s) require 6144MB each. NodeManagers available: [6144]
> After allocating the JobManager (6144MB) and (0/1) TaskManagers, the following NodeManagers are available: [0]
> The Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are connecting from the beginning because the resources are currently not available in the cluster. The allocation might take more time than usual because the Flink YARN client needs to wait until the resources become available.
> 2020-03-31 14:37:47,296 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=6144, taskManagerMemoryMB=6144, numberTaskManagers=1, slotsPerTaskManager=2}
> ------------------------------------------------------------------------
>
> And this error in the step stderr file:
> ------------------------------------------------------------------------
> org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 1f0a651302d5fd48d35ff5b5d0880f99)
>     at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483)
>     ...
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>     at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
>     ... 23 more
> Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not allocate enough slots within timeout of 300000 ms to run the job. Please make sure that the cluster has enough resources.
>     at org.apache.flink.runtime.executiongraph.Execution.lambda$scheduleForExecution$0(Execution.java:449)
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>     ...
> ------------------------------------------------------------------------
>
> It looks to me like the TaskManager is not created at the beginning. Any idea why this is happening and how to solve it? I could not find any relevant information in the Flink docs.
>
> Thanks
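
The numbers in the quoted warning can be checked by hand: the session asks for 6144MB for the JobManager plus 6144MB for one TaskManager, 12288MB in total, while YARN reports a single NodeManager with 6144MB free. Below is a minimal sketch of that arithmetic, assuming a simple first-fit placement. `YarnMemoryBudget` and its methods are hypothetical names used for illustration only; they are not part of Flink or the EMR SDK, and this is not how Flink's YARN client is actually implemented. The second call in `main` uses made-up smaller container sizes purely to show a combination that would fit into 6144MB.

```java
import java.util.Arrays;

/** Hypothetical helper for illustration; not part of Flink or the EMR SDK. */
final class YarnMemoryBudget {

    /** Total memory the session asks for: one JobManager plus N TaskManagers. */
    static int requiredMb(int jobManagerMb, int taskManagerMb, int taskManagers) {
        return jobManagerMb + taskManagerMb * taskManagers;
    }

    /**
     * First-fit placement (an assumed strategy): put the JobManager on the first
     * NodeManager with enough free memory, then try each TaskManager the same way.
     * Returns how many TaskManagers could be placed, or -1 if the JobManager
     * itself does not fit anywhere.
     */
    static int placeableTaskManagers(int[] freeMbPerNode, int jobManagerMb,
                                     int taskManagerMb, int taskManagers) {
        int[] free = Arrays.copyOf(freeMbPerNode, freeMbPerNode.length);
        if (!place(free, jobManagerMb)) {
            return -1;
        }
        int placed = 0;
        for (int i = 0; i < taskManagers; i++) {
            if (place(free, taskManagerMb)) {
                placed++;
            }
        }
        return placed;
    }

    /** Reserves mb on the first node that has room; returns false if none has. */
    private static boolean place(int[] free, int mb) {
        for (int i = 0; i < free.length; i++) {
            if (free[i] >= mb) {
                free[i] -= mb;
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Values taken from the quoted log: NodeManagers available [6144],
        // masterMemoryMB=6144, taskManagerMemoryMB=6144, numberTaskManagers=1.
        System.out.println(requiredMb(6144, 6144, 1));                              // 12288
        System.out.println(placeableTaskManagers(new int[] {6144}, 6144, 6144, 1)); // 0

        // Illustrative sizes only (an assumption, not a recommendation):
        // a 2048MB JobManager and a 4096MB TaskManager fit into 6144MB.
        System.out.println(placeableTaskManagers(new int[] {6144}, 2048, 4096, 1)); // 1
    }
}
```

With the values from the log, the JobManager takes all 6144MB and zero of one TaskManagers can be placed. This matches the "(0/1) TaskManagers" line in the warning and is consistent with the later slot allocation timeout.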