flink-user mailing list archives

From Ken Krugler <kkrugler_li...@transpac.com>
Subject Issue with back pressure and AsyncFunction
Date Fri, 10 Nov 2017 21:31:41 GMT
Hi all,

I was debugging a curious problem with a streaming job that contained an iteration and several
AsyncFunctions.

The entire job would stall out, with no progress being made.

But when I checked back pressure, only one function showed it as being high - everything else
was OK.

And when I dumped threads, the only bit of my code that was running was indeed that one function
w/high back pressure, stuck while making a collect() call.

There were two issues here:

1. A downstream function in the iteration was (significantly) increasing the number of tuples
- it would get one in, and sometimes emit 100+.

The output would loop back as input via the iteration.

This eventually caused the network buffers to fill up, and that’s why the job got stuck.

I had to add my own tracking/throttling in one of my custom functions, to avoid having too
many “active” tuples.
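
As a rough illustration of that kind of tracking (not the code from my job), here is a minimal sketch of a counter that caps the number of active tuples. The class name ActiveTupleTracker and its methods are hypothetical, and it assumes the function feeding the loop can hold a tuple back when no room is left:

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper: caps how many tuples may be "active" inside the iteration. */
final class ActiveTupleTracker {
    private final int maxActive;
    private final AtomicInteger active = new AtomicInteger();

    ActiveTupleTracker(int maxActive) {
        if (maxActive <= 0) {
            throw new IllegalArgumentException("maxActive must be positive");
        }
        this.maxActive = maxActive;
    }

    /**
     * Tries to reserve room for count new tuples.
     * Returns false, reserving nothing, if that would exceed the cap.
     */
    boolean tryReserve(int count) {
        while (true) {
            int current = active.get();
            if (current + count > maxActive) {
                return false;
            }
            // Retry if another thread changed the counter in the meantime.
            if (active.compareAndSet(current, current + count)) {
                return true;
            }
        }
    }

    /** Call once count tuples have left the loop (or were dropped). */
    void release(int count) {
        active.addAndGet(-count);
    }

    int activeCount() {
        return active.get();
    }
}
```

A function that fans one tuple out into many would call tryReserve(n) before emitting n tuples and delay or buffer them when it returns false. Note that the counter is per instance, so with parallelism greater than one each subtask would enforce its own cap.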

So maybe something to note in documentation on iterations, if it’s not there already.

2. The back pressure calculation doesn’t take into account AsyncIO

When I double-checked the thread dump, there were actually a number of threads (one for each
of my AsyncFunctions) that were stuck calling collect().

These were all named "AsyncIO-Emitter-Thread (<name of AsyncFunction>…)". For example:

> "AsyncIO-Emitter-Thread (MyAsyncFunction -> (<blah>)) (1/1))" #125 daemon prio=5 os_prio=31 tid=0x00007fb191025800 nid=0xac0b in Object.wait() [0x00007000123f0000]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
> 	at java.lang.Object.wait(Native Method)
> 	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:224)
> 	- locked <0x0000000773cb3ec0> (a java.util.ArrayDeque)
> 	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:193)
> 	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:132)
> 	- locked <0x0000000773b98020> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
> 	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:89)
> 	at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:85)
> 	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:101)
> 	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:83)
> 	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:41)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
> 	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
> 	at org.apache.flink.streaming.api.collector.selector.DirectedOutput.collect(DirectedOutput.java:140)
> 	at org.apache.flink.streaming.api.collector.selector.DirectedOutput.collect(DirectedOutput.java:42)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
> 	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> 	at org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:132)
> 	- locked <0x0000000773b1bb70> (a java.lang.Object)
> 	at org.apache.flink.streaming.api.operators.async.Emitter.run(Emitter.java:83)
> 	at java.lang.Thread.run(Thread.java:748)



I’m assuming that when my AsyncFunction calls collect(), this hands off the tuple to the
AsyncIO-Emitter-Thread, which is why none of my code (either AsyncFunctions or threads
in my pool doing async stuff) shows up in the thread dump.
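
Since these threads are easy to miss by eye, a dump can be scanned for them mechanically. The sketch below is only an illustration: the class EmitterThreadScanner is hypothetical, it expects raw jstack-style text (without the "> " quoting used in this mail), and it relies on the thread name prefix and the LocalBufferPool.requestBuffer frame seen in the dump above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: finds AsyncIO emitter threads waiting for a network buffer. */
final class EmitterThreadScanner {
    private static final String EMITTER_PREFIX = "AsyncIO-Emitter-Thread";
    // Also matches requestBufferBlocking, which appears in the same stacks.
    private static final String BUFFER_WAIT_FRAME = "LocalBufferPool.requestBuffer";

    /** Returns the names of emitter threads whose stack contains a buffer request. */
    static List<String> blockedEmitters(String threadDump) {
        List<String> blocked = new ArrayList<>();
        String current = null; // emitter thread being scanned, or null
        for (String rawLine : threadDump.split("\\R")) {
            String line = rawLine.trim();
            if (line.startsWith("\"")) {
                // Thread header: the name sits between the first and last quote.
                int close = line.lastIndexOf('"');
                String name = close > 0 ? line.substring(1, close) : line.substring(1);
                current = name.startsWith(EMITTER_PREFIX) ? name : null;
            } else if (current != null
                    && line.startsWith("at ")
                    && line.contains(BUFFER_WAIT_FRAME)) {
                blocked.add(current);
                current = null; // report each thread only once
            }
        }
        return blocked;
    }
}
```

Passing the text of a dump to EmitterThreadScanner.blockedEmitters(...) would list the emitter threads that are waiting on a buffer, which in my case was one per AsyncFunction.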

And I’m assuming that the back pressure calculation isn’t associating these threads with
the source function, which is why they don’t show up in the GUI.

I’m hoping someone can confirm the above. If so, I’ll file an issue.

Thanks,

— Ken

--------------------------
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr

