flink-user mailing list archives

From Maciek Próchniak <...@touk.pl>
Subject Re: Threads waiting on LocalBufferPool
Date Fri, 22 Apr 2016 15:01:37 GMT


On 21/04/2016 16:46, Aljoscha Krettek wrote:
> Hi,
> I would be very happy about improvements to our RocksDB performance. 
> What are the RocksDB Java benchmarks that you are running? In Flink, 
> we also have to serialize/deserialize every time we access 
> RocksDB using our TypeSerializer. Maybe this is causing the slowdown.
>
Hi Aljoscha,

I'm using the benchmark from:
https://github.com/facebook/rocksdb/blob/master/java/jdb_bench.sh

My value is a pretty simple Scala case class - around 12 fields with 
Int/Long/String values - so I think serialization shouldn't be a big 
problem. However, I'll have to do more comprehensive tests to be sure 
I'm comparing apples to apples - I hope to find time for that during 
the weekend :)
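
To check the serializer cost in isolation, I'd run something like this
quick micro-benchmark (a rough sketch only - the case class and its field
values are made up, and note that DataOutputSerializer moved to
org.apache.flink.core.memory in later Flink versions):

import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.scala._
import org.apache.flink.runtime.util.DataOutputSerializer

// hypothetical value, roughly the shape described above (~12 Int/Long/String fields)
case class MyValue(id: Long, ts: Long, count: Int, name: String, tag: String)

val serializer = createTypeInformation[MyValue].createSerializer(new ExecutionConfig)
val out = new DataOutputSerializer(256)
val value = MyValue(1L, 1461167253439L, 7, "some-name", "some-tag")

val ops = 1000000
val start = System.nanoTime()
var i = 0
while (i < ops) {
  out.clear() // reuse the buffer so we measure only serialization
  serializer.serialize(value, out)
  i += 1
}
println(s"serialize: ${(System.nanoTime() - start).toDouble / ops} ns/op")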

thanks,
maciek

> By the way, what is the type of the value stored in the RocksDB state? 
> Maybe the TypeSerializer for that value is very slow.
>
> Cheers,
> Aljoscha
>
> On Thu, 21 Apr 2016 at 16:41 Maciek Próchniak <mpr@touk.pl> wrote:
>
>     Well...
>     I found some time to look at RocksDB performance.
>
>     It takes around 0.4ms to look up value state and 0.12ms to update it -
>     these are means; the 95th percentile was > 1ms for get... When I set
>     additional options:
>               .setIncreaseParallelism(8)
>               .setMaxOpenFiles(-1)
>               .setCompressionType(CompressionType.SNAPPY_COMPRESSION)
>
>     I managed to get
>     0.05ms for update and 0.2ms for get - but that still seems pretty
>     bad compared to the standard RocksDB Java benchmarks I ran on the
>     same machine, which give:
>     fillseq          :     1.23238 micros/op;   89.8 MB/s; 1000000 ops done;  1 / 1 task(s) finished.
>     readrandom       :     9.25380 micros/op;   12.0 MB/s; 1000000 / 1000000 found;  1 / 1 task(s) finished.
>     fillrandom       :     4.46839 micros/op;   24.8 MB/s; 1000000 ops done;  1 / 1 task(s) finished.
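>
>     For reference, this is roughly how I open RocksDB with those options
>     when testing it standalone (a minimal sketch - the path is just an
>     example):
>
>     import org.rocksdb.{CompressionType, Options, RocksDB}
>
>     RocksDB.loadLibrary()
>
>     // the same tuning as above
>     val options = new Options()
>       .setCreateIfMissing(true)
>       .setIncreaseParallelism(8)
>       .setMaxOpenFiles(-1)
>       .setCompressionType(CompressionType.SNAPPY_COMPRESSION)
>
>     val db = RocksDB.open(options, "/tmp/rocksdb-test")
>     db.put("key".getBytes, "value".getBytes)
>     println(new String(db.get("key".getBytes)))
>     db.close()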
>
>     guess I'll have to look at it a bit more...
>
>     thanks anyway,
>     maciek
>
>
>
>     On 21/04/2016 08:41, Maciek Próchniak wrote:
>>     Hi Ufuk,
>>
>>     thanks for quick reply.
>>     Actually I had a little time to try both things.
>>     1) helped only temporarily - it just took a bit longer to
>>     saturate the pool. After a few minutes, all Kafka threads were
>>     periodically waiting for the buffer pool.
>>     2) This seemed to help. I also reduced the checkpoint interval - on
>>     RocksDB we had 5 min, now I tried 30 s.
>>
>>     I attach throughput metrics - the former (around 18:00) is with
>>     increased heap & buffers, the latter (around 22:00) is with the
>>     FileSystemStateBackend.
>>     My state is a few GB large - during the test it reached around
>>     2-3 GB. I must admit I was quite impressed that checkpointing to
>>     HDFS using the FileSystem backend took only about 6-7 s (with
>>     occasional spikes to 12-13 s, which can be seen in the metrics -
>>     I didn't check whether it was caused by HDFS or something else).
>>
>>     Now I looked at the logs from around 18:00, and it seems that
>>     checkpointing RocksDB took around 2-3 minutes:
>>     2016-04-20 17:47:33,439 [Checkpoint Timer] INFO
>>     org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
>>     Triggering checkpoint 6 @ 1461167253439
>>     2016-04-20 17:49:54,196 [flink-akka.actor.default-dispatcher-147]
>>     INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
>>     Completed checkpoint 6 (in 140588 ms)
>>     - however I don't see any threads dumping state in the thread dumps...
>>
>>     I guess I'll have to add some metrics around state invocations to
>>     see where the problem with RocksDB is... I'll write if I find
>>     anything, but that won't be today, I think...
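>>
>>     For a first measurement, something like this hypothetical wrapper
>>     should do (TimedValueState and the record callback are made-up
>>     names, not Flink API):
>>
>>     import org.apache.flink.api.common.state.ValueState
>>
>>     // delegates all calls, recording how long get/update take
>>     class TimedValueState[T](delegate: ValueState[T],
>>                              record: (String, Long) => Unit)
>>         extends ValueState[T] {
>>       override def value(): T = {
>>         val t0 = System.nanoTime()
>>         try delegate.value() finally record("get", System.nanoTime() - t0)
>>       }
>>       override def update(v: T): Unit = {
>>         val t0 = System.nanoTime()
>>         try delegate.update(v) finally record("update", System.nanoTime() - t0)
>>       }
>>       override def clear(): Unit = delegate.clear()
>>     }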
>>
>>     Btw - I was looking at the FS state and I wonder whether it would
>>     be feasible to make a variant of this state backed by an immutable
>>     map (probably a Scala one) to be able to do async checkpoints.
>>     Then the synchronous part would be essentially free - just taking
>>     the state map reference and materializing it asynchronously, as in
>>     the sketch below.
>>     Of course, that would only work for immutable state - but this is
>>     often the case when writing in Scala. WDYT?
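>>
>>     Roughly what I mean, as a conceptual sketch (not the actual
>>     backend code):
>>
>>     import scala.concurrent.{ExecutionContext, Future}
>>
>>     class ImmutableMapState[K, V] {
>>       @volatile private var state = Map.empty[K, V] // immutable scala Map
>>
>>       def update(key: K, value: V): Unit = state = state.updated(key, value)
>>
>>       // synchronous part of a checkpoint: O(1), just grab the reference
>>       def snapshot(): Map[K, V] = state
>>
>>       // asynchronous part: materialize without blocking processing
>>       def checkpointAsync(write: Map[K, V] => Unit)
>>                          (implicit ec: ExecutionContext): Future[Unit] =
>>         Future(write(snapshot()))
>>     }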
>>
>>     thanks,
>>     maciek
>>
>>
>>
>>
>>     On 20/04/2016 16:28, Ufuk Celebi wrote:
>>>     Could be different things actually, including the parts of the
>>>     network
>>>     you mentioned.
>>>
>>>     1)
>>>
>>>     Regarding the TM config:
>>>     - It can help to increase the number of network buffers (you can go
>>>     ahead and give it 4 GB, e.g. 131072 buffers of 32 KB each)
>>>     - In general, you have way more memory available than you actually
>>>     give to Flink. I would increase the 20 GB heap size.
>>>
>>>     As a first step you could address these two points and re-run
>>>     your job.
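>>>
>>>     Concretely, that would be something like this in flink-conf.yaml
>>>     (the exact values are up to you):
>>>
>>>     # ~4 GB of network buffers: 131072 buffers at the default 32 KB each
>>>     taskmanager.network.numberOfBuffers: 131072
>>>     # give Flink more of the available 120 GB, e.g. double the heap
>>>     taskmanager.heap.mb: 40960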
>>>
>>>     2)
>>>
>>>     As a follow-up you could also work with the FileSystemStateBackend,
>>>     which keeps state in memory (on-heap) and writes checkpoints to
>>>     files.
>>>     This would help in checking how much RocksDB is slowing things
>>>     down.
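>>>
>>>     Switching the backend is a one-liner (the HDFS path here is just
>>>     an example):
>>>
>>>     import org.apache.flink.runtime.state.filesystem.FsStateBackend
>>>     import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>>>
>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>     // keep state on-heap, write checkpoints to files
>>>     env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"))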
>>>
>>>
>>>     I'm curious about the results. Do you think you will have time
>>>     to try this?
>>>
>>>     – Ufuk
>>>
>>>
>>>     On Wed, Apr 20, 2016 at 3:45 PM, Maciek Próchniak <mpr@touk.pl> wrote:
>>>>     Hi,
>>>>     I'm running my Flink job on one rather large machine (20 cores
>>>>     with hyperthreading, 120 GB RAM). The task manager has 20 GB of
>>>>     heap allocated.
>>>>     It does more or less:
>>>>     read CSV from Kafka -> keyBy one of the fields -> some custom
>>>>     state processing.
>>>>     The Kafka topic has 24 partitions, so my parallelism is also 24.
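>>>>
>>>>     Roughly the shape of the job (topic/field names here are
>>>>     hypothetical, and mapWithState is just a stand-in for the
>>>>     custom state processing):
>>>>
>>>>     import java.util.Properties
>>>>     import org.apache.flink.streaming.api.scala._
>>>>     import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
>>>>     import org.apache.flink.streaming.util.serialization.SimpleStringSchema
>>>>
>>>>     val props = new Properties()
>>>>     props.setProperty("bootstrap.servers", "localhost:9092") // example
>>>>     props.setProperty("group.id", "my-group")
>>>>
>>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>     env.setParallelism(24)
>>>>
>>>>     env.addSource(new FlinkKafkaConsumer09[String](
>>>>         "my-topic", new SimpleStringSchema(), props))
>>>>       .map(_.split(","))          // parse the CSV lines
>>>>       .keyBy(_(0))                // keyBy one of the fields
>>>>       .mapWithState[(String, Long), Long] {
>>>>         (fields: Array[String], count: Option[Long]) =>
>>>>           val c = count.getOrElse(0L) + 1
>>>>           ((fields(0), c), Some(c))
>>>>       }
>>>>       .print()
>>>>
>>>>     env.execute("csv from kafka")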
>>>>
>>>>     After some tweaks and upgrading to 1.0.2-rc3 (as I use the
>>>>     RocksDB state backend) I reached a point where throughput is
>>>>     ~120-150k/s.
>>>>     On the same Kafka and machine I reached > 500k/s with a simple
>>>>     filtering job, so I wanted to see what the bottleneck is.
>>>>
>>>>     It turns out that quite often all of the Kafka threads are stuck
>>>>     waiting for a buffer from the pool:
>>>>     "Thread-6695" #7466 daemon prio=5 os_prio=0 tid=0x00007f77fd80d000
>>>>     nid=0x8118 in Object.wait() [0x00007f7ad54d9000]
>>>>         java.lang.Thread.State: TIMED_WAITING (on object monitor)
>>>>              at java.lang.Object.wait(Native Method)
>>>>              at
>>>>     org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
>>>>
>>>>              - locked <0x00000002eade3890> (a java.util.ArrayDeque)
>>>>              at
>>>>     org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
>>>>
>>>>              at
>>>>     org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
>>>>
>>>>              - locked <0x00000002eb73cbd0> (a
>>>>     org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
>>>>
>>>>              at
>>>>     org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:86)
>>>>
>>>>              at
>>>>     org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:78)
>>>>
>>>>              at
>>>>     org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39)
>>>>
>>>>              at
>>>>     org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>>
>>>>              at
>>>>     org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:541)
>>>>
>>>>              at
>>>>     org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:541)
>>>>
>>>>              at
>>>>     scala.collection.immutable.List.foreach(List.scala:381)
>>>>              at
>>>>     org.apache.flink.streaming.api.scala.DataStream$$anon$6.flatMap(DataStream.scala:541)
>>>>
>>>>              at
>>>>     org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:48)
>>>>
>>>>              at
>>>>     org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309)
>>>>
>>>>              at
>>>>     org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297)
>>>>
>>>>              at
>>>>     org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:38)
>>>>
>>>>              at
>>>>     org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309)
>>>>
>>>>              at
>>>>     org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297)
>>>>
>>>>              at
>>>>     org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:38)
>>>>
>>>>              at
>>>>     org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309)
>>>>
>>>>              at
>>>>     org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297)
>>>>
>>>>              at
>>>>     org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
>>>>
>>>>              at
>>>>     org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309)
>>>>
>>>>              at
>>>>     org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297)
>>>>
>>>>              at
>>>>     org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
>>>>
>>>>              at
>>>>     org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309)
>>>>
>>>>              at
>>>>     org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297)
>>>>
>>>>              at
>>>>     org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:318)
>>>>
>>>>              - locked <0x00000002eaf3eb50> (a java.lang.Object)
>>>>              at
>>>>     org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:473)
>>>>
>>>>              - locked <0x00000002eaf3eb50> (a java.lang.Object)
>>>>
>>>>     This seems a bit weird to me, as most of the state processing
>>>>     threads are idle:
>>>>
>>>>     "My custom function -> (Sink: Unnamed, Map) (19/24)" #7353
>>>>     daemon prio=5
>>>>     os_prio=0 tid=0x00007f7a7400e000 nid=0x80a7 waiting on condition
>>>>     [0x00007f7bee8ed000]
>>>>         java.lang.Thread.State: TIMED_WAITING (parking)
>>>>              at sun.misc.Unsafe.park(Native Method)
>>>>              - parking to wait for <0x00000002eb840c38> (a
>>>>     java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>>
>>>>              at
>>>>     java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>>>>
>>>>              at
>>>>     java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
>>>>
>>>>              at
>>>>     java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
>>>>
>>>>              at
>>>>     org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:415)
>>>>
>>>>              at
>>>>     org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:108)
>>>>
>>>>              at
>>>>     org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:175)
>>>>
>>>>              at
>>>>     org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
>>>>
>>>>              at
>>>>     org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>>>>
>>>>              at
>>>>     org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>              at java.lang.Thread.run(Thread.java:745)
>>>>
>>>>
>>>>     I tried using more network buffers, but it doesn't seem to
>>>>     change anything - and if I understand
>>>>     https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#configuring-the-network-buffers
>>>>     correctly, I shouldn't need more than 24^2 * 4 = 2304 of them...
>>>>
>>>>     Has anybody encountered such a problem? Or maybe it's just
>>>>     normal for such a case...
>>>>
>>>>     thanks,
>>>>     maciek
>>>>
>>
>

