From Robert Schmidtke <ro.schmid...@gmail.com>
Subject Disk I/O in Flink
Date Fri, 07 Apr 2017 08:20:14 GMT

I'm currently examining the I/O patterns of Flink, and I'd like to know
when and how Flink goes to disk. Let me first describe what I have done
so far.

I am running TeraGen (from the Hadoop examples package) + TeraSort (
https://github.com/robert-schmidtke/terasort) on a 16 node cluster, each
node with 64 GiB of memory, 2x32 cores, and roughly half a terabyte of
disk. I'm using YARN and HDFS. The underlying file system is XFS.

Now before running TeraGen and TeraSort, I reset the XFS counters to zero,
and after TeraGen + TeraSort are finished, I dump the XFS counters again.
Accumulated over the entire cluster I get 3 TiB of writes and 3.2 TiB of
reads. What I'd have expected would be 2 TiB of writes (1 for TeraGen, 1
for TeraSort) and 1 TiB of reads (during TeraSort).

Dissatisfied with the coarseness of these numbers, I developed an HDFS
wrapper that logs file system statistics for each call to hdfs://..., such
as start/end time, number of bytes read/written, etc. I can plot these
numbers and
see what I expect: during TeraGen I have 1 TiB of writes to hdfs://...,
during TeraSort I have 1 TiB of reads from and 1 TiB of writes to
hdfs://... So far, so good.
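
A minimal sketch of the kind of per-stream accounting described above,
assuming a plain java.io wrapper rather than the actual HDFS wrapper used
for these measurements. The class name TimedCountingInputStream and its
accessors are hypothetical; it only accumulates bytes read and time spent
in read calls, and it does not account for skip().

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

/** Hypothetical wrapper: counts bytes read and nanoseconds spent in read calls. */
final class TimedCountingInputStream extends FilterInputStream {
    private long bytesRead;
    private long nanosInRead;

    TimedCountingInputStream(InputStream in) {
        super(in);
    }

    @Override
    public int read() throws IOException {
        long start = System.nanoTime();
        int b = super.read();
        nanosInRead += System.nanoTime() - start;
        if (b >= 0) {
            bytesRead++; // -1 signals end of stream and is not counted
        }
        return b;
    }

    // read(byte[]) in FilterInputStream delegates here, so it is covered too.
    @Override
    public int read(byte[] buf, int off, int len) throws IOException {
        long start = System.nanoTime();
        int n = super.read(buf, off, len);
        nanosInRead += System.nanoTime() - start;
        if (n > 0) {
            bytesRead += n;
        }
        return n;
    }

    long getBytesRead() {
        return bytesRead;
    }

    long getNanosInRead() {
        return nanosInRead;
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for a real file or HDFS stream.
        InputStream source = new ByteArrayInputStream(new byte[1000]);
        try (TimedCountingInputStream in = new TimedCountingInputStream(source)) {
            byte[] buf = new byte[256];
            while (in.read(buf) != -1) {
                // drain the stream
            }
            System.out.println("bytes read: " + in.getBytesRead());
            System.out.println("nanos in read: " + in.getNanosInRead());
        }
    }
}
```

The same pattern applies to an output stream by overriding the write
methods instead.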

Now this still did not explain the disk I/O, so I added bytecode
instrumentation to a range of Java classes, such as FileInputStream,
FileOutputStream, RandomAccessFile, FileChannel, ZipFile, and multiple
*Buffer classes for memory mapped files, and I collect the same
statistics: start/end of a read from/write to disk, number of bytes
involved and such. I can plot these
numbers too and see that the HDFS JVMs write 1 TiB of data to disk during
TeraGen (expected) and read and write 1 TiB from and to disk during
TeraSort (expected).
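
One way to sanity-check which candidate I/O classes an agent actually gets
to see is sketched below. This is not the instrumentation used for the
measurements above: the class IoClassAudit and its candidate list are
hypothetical, and the transformer only records class names without
rewriting any bytecode. It relies on the standard java.lang.instrument
API. Note that classes loaded before the agent starts (which is typical
for core java.io classes) do not pass through the transformer unless they
are retransformed.

```java
import java.lang.instrument.ClassFileTransformer;
import java.lang.instrument.Instrumentation;
import java.security.ProtectionDomain;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;

/** Hypothetical audit agent: records which candidate I/O classes it is offered. */
final class IoClassAudit implements ClassFileTransformer {
    // Internal (slash-separated) class names. This list is an assumption
    // for illustration and is certainly not exhaustive.
    static final Set<String> CANDIDATES = new HashSet<>(Arrays.asList(
            "java/io/FileInputStream",
            "java/io/FileOutputStream",
            "java/io/RandomAccessFile",
            "sun/nio/ch/FileChannelImpl",
            "java/util/zip/ZipFile"));

    final Set<String> seen = new ConcurrentSkipListSet<>();

    @Override
    public byte[] transform(ClassLoader loader, String className,
                            Class<?> classBeingRedefined,
                            ProtectionDomain protectionDomain,
                            byte[] classfileBuffer) {
        if (className != null && CANDIDATES.contains(className)) {
            seen.add(className);
        }
        return null; // null means: leave the class bytes unchanged
    }

    /** Entry point when the JVM is started with -javaagent. */
    public static void premain(String agentArgs, Instrumentation inst) {
        IoClassAudit audit = new IoClassAudit();
        inst.addTransformer(audit);
        Runtime.getRuntime().addShutdownHook(new Thread(
                () -> System.err.println("I/O classes seen: " + audit.seen)));
    }

    public static void main(String[] args) {
        // Direct invocation, only to show the bookkeeping without an agent.
        IoClassAudit audit = new IoClassAudit();
        audit.transform(null, "java/io/RandomAccessFile", null, null, new byte[0]);
        audit.transform(null, "java/lang/String", null, null, new byte[0]);
        System.out.println("I/O classes seen: " + audit.seen);
    }
}
```

Comparing the recorded names against the list of instrumented classes
shows whether a class that touches the disk was never offered to the
agent in the first place.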

Sorry for the enormous introduction, but now there's finally the
interesting part: Flink's JVMs read from and write to disk 1 TiB of data
each during TeraSort. I suspect there is some sort of spilling
involved, potentially because I have not done the setup properly. But that
is not the crucial point: my statistics give a total of 3 TiB of writes to
disk (2 TiB for HDFS, 1 TiB for Flink), which agrees with the XFS counters
from above. However, my statistics only give 2 TiB of reads from disk (1
TiB for HDFS, 1 TiB for Flink) against the 3.2 TiB reported by XFS, so I'm
missing more than an entire TiB of reads from disk somewhere. I have done
the same with Hadoop TeraSort, and there I'm
not missing any data, meaning my statistics agree with XFS for TeraSort on
Hadoop, which is why I suspect there are some cases where Flink goes to
disk without me noticing it.
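
The accounting above can be written down as a small reconciliation
sketch. The figures are the ones quoted in this mail, expressed in tenths
of a TiB to avoid floating-point noise; the class IoReconciliation and
its method are hypothetical names used only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: compares file system counters with per-component totals. */
final class IoReconciliation {
    /** Returns counter minus the sum of all components, in tenths of a TiB. */
    static long unexplained(long fsCounterTenths, Map<String, Long> perComponentTenths) {
        long sum = 0;
        for (long value : perComponentTenths.values()) {
            sum += value;
        }
        return fsCounterTenths - sum;
    }

    public static void main(String[] args) {
        // Instrumented writes: 2 TiB by the HDFS JVMs, 1 TiB by the Flink JVMs.
        Map<String, Long> writes = new LinkedHashMap<>();
        writes.put("HDFS", 20L);
        writes.put("Flink", 10L);

        // Instrumented reads: 1 TiB by the HDFS JVMs, 1 TiB by the Flink JVMs.
        Map<String, Long> reads = new LinkedHashMap<>();
        reads.put("HDFS", 10L);
        reads.put("Flink", 10L);

        long xfsWrites = 30L; // 3 TiB according to the XFS counters
        long xfsReads = 32L;  // 3.2 TiB according to the XFS counters

        System.out.println("unexplained writes (TiB): " + unexplained(xfsWrites, writes) / 10.0);
        System.out.println("unexplained reads (TiB): " + unexplained(xfsReads, reads) / 10.0);
    }
}
```

With these inputs the writes reconcile exactly, while the reads leave a
gap of 1.2 TiB, which is the amount in question.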

So here, finally, is the question: in which cases does Flink go to disk,
and how does it do so (meaning precisely which Java classes are involved,
so I can check my bytecode instrumentation)? I guess this would also
include any kind of resource distribution via HDFS/YARN (such as JAR files
and whatever else gets shipped). Seeing that I'm missing an amount of data
roughly equal to the size of my input set, I suspect there must be some
sort of shuffling/spilling at play here, but I'm not sure. Maybe there is
also some sort of remote I/O involved, via sockets or so, that I'm missing.

Any hints as to where Flink might incur disk I/O are greatly appreciated!
I'm also happy to do the digging myself, once pointed to the proper
packages in the Apache Flink source tree (I have done my fair share of
inspection already, but could not be sure whether or not I have missed
something). Thanks a lot in advance!




