From Ufuk Celebi <...@apache.org>
Subject Re: Disk I/O in Flink
Date Mon, 24 Apr 2017 12:25:04 GMT
Hey Robert,

For batch, that should cover the relevant spilling code. If the records
are >= 5 MB, the SpillingAdaptiveSpanningRecordDeserializer will also
spill incoming records to disk. But that should already be covered by
your FileChannel instrumentation, right?
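
A minimal sketch of what threshold-based spilling through a FileChannel could look like, to make clear why FileChannel instrumentation would see it. This is not the code of SpillingAdaptiveSpanningRecordDeserializer: the class name ThresholdSpillSketch, the method bufferOrSpill, and the temp-file naming are hypothetical, and the 5 MB constant is simply the figure quoted above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical illustration only; not Flink's deserializer. */
final class ThresholdSpillSketch {
    // 5 MB as quoted in the thread; the actual Flink constant is not verified here.
    static final int SPILL_THRESHOLD_BYTES = 5 * 1024 * 1024;

    /**
     * Returns the spill file if the record was written to disk,
     * or null if it was small enough to stay in memory.
     */
    static Path bufferOrSpill(byte[] record, int thresholdBytes) {
        if (record.length < thresholdBytes) {
            return null; // small record: kept on the heap, no disk I/O
        }
        try {
            Path spillFile = Files.createTempFile("spill-sketch", ".bin");
            // All bytes go through FileChannel.write, which is the call an
            // instrumented FileChannel would observe.
            try (FileChannel channel = FileChannel.open(spillFile, StandardOpenOption.WRITE)) {
                ByteBuffer buffer = ByteBuffer.wrap(record);
                while (buffer.hasRemaining()) {
                    channel.write(buffer);
                }
            }
            return spillFile;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path small = bufferOrSpill(new byte[1024], SPILL_THRESHOLD_BYTES);
        Path large = bufferOrSpill(new byte[SPILL_THRESHOLD_BYTES], SPILL_THRESHOLD_BYTES);
        System.out.println("small record spilled: " + (small != null));
        System.out.println("large record spilled: " + (large != null));
        if (large != null) {
            System.out.println("bytes on disk: " + Files.size(large));
            Files.delete(large);
        }
    }
}
```

If the real code path writes through some other class than FileChannel, this kind of spill would be invisible to the instrumentation, so it is worth checking which channel type is actually opened.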

– Ufuk

On Tue, Apr 18, 2017 at 3:57 PM, Robert Schmidtke
<ro.schmidtke@gmail.com> wrote:
> Hi,
> I have already looked at the UnilateralSortMerger, concluding that all I/O
> eventually goes via SegmentReadRequest and SegmentWriteRequest (which in
> turn use java.nio.channels.FileChannel) in AsynchronousFileIOChannel. Are
> there more interaction points between Flink and the underlying file system
> that I might want to consider?
> Thanks!
> Robert
> On Fri, Apr 7, 2017 at 5:02 PM, Kurt Young <ykt836@gmail.com> wrote:
>> Hi,
>> You probably want to check out UnilateralSortMerger.java, the class
>> responsible for external sorting in Flink. Here is a short description of
>> how it works: three threads work together, one for reading, one for
>> sorting partial data in memory, and one for spilling. Flink first figures
>> out how much memory it can use for the in-memory sort and manages it as
>> MemorySegments. Once this memory runs out, the sorting thread takes over
>> the filled segments and sorts them in memory (for more details about
>> in-memory sorting, see NormalizedKeySorter). After that, the spilling
>> thread writes the sorted data to disk and makes the memory available again
>> for reading. This repeats until all data has been processed.
>> Normally, the data is read twice (once from the source and once from disk)
>> and written once. But if you spilled too many files, Flink first merges
>> some of the files to make sure the last merge step does not exceed some
>> limit (default 128). Hope this helps.
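
The external sort described above can be sketched roughly as follows. This is a single-threaded, purely in-memory simulation under stated assumptions, not UnilateralSortMerger itself: the class ExternalSortSketch, its counters, and the parameters memoryCapacity and maxFanIn are hypothetical names, "disk" is just a list of sorted runs, and the intermediate merge strategy is deliberately naive. It only illustrates how extra merge passes add reads and writes once the number of spilled runs exceeds the fan-in limit.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical illustration only; not Flink's UnilateralSortMerger. */
final class ExternalSortSketch {
    long recordsWritten; // records "spilled" to the simulated disk
    long recordsRead;    // records read back from the simulated disk

    List<Integer> sort(List<Integer> input, int memoryCapacity, int maxFanIn) {
        if (memoryCapacity < 1 || maxFanIn < 2) {
            throw new IllegalArgumentException("need memoryCapacity >= 1 and maxFanIn >= 2");
        }
        // Phase 1: fill the memory budget, sort it, spill it as one sorted run.
        List<List<Integer>> runs = new ArrayList<>();
        for (int start = 0; start < input.size(); start += memoryCapacity) {
            int end = Math.min(start + memoryCapacity, input.size());
            List<Integer> run = new ArrayList<>(input.subList(start, end));
            Collections.sort(run);
            recordsWritten += run.size();
            runs.add(run);
        }
        // Phase 2: intermediate merges until the final merge fits the fan-in.
        // Naive: every pass re-reads and re-writes all runs.
        while (runs.size() > maxFanIn) {
            List<List<Integer>> next = new ArrayList<>();
            for (int i = 0; i < runs.size(); i += maxFanIn) {
                List<Integer> merged = merge(runs.subList(i, Math.min(i + maxFanIn, runs.size())));
                recordsWritten += merged.size();
                next.add(merged);
            }
            runs = next;
        }
        // Phase 3: the final merge streams its result to the consumer (no write).
        return merge(runs);
    }

    /** K-way merge using a heap of {value, runIndex, position} entries. */
    private List<Integer> merge(List<List<Integer>> runs) {
        PriorityQueue<int[]> heads = new PriorityQueue<>((a, b) -> Integer.compare(a[0], b[0]));
        for (int r = 0; r < runs.size(); r++) {
            if (!runs.get(r).isEmpty()) {
                heads.add(new int[] {runs.get(r).get(0), r, 0});
            }
        }
        List<Integer> out = new ArrayList<>();
        while (!heads.isEmpty()) {
            int[] head = heads.poll();
            out.add(head[0]);
            recordsRead++;
            List<Integer> run = runs.get(head[1]);
            int nextPos = head[2] + 1;
            if (nextPos < run.size()) {
                heads.add(new int[] {run.get(nextPos), head[1], nextPos});
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> input = List.of(9, 3, 7, 1, 8, 2, 6, 0, 5, 4);
        ExternalSortSketch sorter = new ExternalSortSketch();
        System.out.println(sorter.sort(input, 2, 2));
        System.out.println("written=" + sorter.recordsWritten + " read=" + sorter.recordsRead);
    }
}
```

With a generous fan-in, every record is spilled once and read back once, which matches the "written once, read once from disk" case. With a small fan-in, each intermediate pass adds another full round of reads and writes.
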
>> Best,
>> Kurt
>> On Fri, Apr 7, 2017 at 4:20 PM, Robert Schmidtke <ro.schmidtke@gmail.com>
>> wrote:
>>> Hi,
>>> I'm currently examining the I/O patterns of Flink, and I'd like to know
>>> when/how Flink goes to disk. Let me give an introduction of what I have done
>>> so far.
>>> I am running TeraGen (from the Hadoop examples package) + TeraSort
>>> (https://github.com/robert-schmidtke/terasort) on a 16 node cluster, each
>>> node with 64 GiB of memory, 2x32 cores, and roughly half a terabyte of disk.
>>> I'm using YARN and HDFS. The underlying file system is XFS.
>>> Now before running TeraGen and TeraSort, I reset the XFS counters to
>>> zero, and after TeraGen + TeraSort are finished, I dump the XFS counters
>>> again. Accumulated over the entire cluster I get 3 TiB of writes and 3.2 TiB
>>> of reads. What I'd have expected would be 2 TiB of writes (1 for TeraGen, 1
>>> for TeraSort) and 1 TiB of reads (during TeraSort).
>>> Unsatisfied by the coarseness of these numbers I developed an HDFS
>>> wrapper that logs file system statistics for each call to hdfs://..., such
>>> as start time/end time, no. of bytes read/written etc. I can plot these
>>> numbers and see what I expect: during TeraGen I have 1 TiB of writes to
>>> hdfs://..., during TeraSort I have 1 TiB of reads from and 1 TiB of writes
>>> to hdfs://... So far, so good.
>>> Now this still did not explain the disk I/O, so I added bytecode
>>> instrumentation to a range of Java classes, like FileIn/OutputStream,
>>> RandomAccessFile, FileChannel, ZipFile, multiple *Buffer classes for memory
>>> mapped files etc., and have the same statistics: start/end of a read
>>> from/write to disk, no. of bytes involved and such. I can plot these numbers
>>> too and see that the HDFS JVMs write 1 TiB of data to disk during TeraGen
>>> (expected) and read and write 1 TiB from and to disk during TeraSort
>>> (expected).
>>> Sorry for the enormous introduction, but now there's finally the
>>> interesting part: Flink's JVMs read from and write to disk 1 TiB of data
>>> each during TeraSort. I'm suspecting there is some sort of spilling
>>> involved, potentially because I have not done the setup properly. But that
>>> is not the crucial point: my statistics give a total of 3 TiB of writes to
>>> disk (2 TiB for HDFS, 1 TiB for Flink), which agrees with the XFS counters
>>> from above. However, my statistics only give 2 TiB of reads from disk (1 TiB
>>> for HDFS, 1 TiB for Flink), so I'm missing an entire TiB of reads from disk
>>> somewhere. I have done the same with Hadoop TeraSort, and there I'm not
>>> missing any data, meaning my statistics agree with XFS for TeraSort on
>>> Hadoop, which is why I suspect there are some cases where Flink goes to disk
>>> without me noticing it.
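
The accounting above amounts to subtracting the instrumented totals from the XFS counters. A small sketch of that arithmetic, using only the figures quoted in this message (all in TiB); the class IoGapSketch and the method unexplained are hypothetical names:

```java
/** Hypothetical illustration of the I/O accounting; figures are from the message above. */
final class IoGapSketch {
    /** File-system counter minus the sum of all instrumented contributions, in TiB. */
    static double unexplained(double fileSystemTiB, double... instrumentedTiB) {
        double explained = 0.0;
        for (double part : instrumentedTiB) {
            explained += part;
        }
        return fileSystemTiB - explained;
    }

    public static void main(String[] args) {
        // Writes: XFS reports 3 TiB; instrumentation sees 2 TiB (HDFS) + 1 TiB (Flink).
        double writeGap = unexplained(3.0, 2.0, 1.0);
        // Reads: XFS reports 3.2 TiB; instrumentation sees 1 TiB (HDFS) + 1 TiB (Flink).
        double readGap = unexplained(3.2, 1.0, 1.0);
        System.out.printf("unexplained writes: %.1f TiB%n", writeGap);
        System.out.printf("unexplained reads:  %.1f TiB%n", readGap);
    }
}
```

The writes balance, while the reads leave a gap of a bit more than 1 TiB, roughly the size of the input data set.
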
>>> So here, finally, is the question: in which cases does Flink go to
>>> disk, and how does it do so (meaning precisely which Java classes are
>>> involved, so I can check my bytecode instrumentation)? This would also
>>> include any kind of resource distribution via HDFS/YARN I guess (like JAR
>>> files and I don't know what). Seeing that I'm missing an amount of data
>>> equal to the size of my input set I'd suspect there must be some sort of
>>> shuffling/spilling at play here, but I'm not sure. Maybe there is also some
>>> sort of remote I/O involved via sockets or so that I'm missing.
>>> Any hints as to where Flink might incur disk I/O are greatly appreciated!
>>> I'm also happy with doing the digging myself, once pointed to the proper
>>> packages in the Apache Flink source tree (I have done my fair share of
>>> inspection already, but could not be sure whether or not I have missed
>>> something). Thanks a lot in advance!
>>> Robert
>>> --
>>> My GPG Key ID: 336E2680
> --
> My GPG Key ID: 336E2680

