flink-user mailing list archives

From Maciek Próchniak <...@touk.pl>
Subject Threads waiting on LocalBufferPool
Date Wed, 20 Apr 2016 13:45:25 GMT
Hi,
I'm running my Flink job on one rather large machine (20 cores with 
hyperthreading, 120 GB RAM). The task manager has 20 GB of heap allocated.
Roughly, the job does the following:
read CSV from Kafka -> keyBy one of the fields -> some custom state 
processing.
The Kafka topic has 24 partitions, so my parallelism is also 24.
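With 24 source subtasks feeding 24 keyed subtasks, keyBy creates an all-to-all connection (24 x 24 = 576 producer/consumer channels), and every record with a given key goes to exactly one downstream subtask. The sketch below is a toy model of that routing, useful for checking how evenly a sample of keys would spread. Assumption: it uses CRC32 modulo the parallelism as a deterministic stand-in hash; Flink's own key hashing is different, so the exact subtask numbers will not match a real job.

```python
import zlib
from collections import Counter

PARALLELISM = 24  # matches the 24 Kafka partitions / subtasks in the post


def target_subtask(key: str, parallelism: int = PARALLELISM) -> int:
    """Pick the downstream subtask for a key.

    Stand-in only: CRC32 keeps the result deterministic across runs;
    this is NOT the hash function Flink's keyBy uses.
    """
    return zlib.crc32(key.encode("utf-8")) % parallelism


def load_per_subtask(keys, parallelism: int = PARALLELISM) -> Counter:
    """Count how many records each downstream subtask would receive."""
    return Counter(target_subtask(k, parallelism) for k in keys)


if __name__ == "__main__":
    uniform = [f"user-{i}" for i in range(10_000)]
    skewed = ["hot-key"] * 9_000 + uniform[:1_000]
    # The busiest subtask bounds throughput: every producer that has a
    # record for it must wait until that subtask drains its input.
    print("busiest subtask, uniform keys:", max(load_per_subtask(uniform).values()))
    print("busiest subtask, skewed keys:", max(load_per_subtask(skewed).values()))
```

Feeding in a sample of real key values instead of the synthetic lists shows whether a few subtasks would receive most of the records.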

After some tweaks and upgrading to 1.0.2-rc3 (I use the RocksDB state 
backend), I reached a throughput of ~120-150k/s.
On the same Kafka and machine, a simple filtering job reached > 500k/s, 
so I wanted to find out what the bottleneck is.

It turns out that quite often all of the Kafka consumer threads are stuck 
waiting for a buffer from the pool:
"Thread-6695" #7466 daemon prio=5 os_prio=0 tid=0x00007f77fd80d000 nid=0x8118 in Object.wait() [0x00007f7ad54d9000]
    java.lang.Thread.State: TIMED_WAITING (on object monitor)
         at java.lang.Object.wait(Native Method)
         at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
         - locked <0x00000002eade3890> (a java.util.ArrayDeque)
         at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
         at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
         - locked <0x00000002eb73cbd0> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
         at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:86)
         at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:78)
         at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39)
         at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
         at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:541)
         at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:541)
         at scala.collection.immutable.List.foreach(List.scala:381)
         at org.apache.flink.streaming.api.scala.DataStream$$anon$6.flatMap(DataStream.scala:541)
         at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:48)
         at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309)
         at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297)
         at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:38)
         at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309)
         at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297)
         at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:38)
         at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309)
         at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297)
         at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
         at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309)
         at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297)
         at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
         at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309)
         at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297)
         at org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:318)
         - locked <0x00000002eaf3eb50> (a java.lang.Object)
         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:473)
         - locked <0x00000002eaf3eb50> (a java.lang.Object)

This seems a bit weird to me, as most of the state processing threads are idle:

"My custom function -> (Sink: Unnamed, Map) (19/24)" #7353 daemon prio=5 os_prio=0 tid=0x00007f7a7400e000 nid=0x80a7 waiting on condition [0x00007f7bee8ed000]
    java.lang.Thread.State: TIMED_WAITING (parking)
         at sun.misc.Unsafe.park(Native Method)
         - parking to wait for  <0x00000002eb840c38> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
         at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
         at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
         at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:415)
         at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:108)
         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:175)
         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
         at java.lang.Thread.run(Thread.java:745)
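In the first dump the Kafka consumer thread sits in LocalBufferPool.requestBufferBlocking, in TIMED_WAITING on the pool's ArrayDeque: it cannot serialize its next record until a buffer is handed back to the pool. The sketch below is a hypothetical, much simplified model of that behaviour (a fixed number of buffers, a blocking request that re-checks on a timeout, and a recycle call that wakes one waiter); it is not Flink's implementation, and the class and method names are made up for illustration.

```python
import threading
import time
from collections import deque


class BoundedBufferPool:
    """Toy fixed-size buffer pool (hypothetical, not Flink's LocalBufferPool)."""

    def __init__(self, num_buffers: int, wait_timeout: float = 2.0):
        self._available = deque(range(num_buffers))  # ids of free buffers
        self._cond = threading.Condition()
        self._wait_timeout = wait_timeout

    def request_blocking(self) -> int:
        """Take a free buffer, waiting (with periodic re-checks) if none is left."""
        with self._cond:
            while not self._available:
                # The producer parks here until a consumer recycles a buffer,
                # similar in spirit to the TIMED_WAITING frame in the dump.
                self._cond.wait(timeout=self._wait_timeout)
            return self._available.popleft()

    def try_request(self):
        """Non-blocking variant: returns None when the pool is exhausted."""
        with self._cond:
            return self._available.popleft() if self._available else None

    def recycle(self, buffer_id: int) -> None:
        """Return a buffer once its data has been consumed, waking one waiter."""
        with self._cond:
            self._available.append(buffer_id)
            self._cond.notify()

    def available(self) -> int:
        with self._cond:
            return len(self._available)


if __name__ == "__main__":
    pool = BoundedBufferPool(num_buffers=2, wait_timeout=0.1)
    held = [pool.request_blocking(), pool.request_blocking()]
    assert pool.try_request() is None  # pool exhausted
    # A slow "consumer" frees one buffer after 0.5 s; the producer waits until then.
    threading.Timer(0.5, pool.recycle, args=(held.pop(),)).start()
    start = time.monotonic()
    pool.request_blocking()
    print(f"producer waited {time.monotonic() - start:.2f}s for a buffer")
```

In this model, adding buffers only delays the moment the pool runs dry if buffers come back more slowly than producers take them; the wait time is governed by how quickly recycle() is called.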


I tried using more network buffers, but it doesn't seem to change 
anything. If I understand 
https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#configuring-the-network-buffers
correctly, I should not need more than 24^2 * 4 (= 2304) of them...
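The rule of thumb on the linked configuration page is #slots-per-TM^2 * #TMs * 4. The sketch below works that arithmetic through for this setup and converts the buffer count to memory. Assumptions: one task manager with 24 slots (matching parallelism 24), and a buffer size of 32 KiB, which as far as I know is the default `taskmanager.memory.segment-size`; adjust `segment_size` if your configuration differs.

```python
def recommended_network_buffers(slots_per_tm: int, num_task_managers: int,
                                 factor: int = 4) -> int:
    """Rule of thumb from the Flink network-buffer docs:
    slots_per_tm^2 * num_task_managers * factor (factor 4 in the docs)."""
    return slots_per_tm ** 2 * num_task_managers * factor


def network_memory_bytes(num_buffers: int, segment_size: int = 32 * 1024) -> int:
    """Memory taken by the buffers; the 32 KiB segment size is an assumption."""
    return num_buffers * segment_size


if __name__ == "__main__":
    n = recommended_network_buffers(slots_per_tm=24, num_task_managers=1)
    mib = network_memory_bytes(n) / 2**20
    print(f"{n} buffers = {mib:.0f} MiB")  # prints "2304 buffers = 72 MiB"
```

So even the recommended count needs only about 72 MiB, a small fraction of a 20 GB heap, which is consistent with extra buffers not being the limiting factor here.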

Has anybody encountered such a problem? Or maybe it's just normal in 
such a case...

thanks,
maciek

