flink-commits mailing list archives

From fhue...@apache.org
Subject svn commit: r1666453 - /flink/_posts/2015-03-13-peeking-into-Apache-Flinks-Engine-Room.md
Date Fri, 13 Mar 2015 14:03:15 GMT
Author: fhueske
Date: Fri Mar 13 14:03:15 2015
New Revision: 1666453

URL: http://svn.apache.org/r1666453
Log:
Fixed typos in join blog post

Modified:
    flink/_posts/2015-03-13-peeking-into-Apache-Flinks-Engine-Room.md

Modified: flink/_posts/2015-03-13-peeking-into-Apache-Flinks-Engine-Room.md
URL: http://svn.apache.org/viewvc/flink/_posts/2015-03-13-peeking-into-Apache-Flinks-Engine-Room.md?rev=1666453&r1=1666452&r2=1666453&view=diff
==============================================================================
--- flink/_posts/2015-03-13-peeking-into-Apache-Flinks-Engine-Room.md (original)
+++ flink/_posts/2015-03-13-peeking-into-Apache-Flinks-Engine-Room.md Fri Mar 13 14:03:15
2015
@@ -59,7 +59,7 @@ Flink uses techniques which are well kno
 
 In a distributed system joins are commonly processed in two steps:
 
-1. The data of both input is distributed across all parallel instances that participate in the join and
+1. The data of both inputs is distributed across all parallel instances that participate in the join and
 1. each parallel instance performs a standard stand-alone join algorithm on its local partition of the overall data.
 
 The distribution of data across parallel instances must ensure that each valid join pair can be locally built by exactly one instance. For both steps, there are multiple valid strategies that can be independently picked and which are favorable in different situations. In Flink terminology, the first phase is called Ship Strategy and the second phase Local Strategy. In the following I will describe Flink’s ship and local strategies to join two datasets *R* and *S*.
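The two phases described in the post can be illustrated with a toy sketch (plain Python, not the Flink API; the partition count, key extractors, and sample datasets are made up for illustration). Hash-partitioning both inputs on the join key in the ship phase guarantees that every matching pair lands on the same parallel instance, where a standard local join runs:

```python
# Toy sketch of a Repartition-Repartition join: both inputs are
# hash-partitioned on the join key, then each partition is joined locally.

def partition_by_key(dataset, key, n):
    """Ship phase: route each element to a partition by hashing its key."""
    parts = [[] for _ in range(n)]
    for elem in dataset:
        parts[hash(key(elem)) % n].append(elem)
    return parts

def local_hash_join(r_part, s_part, r_key, s_key):
    """Local phase: a standard in-memory hash join on one partition."""
    table = {}
    for r in r_part:                      # build an index on one input
        table.setdefault(r_key(r), []).append(r)
    return [(r, s) for s in s_part        # probe it with the other input
                   for r in table.get(s_key(s), [])]

def distributed_join(R, S, r_key, s_key, n=4):
    r_parts = partition_by_key(R, r_key, n)
    s_parts = partition_by_key(S, s_key, n)
    # Each (r_parts[i], s_parts[i]) pair would run on one parallel instance;
    # here we simply loop over them sequentially.
    result = []
    for i in range(n):
        result.extend(local_hash_join(r_parts[i], s_parts[i], r_key, s_key))
    return result

R = [(1, "a"), (2, "b"), (3, "c")]
S = [(1, "x"), (2, "y"), (2, "z")]
joined = distributed_join(R, S, r_key=lambda r: r[0], s_key=lambda s: s[0])
```

Because all elements with the same key hash to the same partition index, no valid join pair can be split across instances, which is exactly the correctness condition stated above.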
@@ -82,10 +82,10 @@ The Broadcast-Forward strategy sends one
 <img src="{{ site.baseurl }}/img/blog/joins-repartition.png" style="width:90%;margin:15px">
 </center>
 
-The Repartition-Repartition and Broadcast-Forward ship strategies establish suitable data distributions to execute a distributed join. Depending on the operations that are applied before the join, one or even both inputs of a join are already distributed in suitable way across parallel instance. In this case, Flink will reuse such distributions and only ship one or no input at all.
+The Repartition-Repartition and Broadcast-Forward ship strategies establish suitable data distributions to execute a distributed join. Depending on the operations that are applied before the join, one or even both inputs of a join are already distributed in a suitable way across parallel instances. In this case, Flink will reuse such distributions and only ship one or no input at all.
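The choice between the two ship strategies is essentially a cost trade-off, which a back-of-the-envelope sketch can make concrete (a hypothetical cost model for illustration only, not Flink's actual optimizer): Repartition-Repartition ships each input roughly once, while Broadcast-Forward ships a full copy of one input to every parallel instance and the other input not at all.

```python
# Illustrative shipping-cost model for the two strategies.

def ship_cost_repartition(size_r, size_s):
    # Each element of both inputs is shipped (at most) once.
    return size_r + size_s

def ship_cost_broadcast(size_broadcast, parallelism):
    # The broadcast input is fully replicated to every instance.
    return size_broadcast * parallelism

def pick_strategy(size_r, size_s, parallelism):
    rr = ship_cost_repartition(size_r, size_s)
    bf = ship_cost_broadcast(min(size_r, size_s), parallelism)  # broadcast the smaller input
    return "Broadcast-Forward" if bf < rr else "Repartition-Repartition"

# Broadcasting pays off when one input is tiny relative to the other:
small_vs_big = pick_strategy(size_r=1, size_s=1000, parallelism=8)
# With similar-sized inputs, replicating one of them is far too expensive:
similar_size = pick_strategy(size_r=900, size_s=1000, parallelism=8)
```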
 
 #### Flink’s Memory Management
-Before delving into the details of Flink’s local join algorithms, I will briefly discuss Flink’s internal memory management. Data processing algorithms such as joining, grouping, and sorting need to hold portions of their input data in memory. While such algorithms perform best if there is enough memory available to hold all data, it is crucial to gracefully handle situations where the data size exceeds memory. Such situations are especially tricky in JVM-based systems such as Flink because the system needs to reliably recognize that it is short on memory. Failure to detect such situations can result in a `OutOfMemoryException` and kill the JVM.

+Before delving into the details of Flink’s local join algorithms, I will briefly discuss Flink’s internal memory management. Data processing algorithms such as joining, grouping, and sorting need to hold portions of their input data in memory. While such algorithms perform best if there is enough memory available to hold all data, it is crucial to gracefully handle situations where the data size exceeds memory. Such situations are especially tricky in JVM-based systems such as Flink because the system needs to reliably recognize that it is short on memory. Failure to detect such situations can result in an `OutOfMemoryException` and kill the JVM.

 
 Flink handles this challenge by actively managing its memory. When a worker node (TaskManager) is started, it allocates a fixed portion (70% by default) of the JVM’s heap memory that is available after initialization as 32KB byte arrays. These byte arrays are distributed as working memory to all algorithms that need to hold significant portions of data in memory. The algorithms receive their input data as Java data objects and serialize them into their working memory.
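The effect of this design can be sketched roughly as follows (plain Python standing in for Flink's JVM memory segments; the 32KB segment size comes from the text, while the record format and class names are illustrative). Serializing records into a bounded pool of fixed-size buffers means running out of managed memory is an explicit, recoverable event rather than a JVM crash:

```python
import struct

SEGMENT_SIZE = 32 * 1024  # 32KB segments, as in the text

class MemoryManager:
    """Toy stand-in: hands out fixed-size byte buffers from a bounded pool."""
    def __init__(self, total_bytes):
        self.free = [bytearray(SEGMENT_SIZE)
                     for _ in range(total_bytes // SEGMENT_SIZE)]

    def allocate(self):
        if not self.free:
            # A real system would now spill to disk instead of blowing up
            # with an unpredictable OutOfMemoryError.
            raise MemoryError("managed memory exhausted; spill required")
        return self.free.pop()

def serialize_into(records, manager):
    """Serialize (int, int) records into segments instead of keeping objects."""
    segments, seg, offset = [], manager.allocate(), 0
    for a, b in records:
        if offset + 8 > SEGMENT_SIZE:          # current segment is full
            segments.append(seg)
            seg, offset = manager.allocate(), 0
        struct.pack_into(">ii", seg, offset, a, b)  # 2 x 4-byte big-endian ints
        offset += 8
    segments.append(seg)
    return segments

mgr = MemoryManager(total_bytes=2 * SEGMENT_SIZE)
segs = serialize_into([(i, i * 2) for i in range(1000)], mgr)
```

Since 1000 fixed-width records occupy well under 32KB here, everything fits in a single segment; with more data, `allocate` fails deterministically and the caller knows it must spill.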
 
@@ -110,7 +110,7 @@ The Sort-Merge-Join works by first sorti
 <img src="{{ site.baseurl }}/img/blog/joins-smj.png" style="width:90%;margin:15px">
 </center>
 
-The Hybrid-Hash-Join distinguishes its inputs as build-side and probe-side input and works in two phases, a build phase followed by a probe phase. In the build phase, the algorithm reads the build-side input and inserts all data elements into an in-memory hash table indexed by their join key attributes. If the hash table outgrows the algorithm's working memory, parts of the hash table (ranges of hash indexes) are written to the local filesystem. The build phase ends after the build-side input has been fully consumed. In the probe phase, the algorithm reads the probe-side input and probes the hash table for each element using its join key attribute. If the element falls into a hash index range that was spilled to disk, the element is also written to disk. Otherwise, the element is immediately joined with all matching elements from the hash table. If the hash table completely fit into the working memory, the join is finished after the probe-side input has been fully consumed. Otherwise, the current hash table is dropped and a new hash table is built using spilled parts of the build-side input. This hash table is probed by the corresponding parts of the spilled probe-side input. Eventually, all data is joined. Hybrid-Hash-Joins perform best if the hash table completely fits into the working memory because an arbitrarily large the probe-side input can be processed on-the-fly without materializing it. However even if build-side input does not fit into memory, the the Hybrid-Hash-Join has very nice properties. In this case, in-memory processing is partially preserved and only a fraction of the build-side and probe-side data needs to be written to and read from the local filesystem. The next figure illustrates how the Hybrid-Hash-Join works.
+The Hybrid-Hash-Join distinguishes its inputs as build-side and probe-side input and works in two phases, a build phase followed by a probe phase. In the build phase, the algorithm reads the build-side input and inserts all data elements into an in-memory hash table indexed by their join key attributes. If the hash table outgrows the algorithm's working memory, parts of the hash table (ranges of hash indexes) are written to the local filesystem. The build phase ends after the build-side input has been fully consumed. In the probe phase, the algorithm reads the probe-side input and probes the hash table for each element using its join key attribute. If the element falls into a hash index range that was spilled to disk, the element is also written to disk. Otherwise, the element is immediately joined with all matching elements from the hash table. If the hash table completely fits into the working memory, the join is finished after the probe-side input has been fully consumed. Otherwise, the current hash table is dropped and a new hash table is built using spilled parts of the build-side input. This hash table is probed by the corresponding parts of the spilled probe-side input. Eventually, all data is joined. Hybrid-Hash-Joins perform best if the hash table completely fits into the working memory because an arbitrarily large probe-side input can be processed on-the-fly without materializing it. However, even if the build-side input does not fit into memory, the Hybrid-Hash-Join has very nice properties. In this case, in-memory processing is partially preserved and only a fraction of the build-side and probe-side data needs to be written to and read from the local filesystem. The next figure illustrates how the Hybrid-Hash-Join works.
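The build/probe flow described above can be condensed into a drastically simplified sketch (plain Python; real Flink hashes serialized records into memory segments and spills ranges of hash indexes to files, whereas here a record-count budget, modulo partitioning, and plain lists stand in for working memory and disk):

```python
# Simplified Hybrid-Hash-Join: build an in-memory hash table from the
# build side, "spill" overflowing partitions to lists standing in for
# disk files, then join the spilled partition pairs in a second pass.

NUM_PARTITIONS = 4

def hybrid_hash_join(build, probe, key, memory_budget):
    part = lambda rec: hash(key(rec)) % NUM_PARTITIONS
    tables = [dict() for _ in range(NUM_PARTITIONS)]       # in-memory hash table
    spilled_build = [[] for _ in range(NUM_PARTITIONS)]    # "on disk"
    in_memory = set(range(NUM_PARTITIONS))
    n = 0  # records currently held in working memory

    # Build phase: insert build-side records; when the budget is exceeded,
    # spill the incoming record's whole partition (a range of hash indexes).
    for rec in build:
        p = part(rec)
        if p not in in_memory:
            spilled_build[p].append(rec)       # partition already spilled
        elif n >= memory_budget:
            for recs in tables[p].values():    # evict this partition to "disk"
                spilled_build[p].extend(recs)
                n -= len(recs)
            tables[p].clear()
            in_memory.discard(p)
            spilled_build[p].append(rec)
        else:
            tables[p].setdefault(key(rec), []).append(rec)
            n += 1

    # Probe phase: join in-memory partitions on the fly, spill the rest.
    result = []
    spilled_probe = [[] for _ in range(NUM_PARTITIONS)]
    for rec in probe:
        p = part(rec)
        if p in in_memory:
            result += [(b, rec) for b in tables[p].get(key(rec), [])]
        else:
            spilled_probe[p].append(rec)

    # Second pass: the first hash table is dropped, so each spilled build
    # partition gets a fresh table, probed by its spilled probe partition.
    for p in range(NUM_PARTITIONS):
        if p not in in_memory:
            t = {}
            for rec in spilled_build[p]:
                t.setdefault(key(rec), []).append(rec)
            for rec in spilled_probe[p]:
                result += [(b, rec) for b in t.get(key(rec), [])]
    return result
```

With a budget larger than the build side, the second pass never runs and the probe side streams through unmaterialized; with a small budget, only the spilled partition pairs take the extra write/read round trip, which mirrors the graceful degradation described above.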
 
 <center>
 <img src="{{ site.baseurl }}/img/blog/joins-hhj.png" style="width:90%;margin:15px">
@@ -130,7 +130,7 @@ Alright, that sounds good, but how fast
 <img src="{{ site.baseurl }}/img/blog/joins-single-perf.png" style="width:85%;margin:15px">
 </center>
 
-The joins with 1 to 3 GB build side (blue bars) are pure in-memory joins. The other joins partially spill data to disk (4 to 12GB, orange bars). The results show that the performance of Flink’s Hybrid-Hash-Join remains stable as long as the hash table completely fits into memory. As soon as the hash table becomes larger than the working memory, parts of the hash table and corresponding parts of the probe side are spilled to disk. The chart shows that the performance of the Hybrid-Hash-Join gracefully decreases in this situation, i.e., there is not sharp increase in runtime when the join starts spilling. In combination with Flink’s robust memory management, this execution behavior gives smooth performance without the need for fine-grained, data-dependent memory tuning.
+The joins with 1 to 3 GB build side (blue bars) are pure in-memory joins. The other joins partially spill data to disk (4 to 12GB, orange bars). The results show that the performance of Flink’s Hybrid-Hash-Join remains stable as long as the hash table completely fits into memory. As soon as the hash table becomes larger than the working memory, parts of the hash table and corresponding parts of the probe side are spilled to disk. The chart shows that the performance of the Hybrid-Hash-Join gracefully decreases in this situation, i.e., there is no sharp increase in runtime when the join starts spilling. In combination with Flink’s robust memory management, this execution behavior gives smooth performance without the need for fine-grained, data-dependent memory tuning.
 
 So, Flink’s Hybrid-Hash-Join implementation performs well on a single thread even for limited memory resources, but how good is Flink’s performance when joining larger datasets in a distributed setting? For the next experiment we compare the performance of the most common join strategy combinations, namely:
 


