Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 5A70A200CD6 for ; Mon, 31 Jul 2017 22:21:46 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 579BA161A15; Mon, 31 Jul 2017 20:21:46 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 2AF68161A13 for ; Mon, 31 Jul 2017 22:21:45 +0200 (CEST) Received: (qmail 15251 invoked by uid 500); 31 Jul 2017 20:21:44 -0000 Mailing-List: contact user-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list user@flink.apache.org Received: (qmail 15198 invoked by uid 99); 31 Jul 2017 20:21:44 -0000 Received: from mail-relay.apache.org (HELO mail-relay.apache.org) (140.211.11.15) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 31 Jul 2017 20:21:44 +0000 Received: from mail-pg0-f52.google.com (mail-pg0-f52.google.com [74.125.83.52]) by mail-relay.apache.org (ASF Mail Server at mail-relay.apache.org) with ESMTPSA id C93DE1A00E8 for ; Mon, 31 Jul 2017 20:21:43 +0000 (UTC) Received: by mail-pg0-f52.google.com with SMTP id 125so142624477pgi.3 for ; Mon, 31 Jul 2017 13:21:43 -0700 (PDT) X-Gm-Message-State: AIVw113NnN4yWFZXclBSG7bWtNPud/AnLAr5+kRY+SUPF7dhSTf9/8fr LbE3NfhZWrzjm8czqnCKQ2sR1DRmfw== X-Received: by 10.99.1.151 with SMTP id 145mr16547452pgb.402.1501532503330; Mon, 31 Jul 2017 13:21:43 -0700 (PDT) MIME-Version: 1.0 Received: by 10.100.151.142 with HTTP; Mon, 31 Jul 2017 13:21:27 -0700 (PDT) In-Reply-To: References: From: Stephan Ewen Date: Mon, 31 Jul 2017 22:21:27 +0200 X-Gmail-Original-Message-ID: Message-ID: Subject: Re: Flink CLI cannot submit job to Flink on Mesos To: Francisco Gonzalez Barea Cc: Till Rohrmann , "user@flink.apache.org" Content-Type: multipart/alternative; boundary="001a1146dd347ae1480555a2c7f9" archived-at: Mon, 31 Jul 2017 20:21:46 -0000 --001a1146dd347ae1480555a2c7f9 Content-Type: text/plain; charset="UTF-8" Content-Transfer-Encoding: quoted-printable Hi Francisco! Can you drop the explicit address of the jobmanager? The client should pick up that address automatically from ZooKeeper as well (together with the HA leader session ID). Please check if you have the ZooKeeper HA config entries in the config used by the CLI. Stephan On Mon, Jul 31, 2017 at 6:27 PM, Francisco Gonzalez Barea < Francisco.Gonzalez@piksel.com> wrote: > Hi again, > > On the other hand, we are running the following flink CLI command: > > ./flink run -d -m ${jobmanager.rpc.address}:${jobmanager.rpc.port} > ${our-program-jar} ${our-program-params} > > Maybe is the command what we are using wrongly? > > Thank you > > On 28 Jul 2017, at 11:07, Till Rohrmann wrote: > > Hi Francisco, > > have you set the right high-availability configuration options in your > client configuration as described here [1]? If not, then Flink is not abl= e > to find the correct JobManager because it retrieves the address as well a= s > a fencing token (called leader session id) from the HA store (ZooKeeper). > > [1] https://ci.apache.org/projects/flink/flink-docs- > release-1.3/setup/mesos.html#high-availability > > Cheers, > Till > > On Thu, Jul 27, 2017 at 6:20 PM, Francisco Gonzalez Barea < > Francisco.Gonzalez@piksel.com> wrote: > >> Hello, >> >> We=C2=B4re having lot of issues while trying to submit a job remotely us= ing >> the Flink CLI command line tool. We have tried different configurations = but >> in all of them we get errors from AKKA while trying to connect. I will t= ry >> to summarise the configurations we=C2=B4ve tried. >> >> - Flink 1.3.0 deployed within a docker container on a Mesos cluster >> (using Marathon) >> - This flink has the property jobmanager.rpc.address as a hostname (i.e. >> kind of ip-XXXXXXXXX.eu .west-1.comp >> ute.internal) >> - Use the same version for Flink Client remotely (e.g. in my laptop). >> >> When I try to submit the job using the command flink run -m >> myHostName:myPort (the same in jobmanager.rpc.address and >> jobmanager.rpc.port) after some time waiting I get the trace at the end = of >> this email. In the flink side we get this error from AKKA: >> >> Association with remote system [akka.tcp://flink@10.203.23.24:24469] has >> failed, address is now gated for [5000] ms. Reason: [Association failed >> with [akka.tcp://flink@10.203.23.24:24469]] Caused by: [Connection >> refused: /10.203.23.24:24469] >> >> After reading a bit, it seems there=C2=B4re some problems related to akk= a >> resolving hostnames to ips, so we decided to startup the same flink but >> changing jobmanager.rpc.address to have the direct ip (i.e. kind of >> XX.XXX.XX.XX). In this case I=C2=B4m getting same trace (at the end of t= he >> email) from the client side and this one from the Flink server: >> >> Discard message LeaderSessionMessage(00000000- >> 0000-0000-0000-000000000000,SubmitJob(JobGraph(jobId: >> b25d5c5ced962632abc5ee9ef867792e),DETACHED)) because the expected leader >> session ID b4f53899-5d70-467e-8e9d-e56eeb60b6e3 did not equal the >> received leader session ID 00000000-0000-0000-0000-000000000000. >> >> We have tried some other stuff but without success=E2=80=A6 any clue tha= t could >> help us? >> >> Thanks in advance! >> >> org.apache.flink.client.program.ProgramInvocationException: The program >> execution failed: JobManager did not respond within 60000 milliseconds >> at org.apache.flink.client.program.ClusterClient.runDetached( >> ClusterClient.java:454) >> at org.apache.flink.client.program.StandaloneClusterClient.subm >> itJob(StandaloneClusterClient.java:99) >> at org.apache.flink.client.program.ClusterClient.run(ClusterCli >> ent.java:400) >> at org.apache.flink.client.program.DetachedEnvironment.finalize >> Execute(DetachedEnvironment.java:76) >> at org.apache.flink.client.program.ClusterClient.run(ClusterCli >> ent.java:345) >> at org.apache.flink.client.CliFrontend.executeProgram(CliFronte >> nd.java:831) >> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256) >> at org.apache.flink.client.CliFrontend.parseParameters(CliFront >> end.java:1073) >> at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120) >> at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117) >> at org.apache.flink.runtime.security.HadoopSecurityContext$1. >> run(HadoopSecurityContext.java:43) >> at java.security.AccessController.doPrivileged(Native Method) >> at javax.security.auth.Subject.doAs(Subject.java:422) >> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGro >> upInformation.java:1548) >> at org.apache.flink.runtime.security.HadoopSecurityContext.runS >> ecured(HadoopSecurityContext.java:40) >> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116) >> Caused by: org.apache.flink.runtime.client.JobTimeoutException: >> JobManager did not respond within 60000 milliseconds >> at org.apache.flink.runtime.client.JobClient.submitJobDetached( >> JobClient.java:426) >> at org.apache.flink.client.program.ClusterClient.runDetached( >> ClusterClient.java:451) >> ... 15 more >> Caused by: java.util.concurrent.TimeoutException: Futures timed out >> after [60000 milliseconds] >> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) >> at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223= ) >> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190) >> at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(B >> lockContext.scala:53) >> at scala.concurrent.Await$.result(package.scala:190) >> at scala.concurrent.Await.result(package.scala) >> at org.apache.flink.runtime.client.JobClient.submitJobDetached( >> JobClient.java:423) >> ... 16 more >> >> >> >> This message is private and confidential. If you have received this >> message in error, please notify the sender or servicedesk@piksel.com and >> remove it from your system. >> >> Piksel Inc is a company registered in the United States, 2100 Powers >> Ferry Road SE, Suite 400, Atlanta, GA 30339 >> > > > --001a1146dd347ae1480555a2c7f9 Content-Type: text/html; charset="UTF-8" Content-Transfer-Encoding: quoted-printable
Hi Francisco!

Can you drop the explicit= address of the jobmanager? The client should pick up that address automati= cally from ZooKeeper as well (together with the HA leader session ID).

Please check if you have the ZooKeeper HA config entri= es in the config used by the CLI.

Stephan


On Mo= n, Jul 31, 2017 at 6:27 PM, Francisco Gonzalez Barea <= Francisc= o.Gonzalez@piksel.com> wrote:
Hi again,

On the other hand, we are running the following flink CLI command:

./flink run -d -m ${jobmanager.rpc.address}:${jobmanager.rpc.port= } =C2=A0${our-program-jar} ${our-program-params}

Maybe is the command what we are using wrongly?

Thank you

On 28 Jul 2017, at 11:07, Till Rohrmann <trohrmann@apache.org> wrote:

Hi Francisco,

have you set the right high-availability configuration options in your= client configuration as described here [1]? If not, then Flink is not able= to find the correct JobManager because it retrieves the address as well as= a fencing token (called leader session id) from the HA store (ZooKeeper).


Cheers,
Till

On Thu, Jul 27, 2017 at 6:20 PM, Francisco Gonza= lez Barea <Francisco.Gonzalez@piksel.com> wrote:
Hello,

We=C2=B4re having lot of issues while trying to submit a job remotely = using the Flink CLI command line tool. We have tried different configuratio= ns but in all of them we get errors from AKKA while trying to connect. I wi= ll try to summarise the configurations we=C2=B4ve tried.

- Flink 1.3.0 deployed within a docker container on a Mesos cluster (u= sing Marathon)
- This flink has the property jobmanager.rpc.address as a hostname (i.= e. kind of=C2=A0ip-XX= XXXXXXX.eu.west-1.compute.internal)
- Use the same version for Flink Client remotely (e.g. in my laptop).<= /div>

When I try to submit the job using the command flink run -m myHostName= :myPort (the same in jobmanager.rpc.address and jobmanager.rpc.port) after = some time waiting I get the trace at the end of this email. In the flink si= de we get this error from AKKA:=C2=A0

Association with remote system [akka.tcp://f= link@10.203.23.24:24469] has failed, address is now gated for [500= 0] ms. Reason: [Association failed with [akka.tcp://flink@10.203.23.24:24469]] Caused by: [Connection refused: /10.203.23.24:24469]

After reading a bit, it seems there=C2=B4re some problems related to a= kka resolving hostnames to ips, so we decided to startup the same flink but= changing jobmanager.rpc.address to have the direct ip (i.e. kind of XX.XXX= .XX.XX). In this case I=C2=B4m getting same trace (at the end of the email) from the client side and this one fro= m the Flink server:

Discard message LeaderSessionMessage(00000000-<= wbr>0000-0000-0000-000000000000,SubmitJob(JobGraph(jobId: b25d5c5ced96= 2632abc5ee9ef867792e),DETACHED)) because the expected leader session ID b4f53899-5d70-467e-8e9d-e56eeb60b6e3 did not equal the rec= eived leader session ID 00000000-0000-0000-0000-000000000000.

We have tried some other stuff but without succ= ess=E2=80=A6 any clue that could help us?

Thanks in advance!

org.apache.flink.client.program.ProgramInv= ocationException: The program execution failed: JobManager did not respond = within 60000 milliseconds
at org.apac= he.flink.client.program.ClusterClient.runDetached(ClusterClient.j= ava:454)
at org.apac= he.flink.client.program.StandaloneClusterClient.submitJob(Standal= oneClusterClient.java:99)
at org.apac= he.flink.client.program.ClusterClient.run(ClusterClient.java:400)=
at org.apac= he.flink.client.program.DetachedEnvironment.finalizeExecute(Detac= hedEnvironment.java:76)
at org.apac= he.flink.client.program.ClusterClient.run(ClusterClient.java:345)=
at org.apac= he.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)<= /font>
at org.apac= he.flink.client.CliFrontend.run(CliFrontend.java:256)
at org.apac= he.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073= )
at org.apac= he.flink.client.CliFrontend$2.call(CliFrontend.java:1120)<= /div>
at org.apac= he.flink.client.CliFrontend$2.call(CliFrontend.java:1117)<= /div>
at org.apac= he.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecur= ityContext.java:43)
at java.sec= urity.AccessController.doPrivileged(Native Method)
at org.apac= he.hadoop.security.UserGroupInformation.doAs(UserGroupInformation= .java:1548)
at org.apac= he.flink.runtime.security.HadoopSecurityContext.runSecured(Hadoop= SecurityContext.java:40)
at org.apac= he.flink.client.CliFrontend.main(CliFrontend.java:1116)
Caused by: org.apache.flink.runtime.client= .JobTimeoutException: JobManager did not respond within 60000 milliseconds<= /font>
at org.apac= he.flink.runtime.client.JobClient.submitJobDetached(JobClient.jav= a:426)
at org.apac= he.flink.client.program.ClusterClient.runDetached(ClusterClient.j= ava:451)
... 15 more=
Caused by: java.util.concurrent.TimeoutExc= eption: Futures timed out after [60000 milliseconds]
at scala.co= ncurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.co= ncurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.co= ncurrent.Await$$anonfun$result$1.apply(package.scala:190)<= /div>
at scala.co= ncurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.s= cala:53)
at scala.co= ncurrent.Await$.result(package.scala:190)
at scala.co= ncurrent.Await.result(package.scala)
at org.apac= he.flink.runtime.client.JobClient.submitJobDetached(JobClient.jav= a:423)
... 16 more=



This message is private and c= onfidential. If you have received this message in error, please notify the = sender or servicedesk@pik= sel.com and remove it from your system.

Piksel Inc is a company regis= tered in the United States, 2100 Powers Ferry Road SE, Suite 400, Atlanta, = GA 30339




--001a1146dd347ae1480555a2c7f9--