flink-user mailing list archives

From "Liu, Gavin (CAI - Atlanta)" <Gavin....@coxautoinc.com>
Subject Re: Backpressure from producer with flink connector kinesis 1.4.2
Date Thu, 21 Jun 2018 03:55:47 GMT
Hi guys,

I have another question related to the KPL problem. I wonder what the consequences of overwhelming
the internal queue of the KPL (Kinesis Producer Library) can be.

From my experiments with 1.4.2 (which does not yet have the backpressure support from the open
PR mentioned below), when the Flink cluster processes records too fast and the throughput
of the sink Kinesis stream is limited, i.e., when the throughput-exceeded exception starts to be
thrown, we quite often get the following exception (pasted at the end) very soon afterwards, and all
the subtasks switch to the cancelling state and are restarted.
From the exception trace, I can see that YARN was shut down and all task managers were terminated.
I suspect a memory issue. Whenever the throughput-exceeded exception is
thrown, it implies that the internal unbounded queue in KPL may be growing rapidly. We
set recordTtl = 60s and we still see the record expiration exception along with the
throughput-exceeded exception. This leads me to wonder whether the internal unbounded queue grows
too large, exhausts all the memory on the node, and eventually crashes YARN and the
job manager.
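
A rough way to size this hypothesis: if records leave the queue only when they are sent or when
they reach recordTtl, the producer can hold at most about (ingest rate × record size × recordTtl)
bytes of payload before expiry starts to cap the queue. The Java sketch below computes that bound.
The rate and record size are made-up illustration values, not measurements from this job, the
class and method names are hypothetical, and the estimate ignores any per-record overhead inside KPL.

```java
public class KplQueueEstimate {

    /**
     * Rough upper bound, in bytes, on the payload buffered by the producer
     * before records begin to expire. Assumes records are dropped only when
     * they are sent or when they reach the TTL.
     */
    static long maxBufferedBytes(long recordsPerSecond, long avgRecordBytes, long recordTtlSeconds) {
        // multiplyExact throws ArithmeticException instead of silently overflowing.
        return Math.multiplyExact(
                Math.multiplyExact(recordsPerSecond, avgRecordBytes), recordTtlSeconds);
    }

    public static void main(String[] args) {
        // Hypothetical workload: 20,000 records/s of 1 KiB each, recordTtl = 60 s.
        long bytes = maxBufferedBytes(20_000, 1_024, 60);
        System.out.println(bytes / (1024 * 1024) + " MiB buffered at most");
    }
}
```

With these illustrative numbers the bound is already above 1 GiB per producer, so a queue of this
kind could plausibly exceed a container's memory limit even with a 60 s TTL.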

Well, this is just my hypothesis. I wonder if someone has already encountered or investigated
similar issues and could shed some light on this.




java.lang.Exception: TaskManager was lost/killed: container_1529095945616_0009_01_000004 @ ip-172-31-64-249.ec2.internal (dataPort=44591)
    at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
    at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:523)
    at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
    at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
    at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1198)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:426)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:107)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
    at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)







From: "Liu, Gavin (CAI - Atlanta)" <Gavin.Liu@coxautoinc.com>
Date: Wednesday, June 20, 2018 at 12:11 PM
To: "Tzu-Li (Gordon) Tai" <tzulitai@apache.org>, "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Backpressure from producer with flink connector kinesis 1.4.2

Thanks, Gordon. That was quick, and it is very helpful to me.
I tried some other alternatives to resolve this and finally thought about rewriting the FlinkKinesisProducer
class for our needs. Glad that I asked before I started.
I really appreciate the quick response.

From: "Tzu-Li (Gordon) Tai" <tzulitai@apache.org>
Date: Wednesday, June 20, 2018 at 12:05 PM
To: "Liu, Gavin (CAI - Atlanta)" <Gavin.Liu@coxautoinc.com>, "user@flink.apache.org"
<user@flink.apache.org>
Subject: Re: Backpressure from producer with flink connector kinesis 1.4.2

Hi Gavin,

The problem is that the Kinesis producer currently does not propagate backpressure properly.
Records are added to the internally used KPL client’s queue, without any queue size limit.

This is considered a bug, and there is already a pull request for it [1], which we should probably
push towards being merged soon.
What the pull request essentially does is add an upper bound to the number of pending records
in the KPL producer queue.
Once the upper bound is hit, input to the Kinesis producer sink is blocked, thereby
propagating backpressure further upstream.
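
To illustrate the general idea of bounding pending records and blocking the caller, here is a
minimal Java sketch. It is not the code from the pull request and does not use the KPL or Flink
APIs: BoundedAsyncSink, asyncSend, and pendingCount are hypothetical names, and a semaphore is
used here as one simple way to block once a limit on in-flight records is reached.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

/**
 * Hypothetical sketch: caps the number of in-flight records of an async sender.
 * asyncSend stands in for an asynchronous producer call; it is not a real KPL API.
 */
public class BoundedAsyncSink<T> {
    private final int maxPending;
    private final Semaphore permits;
    private final Function<T, CompletableFuture<Void>> asyncSend;

    public BoundedAsyncSink(int maxPending, Function<T, CompletableFuture<Void>> asyncSend) {
        this.maxPending = maxPending;
        this.permits = new Semaphore(maxPending);
        this.asyncSend = asyncSend;
    }

    /** Blocks the caller while maxPending records are still unacknowledged. */
    public void invoke(T record) {
        try {
            permits.acquire();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for capacity", e);
        }
        try {
            // Free the slot when the send finishes, whether it succeeded or failed.
            asyncSend.apply(record).whenComplete((ok, err) -> permits.release());
        } catch (RuntimeException e) {
            permits.release(); // the send never started, so give the slot back
            throw e;
        }
    }

    /** Number of records submitted but not yet acknowledged. */
    public int pendingCount() {
        return maxPending - permits.availablePermits();
    }

    public static void main(String[] args) {
        List<CompletableFuture<Void>> inFlight = new ArrayList<>();
        BoundedAsyncSink<String> sink = new BoundedAsyncSink<>(2, r -> {
            CompletableFuture<Void> f = new CompletableFuture<>();
            inFlight.add(f); // keep the future so the demo can acknowledge it later
            return f;
        });
        sink.invoke("a");
        sink.invoke("b");               // limit reached; a third invoke would block here
        inFlight.get(0).complete(null); // an acknowledgement frees one slot
        sink.invoke("c");               // proceeds without blocking
        System.out.println("pending = " + sink.pendingCount());
    }
}
```

Because invoke blocks the thread that feeds the sink, the upstream operators stop being drained
and the slowdown propagates through Flink's normal backpressure mechanism.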

Cheers,
Gordon

[1] https://github.com/apache/flink/pull/6021



On 20 June 2018 at 6:00:30 PM, Liu, Gavin (CAI - Atlanta) (gavin.liu@coxautoinc.com)
wrote:
Hi guys,

I am new to the Flink framework, and we are building an application that uses Kinesis streams
as both the Flink source and sink.
The Flink version we are using is 1.4.2, which is also the version of flink-connector-kinesis.
We built the flink-connector-kinesis jar explicitly with KPL version 0.12.6 due to existing
problems with the default 0.12.5.

I got a rough idea of how backpressure works in Flink from reading http://mail-archives.apache.org/mod_mbox/flink-user/201801.mbox/%3CF8DD76C0-9DE0-412A-8C24-B72AF0D4211B@data-artisans.com%3E

From my experiments with Flink and flink-connector-kinesis, backpressure only happens
within the Flink processing operators, i.e., not in the Flink producer to Kinesis.
More specifically, when the throughput from KPL exceeds the Kinesis throughput limits,
Flink does not slow down at all, i.e., it does not apply backpressure along the processing chain up
to the Flink consumer.
Correct me if I misunderstood this. It looks like the Flink producer (in flink-connector-kinesis)
is a standalone component: once a record is collected and sent to the producer, Flink core
considers its processing finished and no longer cares about the fate of the record; it is the
responsibility of the connector to continue the job.
I am expecting backpressure to apply across the whole pipeline, from the source Kinesis stream to
the sink Kinesis stream: whenever the sink Kinesis stream cannot handle the volume, it should add
backpressure.
Could someone explain a bit more why the Flink connector is designed this way? Also, correct
me if I stated anything wrong.


Gavin Liu
