flink-user mailing list archives

From Maciek Próchniak <...@touk.pl>
Subject Re: Threads waiting on LocalBufferPool
Date Thu, 21 Apr 2016 14:41:36 GMT
Well...
I found some time to look at RocksDB performance.

It takes around 0.4ms to look up value state and 0.12ms to update - these
are means; the 95th percentile was > 1ms for get... When I set additional
options:
           .setIncreaseParallelism(8)
           .setMaxOpenFiles(-1)
           .setCompressionType(CompressionType.SNAPPY_COMPRESSION)
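
For reference, this is roughly how I apply them - a sketch assuming
RocksDBStateBackend's OptionsFactory hook; the checkpoint URI is a
placeholder and env is the StreamExecutionEnvironment:

    import org.apache.flink.contrib.streaming.state.{OptionsFactory, RocksDBStateBackend}
    import org.rocksdb.{CompressionType, Options}

    val backend = new RocksDBStateBackend("hdfs:///flink/checkpoints")
    backend.setOptions(new OptionsFactory {
      // adjust the Options every RocksDB instance is created with
      override def createOptions(current: Options): Options =
        current
          .setIncreaseParallelism(8)  // more background flush/compaction threads
          .setMaxOpenFiles(-1)        // keep all SST files open
          .setCompressionType(CompressionType.SNAPPY_COMPRESSION)
    })
    env.setStateBackend(backend)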

I managed to get
0.05ms for update and 0.2ms for get - but that still seems pretty bad
compared to the standard RocksDB Java benchmarks I ran on the same
machine:
fillseq          :     1.23238 micros/op;   89.8 MB/s; 1000000 ops done;  1 / 1 task(s) finished.
readrandom       :     9.25380 micros/op;   12.0 MB/s; 1000000 / 1000000 found;  1 / 1 task(s) finished.
fillrandom       :     4.46839 micros/op;   24.8 MB/s; 1000000 ops done;  1 / 1 task(s) finished.

Guess I'll have to look at it a bit more...

thanks anyway,
maciek


On 21/04/2016 08:41, Maciek Próchniak wrote:
> Hi Ufuk,
>
> thanks for quick reply.
> Actually I had a little time to try both things.
> 1) helped only temporarily - it just took a bit longer to saturate the
> pool. After a few minutes, all Kafka threads were periodically waiting
> for the bufferPool.
> 2) This seemed to help. I also reduced the checkpoint interval - with
> RocksDB we had 5 min, now I tried 30s.
>
> I attach throughput metrics - the former (around 18:00) is with increased
> heap & buffers, the latter (around 22:00) is with FileSystemStateBackend.
> My state is a few GB large - during the test it reached around 2-3GB. I
> must admit I was quite impressed that checkpointing to HDFS using the
> FileSystem backend took only about 6-7s (with occasional spikes to
> 12-13s, which can be seen in the metrics - I didn't check whether it was
> caused by HDFS or something else).
>
> Now I looked at the logs from around 18:00 and it seems checkpointing
> RocksDB took around 2-3 minutes:
> 2016-04-20 17:47:33,439 [Checkpoint Timer] INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - 
> Triggering checkpoint 6 @ 1461167253439
> 2016-04-20 17:49:54,196 [flink-akka.actor.default-dispatcher-147] 
> INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
> Completed checkpoint 6 (in 140588 ms)
> - however I don't see any threads dumping state in threadStacks...
>
> I guess I'll have to add some metrics around state invocations to see
> where the problem with RocksDB is... I'll write if I find anything,
> but I don't think that will be today...
>
> Btw - I was looking at the FS state and I wonder whether it would be
> feasible to make a variant of this state using an immutable map
> (probably a Scala one) to be able to do async checkpoints.
> The synchronous part would then be essentially free - just grabbing the
> state map reference and materializing it asynchronously.
> Of course, that would only work for immutable state - but this is
> often the case when writing in Scala. WDYT?
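>
> To make it concrete, a rough sketch of what I mean (hypothetical names,
> not existing Flink API):
>
>     import java.util.concurrent.Executors
>
>     // hypothetical state holder: updates produce new immutable maps via
>     // structural sharing, so an old reference is a consistent snapshot
>     class ImmutableMapState[K, V] {
>       @volatile private var state: Map[K, V] = Map.empty
>       private val materializer = Executors.newSingleThreadExecutor()
>
>       def update(key: K, value: V): Unit =
>         state = state.updated(key, value)
>
>       // synchronous checkpoint part: O(1) - just capture the reference
>       def snapshot(): Map[K, V] = state
>
>       // asynchronous part: write the captured snapshot off the task thread
>       def checkpointAsync(write: Map[K, V] => Unit): Unit = {
>         val captured = snapshot()
>         materializer.submit(new Runnable { def run(): Unit = write(captured) })
>       }
>     }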
>
> thanks,
> maciek
>
>
>
>
> On 20/04/2016 16:28, Ufuk Celebi wrote:
>> Could be different things actually, including the parts of the network
>> you mentioned.
>>
>> 1)
>>
>> Regarding the TM config:
>> - It can help to increase the number of network buffers (you can go
>> ahead and give it 4 GB, e.g. 131072 buffers of 32 KB each)
>> - In general, you have way more memory available than you actually
>> give to Flink. I would increase the 20 GB heap size.
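>>
>> In flink-conf.yaml that would be roughly (the heap value is only an
>> example):
>>
>>     # 131072 buffers x 32 KB = 4 GB of network buffer memory
>>     taskmanager.network.numberOfBuffers: 131072
>>     # e.g. double the current 20 GB task manager heap
>>     taskmanager.heap.mb: 40960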
>>
>> As a first step you could address these two points and re-run your job.
>>
>> 2)
>>
>> As a follow-up you could also work with the FileSystemStateBackend,
>> which keeps state in memory (on-heap) and writes checkpoints to files.
>> This would help in checking how much RocksDB is slowing things down.
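>>
>> For example, a minimal sketch using the FsStateBackend class (the HDFS
>> path is a placeholder):
>>
>>     import org.apache.flink.runtime.state.filesystem.FsStateBackend
>>
>>     env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"))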
>>
>>
>> I'm curious about the results. Do you think you will have time to try 
>> this?
>>
>> – Ufuk
>>
>>
>> On Wed, Apr 20, 2016 at 3:45 PM, Maciek Próchniak <mpr@touk.pl> wrote:
>>> Hi,
>>> I'm running my Flink job on one rather large machine (20 cores with
>>> hyperthreading, 120 GB RAM). The task manager has 20 GB heap allocated.
>>> The job does more or less:
>>> read CSV from Kafka -> keyBy one of the fields -> some custom state
>>> processing.
>>> The Kafka topic has 24 partitions, so my parallelism is also 24.
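>>>
>>> In code the pipeline looks roughly like this (a sketch - the parsing
>>> step, field and function names are placeholders):
>>>
>>>     val props = new java.util.Properties()
>>>     props.setProperty("bootstrap.servers", "broker:9092") // placeholder
>>>
>>>     env
>>>       .addSource(new FlinkKafkaConsumer09[String]("topic", new SimpleStringSchema(), props))
>>>       .map(parseCsv _)                  // hypothetical CSV parser
>>>       .keyBy(_.someField)               // keyBy one of the fields
>>>       .flatMap(new MyStatefulFunction)  // custom state processing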
>>>
>>> After some tweaks and upgrading to 1.0.2-rc3 (as I use the RocksDB
>>> state backend) I reached a point where throughput is ~120-150k/s.
>>> On the same Kafka and machine I reached > 500k/s with a simple
>>> filtering job, so I wanted to see what the bottleneck is.
>>>
>>> It turns out that quite often all of the Kafka threads are stuck
>>> waiting for a buffer from the pool:
>>> "Thread-6695" #7466 daemon prio=5 os_prio=0 tid=0x00007f77fd80d000
>>> nid=0x8118 in Object.wait() [0x00007f7ad54d9000]
>>>     java.lang.Thread.State: TIMED_WAITING (on object monitor)
>>>          at java.lang.Object.wait(Native Method)
>>>          at
>>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)

>>>
>>>          - locked <0x00000002eade3890> (a java.util.ArrayDeque)
>>>          at
>>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)

>>>
>>>          at
>>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)

>>>
>>>          - locked <0x00000002eb73cbd0> (a
>>> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)

>>>
>>>          at
>>> org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:86)

>>>
>>>          at
>>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:78)

>>>
>>>          at
>>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39)

>>>
>>>          at
>>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)

>>>
>>>          at
>>> org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:541)

>>>
>>>          at
>>> org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:541)

>>>
>>>          at scala.collection.immutable.List.foreach(List.scala:381)
>>>          at
>>> org.apache.flink.streaming.api.scala.DataStream$$anon$6.flatMap(DataStream.scala:541)

>>>
>>>          at
>>> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:48)

>>>
>>>          at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309)

>>>
>>>          at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297)

>>>
>>>          at
>>> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:38)

>>>
>>>          at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309)

>>>
>>>          at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297)

>>>
>>>          at
>>> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:38)

>>>
>>>          at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309)

>>>
>>>          at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297)

>>>
>>>          at
>>> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)

>>>
>>>          at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309)

>>>
>>>          at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297)

>>>
>>>          at
>>> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)

>>>
>>>          at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309)

>>>
>>>          at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297)

>>>
>>>          at
>>> org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:318)

>>>
>>>          - locked <0x00000002eaf3eb50> (a java.lang.Object)
>>>          at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:473)

>>>
>>>          - locked <0x00000002eaf3eb50> (a java.lang.Object)
>>>
>>> This seems a bit weird to me, as most of the state processing threads
>>> are idle:
>>>
>>> "My custom function -> (Sink: Unnamed, Map) (19/24)" #7353 daemon 
>>> prio=5
>>> os_prio=0 tid=0x00007f7a7400e000 nid=0x80a7 waiting on condition
>>> [0x00007f7bee8ed000]
>>>     java.lang.Thread.State: TIMED_WAITING (parking)
>>>          at sun.misc.Unsafe.park(Native Method)
>>>          - parking to wait for  <0x00000002eb840c38> (a
>>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>          at
>>> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>>>          at
>>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)

>>>
>>>          at
>>> java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467) 
>>>
>>>          at
>>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:415)

>>>
>>>          at
>>> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:108)

>>>
>>>          at
>>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:175)

>>>
>>>          at
>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)

>>>
>>>          at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)

>>>
>>>          at 
>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>          at java.lang.Thread.run(Thread.java:745)
>>>
>>>
>>> I tried using more network buffers, but it doesn't seem to change
>>> anything - and if I understand correctly
>>> https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#configuring-the-network-buffers
>>> I should not need more than 24^2 * 4 = 2304 of them...
>>>
>>> Has anybody encountered such a problem? Or maybe it's just normal for
>>> such a case...
>>>
>>> thanks,
>>> maciek
>>>
>

