From: Aljoscha Krettek
Date: Tue, 8 Aug 2017 15:46:22 +0200
Subject: Re: Flink REST API async?
To: Francisco Gonzalez Barea
Cc: Eron Wright, user@flink.apache.org, Paul Wilson, Juan Hidalgo, Dave Clark, Till Rohrmann

I quickly talked to Till about this. Once FLIP-6 is implemented, the new JobManager will have a REST endpoint that accepts a JobGraph directly. With this, we no longer have to execute the user main() method in the WebRuntimeMonitor (the component that the current JobManager process loads to serve the web frontend and the REST interface).

This should solve the problem going forward, but unfortunately it does not help with your current setup.
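As a rough illustration of why a direct JobGraph endpoint helps, here is a minimal Java sketch, assuming that with FLIP-6 the user's main() runs in the submitting client instead of inside the shared JobManager process. Each program builds its plan against its own environment object, so concurrent builds cannot see each other's operators, and only the finished, immutable plan would be sent. `ClientSidePlanning`, `LocalEnv`, `buildPlan` and the plan-as-list-of-strings model are hypothetical simplifications, not Flink API; a real client would produce a Flink `JobGraph`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ClientSidePlanning {
    // Hypothetical per-program environment: each program gets its own instance
    // instead of looking one up through a JVM-wide static.
    static final class LocalEnv {
        private final List<String> sinks = new ArrayList<>();

        void addSink(String name) {
            sinks.add(name);
        }

        // The finished, read-only plan is the only thing handed to the "server".
        List<String> toPlan() {
            return Collections.unmodifiableList(new ArrayList<>(sinks));
        }
    }

    // Stand-in for a user main(): builds its plan against its own environment.
    static List<String> buildPlan(String program) {
        LocalEnv env = new LocalEnv();
        env.addSink(program + "-source");
        env.addSink(program + "-sink");
        return env.toPlan();
    }

    // Builds several plans concurrently, as parallel client-side submissions would.
    static List<List<String>> buildConcurrently(List<String> programs) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, programs.size()));
        try {
            List<CompletableFuture<List<String>>> futures = new ArrayList<>();
            for (String program : programs) {
                futures.add(CompletableFuture.supplyAsync(() -> buildPlan(program), pool));
            }
            List<List<String>> plans = new ArrayList<>();
            for (CompletableFuture<List<String>> future : futures) {
                plans.add(future.join());
            }
            return plans;
        } finally {
            pool.shutdown();
        }
    }
}
```

Calling `buildConcurrently(Arrays.asList("a", "b"))` should return one plan per program, each containing only that program's own source and sink.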

Best,
Aljoscha
On 8. Aug 2017, at 10:26, Francisco Gonzalez Barea <Francisco.Gonzalez@piksel.com> wrote:

Aha, ok… Thanks for your answer, Eron.

Regards


On 7 Aug 2017, at 19:04, Eron Wright <eronwright@gmail.com> wrote:

When you submit a program via the REST API, the main method executes inside the JobManager process. Unfortunately, a static variable is used to establish the execution environment that the program obtains from `ExecutionEnvironment.getExecutionEnvironment()`. From the stack trace it appears that two main methods are executing simultaneously and one is corrupting the other.
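To make this concrete, the following Java sketch models the shared-static pattern with a hypothetical `Env` class that only records sink names; it is not Flink's `ExecutionEnvironment`. It replays one interleaving of two programs step by step on a single thread, so the outcome is deterministic: because both programs obtain their environment through the same static slot, program A's plan ends up containing program B's sink.

```java
import java.util.ArrayList;
import java.util.List;

final class SharedStaticEnvironment {
    // Hypothetical stand-in for an execution environment that only records sinks.
    // It models the pattern described above; it is not Flink's ExecutionEnvironment.
    static final class Env {
        final List<String> sinks = new ArrayList<>();

        void addSink(String name) {
            sinks.add(name);
        }

        List<String> createPlan() {
            return new ArrayList<>(sinks);
        }
    }

    // One JVM-wide slot, shared by every main() that runs in the same process.
    private static Env contextEnv = new Env();

    static Env getExecutionEnvironment() {
        return contextEnv;
    }

    // Replays one unlucky interleaving of two programs, A and B, step by step.
    static List<String> planOfProgramA() {
        contextEnv = new Env();               // fresh slot for this replay
        Env envA = getExecutionEnvironment(); // program A looks up "its" environment
        envA.addSink("A-sink");
        Env envB = getExecutionEnvironment(); // program B gets the very same object
        envB.addSink("B-sink");
        return envA.createPlan();             // A's plan now also contains B's sink
    }

    public static void main(String[] args) {
        System.out.println(planOfProgramA()); // prints [A-sink, B-sink]
    }
}
```

In the JobManager the two main() calls presumably run on different Netty threads, so the same sharing shows up nondeterministically, either as a silently mixed plan or as an exception like the one in the trace.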

On Mon, Aug 7, 2017 at 8:21 AM, Francisco Gonzalez Barea <Francisco.Gonzalez@piksel.com> wrote:
Hi there!

We are doing some POCs that submit jobs to Flink remotely. We tried the Flink CLI, and now we're testing the REST API.

The problem is that when we send a set of requests asynchronously (using CompletableFutures), only a couple of them run successfully. The rest fail with the exception copied at the end of this email.
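Given the diagnosis in this thread (two main() methods running at the same time inside the JobManager), one possible client-side workaround is to keep the CompletableFuture-based API but chain the requests so that only one is in flight at a time. This is a sketch under two assumptions: `submit` is a hypothetical blocking function that performs one REST submission and returns only after the JobManager has answered, and no other client submits jobs concurrently. `SequentialSubmitter` is likewise a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;

final class SequentialSubmitter {
    // Chains the submissions so that each request starts only after the previous one
    // has finished (successfully or not). Callers still get one future per job.
    // `submit` is a hypothetical blocking call that performs one REST submission.
    static List<CompletableFuture<String>> submitSequentially(
            List<String> jobs, Function<String, String> submit, Executor executor) {
        List<CompletableFuture<String>> results = new ArrayList<>();
        CompletableFuture<Void> previous = CompletableFuture.completedFuture(null);
        for (String job : jobs) {
            CompletableFuture<String> current =
                    previous.thenApplyAsync(ignored -> submit.apply(job), executor);
            results.add(current);
            // Keep the chain going even if this submission fails.
            previous = current.handle((value, error) -> null);
        }
        return results;
    }
}
```

The trade-off is throughput: submissions are serialized, although the caller still receives one future per job and a failed submission does not block the ones after it.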

Do you know the reason for this?

Thanks in advance!!
Regards,

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:545)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
    at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:80)
    at org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:318)
    at org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:72)
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleJsonRequest(JarRunHandler.java:61)
    at org.apache.flink.runtime.webmonitor.handlers.AbstractJsonRequestHandler.handleRequest(AbstractJsonRequestHandler.java:41)
    at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.respondAsLeader(RuntimeMonitorHandler.java:109)
    at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase.channelRead0(RuntimeMonitorHandlerBase.java:97)
    at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase.channelRead0(RuntimeMonitorHandlerBase.java:44)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
    at io.netty.handler.codec.http.router.Handler.routed(Handler.java:62)
    at io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:57)
    at io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:20)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
    at org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:159)
    at org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:65)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
    at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:147)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.ConcurrentModificationException
    at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
    at java.util.ArrayList$Itr.next(ArrayList.java:851)
    at org.apache.flink.api.java.operators.OperatorTranslation.translateToPlan(OperatorTranslation.java:49)
    at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:1065)
    at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:1032)
    at org.apache.flink.client.program.OptimizerPlanEnvironment.execute(OptimizerPlanEnvironment.java:47)
    at com.piksel.sequoia.media.common.launcher.Launcher.main(Launcher.java:56)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
    ... 39 more
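The `Caused by` section shows the exception being raised from an `ArrayList` iterator inside `OperatorTranslation.translateToPlan`, which suggests that one program was iterating over a list in the shared environment (plausibly its registered sinks) while the other program added to it. The single-threaded Java sketch below replays that interleaving with a plain `ArrayList`; the class name and list contents are illustrative only. Note that `ArrayList`'s fail-fast behavior is best effort, so under a real race the same bug may also produce a corrupted plan without any exception.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;

final class FailFastIteratorDemo {
    // Replays, on a single thread, the interleaving behind the "Caused by" section:
    // one caller is walking the list while another appends to it.
    static String replay() {
        List<String> sinks = new ArrayList<>();
        sinks.add("A-sink-1");
        sinks.add("A-sink-2");

        Iterator<String> translation = sinks.iterator();
        translation.next();      // program A starts translating its sinks
        sinks.add("B-sink");     // program B registers a sink on the same list
        try {
            translation.next();  // A continues; the iterator detects the structural change
            return "no exception";
        } catch (ConcurrentModificationException e) {
            return "ConcurrentModificationException";
        }
    }

    public static void main(String[] args) {
        System.out.println(replay()); // prints ConcurrentModificationException
    }
}
```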


This message is private and confidential. If you have received this message in error, please notify the sender or servicedesk@piksel.com and remove it from your system.

Piksel Inc is a company registered in the United States, 2100 Powers Ferry Road SE, Suite 400, Atlanta, GA 30339



