flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Shannon Carey <sca...@expedia.com>
Subject Re: API request to submit job takes over 1hr
Date Wed, 01 Jun 2016 23:44:55 GMT
It looks like the problem is due to the stack trace below.

Simply put, connection failure to Kafka when using the default settings causes job submission
to take over (flink.get-partitions.retry * tries by SimpleConsumer * socket.timeout.ms * #
of Kafka brokers) = (3 * 2 * 30 * (# of Kafka brokers)) seconds. In my case, since I have
36 Kafka brokers, it took over 108 minutes. This is beyond the maximum idle connection timeout
of an AWS ELB of 60 minutes, and beyond the normal length of time most people expect an HTTP
request to take. During these 108 minutes and after, aside from examining logs & stack
traces, it is not possible to determine what is happening with regard to the run job request.
It simply appears to hang and then fail, typically with a 504 Gateway Timeout status.

There are a couple problems that are responsible for this situation. Let me know if I should
move this discussion to the "devs" list: I am not a member there yet. I am happy to submit
JIRAs and I would be able to submit a Pull Request for the change to FlinkKafkaConsumer08
(and 09) initialization as suggested below.

  1.  JarRunHandler is provided with a timeout value, but that timeout value is ignored when
calling getJobGraphAndClassLoader(). This allows HTTP "run" requests to take arbitrary amounts
of time during which the status of the request and the job is unclear. Depending on the semantics
of the work that method does, perhaps it could be made asynchronous with a timeout?
  2.  FlinkKafkaConsumer08's constructor (as well as the Kafka 0.9 consumer's constructor)
performs network interaction & retries that can take a long time, and the constructor
is in the execution path beneath getJobGraphAndClassLoader() via the main() method of the
submitted Flink job. It is not necessary to do that work (retrieving Kafka partition info)
in the constructor. Instead, that work should occur when the job is asked to start, either
by overriding the AbstractRichFunction#open() method or by adding it to the top of the run()
method. Alternatively, though not any better, the signature of StreamExecutionEnvironment#addSource()
could be changed to take some kind of Factory<SourceFunction> so that instantiation
is deferred until necessary.

"nioEventLoopGroup-3-14" #41 prio=10 os_prio=0 tid=0x00007fd0e870b000 nid=0x167d runnable
[0x00007fd0cefcb000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.Net.poll(Native Method)
at sun.nio.ch.SocketChannelImpl.poll(SocketChannelImpl.java:954)
- locked <0x000000076a190060> (a java.lang.Object)
at sun.nio.ch.SocketAdaptor.connect(SocketAdaptor.java:110)
- locked <0x000000076a1900f0> (a java.lang.Object)
at kafka.network.BlockingChannel.liftedTree1$1(BlockingChannel.scala:59)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:49)
- locked <0x000000076a190180> (a java.lang.Object)
at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:55)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:77)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
- locked <0x000000076a190238> (a java.lang.Object)
at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:91)
at kafka.javaapi.consumer.SimpleConsumer.send(SimpleConsumer.scala:68)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.getPartitionsForTopic(FlinkKafkaConsumer08.java:521)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:218)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:193)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:160)
at com.expedia.www.hendrix.flinkproto.ProofOfConcept$.main(ProofOfConcept.scala:60)
at com.expedia.www.hendrix.flinkproto.ProofOfConcept.main(ProofOfConcept.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:80)
at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:215)
at org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:95)
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleRequest(JarRunHandler.java:50)
at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.respondAsLeader(RuntimeMonitorHandler.java:135)
at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.channelRead0(RuntimeMonitorHandler.java:112)
at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.channelRead0(RuntimeMonitorHandler.java:60)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at io.netty.handler.codec.http.router.Handler.routed(Handler.java:62)
at io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:57)
at io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:20)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:158)
at org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:65)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:147)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:745)

From: Shannon Carey <shannon.carey@orbitz.com<mailto:shannon.carey@orbitz.com>>
on behalf of Shannon Carey <scarey@expedia.com<mailto:scarey@expedia.com>>
Date: Wednesday, June 1, 2016 at 12:54 PM
To: "user@flink.apache.org<mailto:user@flink.apache.org>" <user@flink.apache.org<mailto:user@flink.apache.org>>
Subject: API request to submit job takes over 1hr

Hi folks,

I have deployed a Flink cluster on top of YARN in an AWS EMR cluster in my test environment,
and everything is working fine. However,  I am unable to submit jobs to the prod cluster.

Uploading the JAR containing a Flink job succeeds. However, the request to run the job (UI
makes API request to /jars/<jarname>/run?<params>) takes so long to complete that
the ELB finally returns a 504 GATEWAY_TIMEOUT response. This is the case even if the ELB timeout
is set to 1hr: the request returns 504 after 1hr. The request appears to fail server-side,
also, since no jobs have ever showed up in the UI as being in any status (successful/failed/completed
or otherwise). Shortly after the request is made, it is interesting to note that sometimes
(but not always), other requests by the UI to the API begin to take longer than usual, although
they do all eventually complete.

No interesting/suspicious log entries have been found. All YARN nodes appear healthy.

Does anyone have ideas about what the problem might be? Or ideas about troubleshooting steps
I should take?

Also, I was wondering if 1GB is a reasonable amount of memory to use for the Flink Job Manager?
It appears to be using only ~570MB but I am not sure if the Job Manager might be misbehaving
due to resource constraints. The prod cluster is currently composed of six c3.2xlarge EC2
instances. Task memory is set to 10496, Job Manager memory is set to 1024, and there are 8
slots set in the yarn-session.sh command. Are there any guidelines for memory allocation for
the Job Manager?

Thanks very much!
Shannon Carey
Mime
View raw message