flink-user mailing list archives

From Niels Basjes <Ni...@basjes.nl>
Subject Re: Running multiple jobs on yarn (without yarn-session)
Date Thu, 25 Aug 2016 15:14:55 GMT
I see this with a fairly recent build from source (not a release).

It would be great if you see a way to fix this.
I consider it fine if this requires an extra call to the system indicating
that this is a 'multiple job' situation.
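
To make the proposed behaviour concrete, here is a minimal sketch of the policy
as I understand it: a detached job makes the cluster stop itself, while in
attached mode the client runs all jobs and then stops the cluster. This is a toy
model, not Flink code; ShutdownPolicySketch, FakeCluster, runJob and runAttached
are hypothetical names I made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of the proposed shutdown policy. None of these names are Flink APIs. */
public class ShutdownPolicySketch {

    /** Hypothetical stand-in for a per-invocation YARN cluster. */
    static class FakeCluster {
        boolean running = true;
        final List<String> finishedJobs = new ArrayList<>();

        /** Runs a job; in detached mode the cluster stops itself once the job is done. */
        void runJob(String name, boolean detached) {
            if (!running) {
                throw new IllegalStateException("cluster already shut down before job " + name);
            }
            finishedJobs.add(name);
            if (detached) {
                running = false; // automatic shutdown: no client is left to stop the cluster
            }
        }

        void shutdown() {
            running = false;
        }
    }

    /** Attached mode: run every job, then let the client stop the cluster. */
    static FakeCluster runAttached(List<String> jobs) {
        FakeCluster cluster = new FakeCluster();
        try {
            for (String job : jobs) {
                cluster.runJob(job, false);
            }
        } finally {
            cluster.shutdown(); // never reached if the client process itself dies
        }
        return cluster;
    }

    public static void main(String[] args) {
        FakeCluster cluster = runAttached(Arrays.asList("tableA", "tableB", "tableC"));
        System.out.println(cluster.finishedJobs + " running=" + cluster.running);
    }
}
```

The finally block is also where the downside you mention shows up: if the client
crashes or loses its connection before reaching it, nothing stops the cluster.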

I created https://issues.apache.org/jira/browse/FLINK-4495 for you

Niels Basjes

On Thu, Aug 25, 2016 at 3:34 PM, Maximilian Michels <mxm@apache.org> wrote:

> Hi Niels,
>
> This is with 1.1.1? We could fix this in the upcoming 1.1.2 release by
> only using automatic shutdown for detached jobs. In all other cases
> we should be able to shut down from the client side after running all
> jobs. The only downside I see is that Flink clusters may never be
> shut down if the CLI somehow crashes or is cut off by a network
> partition.
>
> Best,
> Max
>
> On Thu, Aug 25, 2016 at 12:04 PM, Niels Basjes <Niels@basjes.nl> wrote:
> > Hi,
> >
> > I created a small application that needs to run multiple (batch) jobs on
> > Yarn and then terminate.
> > In this case I'm exporting data from a list of HBase tables.
> >
> > Right now I essentially do the following:
> >
> > flink run -m yarn-cluster -yn 10  bla.jar ...
> >
> > And in my main I do
> >
> > foreach thing I need to do {
> >    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> >    env. ... define the batch job.
> >    env.execute();
> > }
> >
> > When I submit the second job I get an exception:
> > java.lang.RuntimeException: Unable to tell application master to stop once the specified job has been finised
> >     at org.apache.flink.yarn.YarnClusterClient.stopAfterJob(YarnClusterClient.java:184)
> >     at org.apache.flink.yarn.YarnClusterClient.submitJob(YarnClusterClient.java:202)
> >     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389)
> >     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:376)
> >     at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:61)
> >     at com.bol.tools.hbase.export.Main.runSingleTable(Main.java:220)
> >     at com.bol.tools.hbase.export.Main.run(Main.java:81)
> >     at com.bol.tools.hbase.export.Main.main(Main.java:42)
> >     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >     at java.lang.reflect.Method.invoke(Method.java:497)
> >     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
> >     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
> >     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331)
> >     at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:775)
> >     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:251)
> >     at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:995)
> >     at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:992)
> >     at org.apache.flink.runtime.security.SecurityUtils$1.run(SecurityUtils.java:56)
> >     at java.security.AccessController.doPrivileged(Native Method)
> >     at javax.security.auth.Subject.doAs(Subject.java:422)
> >     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
> >     at org.apache.flink.runtime.security.SecurityUtils.runSecured(SecurityUtils.java:53)
> >     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:992)
> >     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1046)
> > Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds]
> >     at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
> >     at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
> >     at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
> >     at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
> >     at scala.concurrent.Await$.result(package.scala:107)
> >     at scala.concurrent.Await.result(package.scala)
> >     at org.apache.flink.yarn.YarnClusterClient.stopAfterJob(YarnClusterClient.java:182)
> >     ... 25 more
> >
> >
> > How do I (without using yarn-session) tell the YarnClusterClient to simply
> > 'keep running because there will be more jobs'?
> >
> > If I run this same code in a yarn-session it works, but then I have the
> > trouble of starting a (detached) yarn-session AND terminating it again
> > after my jobs have run.
> >
> > --
> > Best regards / Met vriendelijke groeten,
> >
> > Niels Basjes
>



-- 
Best regards / Met vriendelijke groeten,

Niels Basjes
