flink-commits mailing list archives

From rmetz...@apache.org
Subject [1/2] flink-web git commit: Add FAQ entry for hdfs errors caused by wrong hadoop version
Date Tue, 30 Jun 2015 15:29:59 GMT
Repository: flink-web
Updated Branches:
  refs/heads/asf-site 32409da22 -> 630f25830


http://git-wip-us.apache.org/repos/asf/flink-web/blob/630f2583/content/news/2015/05/11/Juggling-with-Bits-and-Bytes.html
----------------------------------------------------------------------
diff --git a/content/news/2015/05/11/Juggling-with-Bits-and-Bytes.html b/content/news/2015/05/11/Juggling-with-Bits-and-Bytes.html
index 60ae96e..579e94e 100644
--- a/content/news/2015/05/11/Juggling-with-Bits-and-Bytes.html
+++ b/content/news/2015/05/11/Juggling-with-Bits-and-Bytes.html
@@ -166,7 +166,7 @@ However, this approach has a few notable drawbacks. First of all it is not trivi
 <img src="/img/blog/memory-mgmt.png" style="width:90%;margin:15px" />
 </center>
 
-<p>Flink’s style of active memory management and operating on binary data has several
benefits:</p>
+<p>Flink’s style of active memory management and operating on binary data has several
benefits: </p>
 
 <ol>
   <li><strong>Memory-safe execution &amp; efficient out-of-core algorithms.</strong>
Due to the fixed amount of allocated memory segments, it is trivial to monitor remaining memory
resources. In case of memory shortage, processing operators can efficiently write larger batches
of memory segments to disk and later read them back. Consequently, <code>OutOfMemoryErrors</code>
are effectively prevented.</li>
@@ -175,13 +175,13 @@ However, this approach has a few notable drawbacks. First of all it is not trivi
   <li><strong>Efficient binary operations &amp; cache sensitivity.</strong>
Binary data can be efficiently compared and operated on given a suitable binary representation.
Furthermore, the binary representations can put related values, as well as hash codes, keys,
and pointers, adjacently into memory. This usually gives data structures with more cache-efficient
access patterns.</li>
 </ol>
 
-<p>These properties of active memory management are very desirable in a data processing
system for large-scale data analytics but have a significant price tag attached. Active memory
management and operating on binary data is not trivial to implement, i.e., using <code>java.util.HashMap</code>
is much easier than implementing a spillable hash-table backed by byte arrays and a custom
serialization stack. Of course Apache Flink is not the only JVM-based data processing system
that operates on serialized binary data. Projects such as <a href="http://drill.apache.org/">Apache
Drill</a>, <a href="http://ignite.incubator.apache.org/">Apache Ignite (incubating)</a>
or <a href="http://projectgeode.org/">Apache Geode (incubating)</a> apply similar
techniques, and it was recently announced that <a href="http://spark.apache.org/">Apache
Spark</a> will also evolve in this direction with <a href="https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html">
 Project Tungsten</a>.</p>
+<p>These properties of active memory management are very desirable in a data processing
system for large-scale data analytics but have a significant price tag attached. Active memory
management and operating on binary data is not trivial to implement, i.e., using <code>java.util.HashMap</code>
is much easier than implementing a spillable hash-table backed by byte arrays and a custom
serialization stack. Of course Apache Flink is not the only JVM-based data processing system
that operates on serialized binary data. Projects such as <a href="http://drill.apache.org/">Apache
Drill</a>, <a href="http://ignite.incubator.apache.org/">Apache Ignite (incubating)</a>
or <a href="http://projectgeode.org/">Apache Geode (incubating)</a> apply similar
techniques, and it was recently announced that <a href="http://spark.apache.org/">Apache
Spark</a> will also evolve in this direction with <a href="https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html">
 Project Tungsten</a>. </p>
 
 <p>In the following we discuss in detail how Flink allocates memory, de/serializes
objects, and operates on binary data. We will also show some performance numbers comparing
processing objects on the heap and operating on binary data.</p>
 
 <h2 id="how-does-flink-allocate-memory">How does Flink allocate memory?</h2>
 
-<p>A Flink worker, called TaskManager, is composed of several internal components such
as an actor system for coordination with the Flink master, an IOManager that takes care of
spilling data to disk and reading it back, and a MemoryManager that coordinates memory usage.
In the context of this blog post, the MemoryManager is of most interest.</p>
+<p>A Flink worker, called TaskManager, is composed of several internal components such
as an actor system for coordination with the Flink master, an IOManager that takes care of
spilling data to disk and reading it back, and a MemoryManager that coordinates memory usage.
In the context of this blog post, the MemoryManager is of most interest. </p>
 
 <p>The MemoryManager takes care of allocating, accounting, and distributing MemorySegments
to data processing operators such as sort and join operators. A <a href="https://github.com/apache/flink/blob/release-0.9.0-milestone-1/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java">MemorySegment</a>
is Flink’s distribution unit of memory and is backed by a regular Java byte array (size
is 32 KB by default). A MemorySegment provides very efficient write and read access to its
backing byte array using Java’s unsafe methods. You can think of a MemorySegment as a custom-tailored
version of Java’s NIO ByteBuffer. In order to operate on multiple MemorySegments like on
a larger chunk of consecutive memory, Flink uses logical views that implement Java’s <code>java.io.DataOutput</code>
and <code>java.io.DataInput</code> interfaces.</p>
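For illustration only (a hand-rolled sketch, not Flink's actual MemorySegment class, which offers many more typed accessors and uses Java's unsafe methods for speed), the core idea is a fixed-size byte array with typed read/write access:

    // Illustrative sketch only -- not Flink's MemorySegment implementation.
    public final class SimpleSegment {

        public static final int DEFAULT_SIZE = 32 * 1024;  // 32 KB, as described above

        private final byte[] memory;

        public SimpleSegment(int size) {
            this.memory = new byte[size];
        }

        public int size() {
            return memory.length;
        }

        // Write a 4-byte int at the given offset (big-endian).
        public void putInt(int offset, int value) {
            memory[offset]     = (byte) (value >>> 24);
            memory[offset + 1] = (byte) (value >>> 16);
            memory[offset + 2] = (byte) (value >>> 8);
            memory[offset + 3] = (byte) value;
        }

        // Read a 4-byte int from the given offset (big-endian).
        public int getInt(int offset) {
            return ((memory[offset]     & 0xFF) << 24)
                 | ((memory[offset + 1] & 0xFF) << 16)
                 | ((memory[offset + 2] & 0xFF) << 8)
                 |  (memory[offset + 3] & 0xFF);
        }
    }

Many such segments can then sit behind a DataOutput/DataInput view so that operators can treat them like one larger chunk of consecutive memory.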
 
@@ -193,7 +193,7 @@ However, this approach has a few notable drawbacks. First of all it is not trivi
 
 <h2 id="how-does-flink-serialize-objects">How does Flink serialize objects?</h2>
 
-<p>The Java ecosystem offers several libraries to convert objects into a binary representation
and back. Common alternatives are standard Java serialization, <a href="https://github.com/EsotericSoftware/kryo">Kryo</a>,
<a href="http://avro.apache.org/">Apache Avro</a>, <a href="http://thrift.apache.org/">Apache
Thrift</a>, or Google’s <a href="https://github.com/google/protobuf">Protobuf</a>.
Flink includes its own custom serialization framework in order to control the binary representation
of data. This is important because operating on binary data such as comparing or even manipulating
binary data requires exact knowledge of the serialization layout. Further, configuring the
serialization layout with respect to operations that are performed on binary data can yield
a significant performance boost. Flink’s serialization stack also leverages the fact that
the types of the objects that go through de/serialization are exactly known before
a program is executed.</p>
+<p>The Java ecosystem offers several libraries to convert objects into a binary representation
and back. Common alternatives are standard Java serialization, <a href="https://github.com/EsotericSoftware/kryo">Kryo</a>,
<a href="http://avro.apache.org/">Apache Avro</a>, <a href="http://thrift.apache.org/">Apache
Thrift</a>, or Google’s <a href="https://github.com/google/protobuf">Protobuf</a>.
Flink includes its own custom serialization framework in order to control the binary representation
of data. This is important because operating on binary data such as comparing or even manipulating
binary data requires exact knowledge of the serialization layout. Further, configuring the
serialization layout with respect to operations that are performed on binary data can yield
a significant performance boost. Flink’s serialization stack also leverages the fact that
the types of the objects that go through de/serialization are exactly known before
a program is executed. </p>
 
 <p>Flink programs can process data represented as arbitrary Java or Scala objects.
Before a program is optimized, the data types at each processing step of the program’s data
flow need to be identified. For Java programs, Flink features a reflection-based type extraction
component to analyze the return types of user-defined functions. Scala programs are analyzed
with the help of the Scala compiler. Flink represents each data type with a <a href="https://github.com/apache/flink/blob/release-0.9.0-milestone-1/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java">TypeInformation</a>.
Flink has TypeInformations for several kinds of data types, including:</p>
 
@@ -203,11 +203,11 @@ However, this approach has a few notable drawbacks. First of all it is not trivi
   <li>WritableTypeInfo: Any implementation of Hadoop’s Writable interface.</li>
   <li>TupleTypeInfo: Any Flink tuple (Tuple1 to Tuple25). Flink tuples are Java representations
for fixed-length tuples with typed fields.</li>
   <li>CaseClassTypeInfo: Any Scala CaseClass (including Scala tuples).</li>
-  <li>PojoTypeInfo: Any POJO (Java or Scala), i.e., an object with all fields either
being public or accessible through getters and setters that follow the common naming conventions.</li>
+  <li>PojoTypeInfo: Any POJO (Java or Scala), i.e., an object with all fields either
being public or accessible through getters and setters that follow the common naming conventions.
</li>
   <li>GenericTypeInfo: Any data type that cannot be identified as another type.</li>
 </ul>
 
-<p>Each TypeInformation provides a serializer for the data type it represents. For
example, a BasicTypeInfo returns a serializer that writes the respective primitive type, the
serializer of a WritableTypeInfo delegates de/serialization to the write() and readFields()
methods of the object implementing Hadoop’s Writable interface, and a GenericTypeInfo returns
a serializer that delegates serialization to Kryo. Object serialization to a DataOutput which
is backed by Flink MemorySegments goes automatically through Java’s efficient unsafe operations.
For data types that can be used as keys, i.e., compared and hashed, the TypeInformation provides
TypeComparators. TypeComparators compare and hash objects and can - depending on the concrete
data type - also efficiently compare binary representations and extract fixed-length binary
key prefixes.</p>
+<p>Each TypeInformation provides a serializer for the data type it represents. For
example, a BasicTypeInfo returns a serializer that writes the respective primitive type, the
serializer of a WritableTypeInfo delegates de/serialization to the write() and readFields()
methods of the object implementing Hadoop’s Writable interface, and a GenericTypeInfo returns
a serializer that delegates serialization to Kryo. Object serialization to a DataOutput which
is backed by Flink MemorySegments goes automatically through Java’s efficient unsafe operations.
For data types that can be used as keys, i.e., compared and hashed, the TypeInformation provides
TypeComparators. TypeComparators compare and hash objects and can - depending on the concrete
data type - also efficiently compare binary representations and extract fixed-length binary
key prefixes. </p>
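As a rough sketch of these roles (simplified stand-ins for illustration, not Flink's real TypeSerializer/TypeComparator interfaces), a serializer writes a record to a DataOutput view over MemorySegments, and a comparator for key types can additionally expose a fixed-length binary key prefix:

    // Illustrative sketch only -- simplified stand-ins for the roles described above.
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    interface SimpleSerializer<T> {
        void serialize(T record, DataOutput target) throws IOException;
        T deserialize(DataInput source) throws IOException;
    }

    // For key types: hash and compare objects, and optionally expose a
    // fixed-length binary key prefix that allows cheap comparisons.
    interface SimpleComparator<T> {
        int hash(T record);
        int compare(T first, T second);
        boolean supportsKeyPrefix();
        void putKeyPrefix(T record, byte[] target, int offset, int numBytes);
    }

    // Example: the serializer an integer type would provide.
    final class IntSerializer implements SimpleSerializer<Integer> {
        public void serialize(Integer record, DataOutput target) throws IOException {
            target.writeInt(record);
        }
        public Integer deserialize(DataInput source) throws IOException {
            return source.readInt();
        }
    }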
 
 <p>Tuple, Pojo, and CaseClass types are composite types, i.e., containers for one or
more possibly nested data types. As such, their serializers and comparators are also composite
and delegate the serialization and comparison of their member data types to the respective
serializers and comparators. The following figure illustrates the serialization of a (nested)
<code>Tuple3&lt;Integer, Double, Person&gt;</code> object where <code>Person</code>
is a POJO and defined as follows:</p>
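The Person definition itself is not included in this hunk; a minimal POJO consistent with the description (the concrete fields here are an assumption for illustration) could look like:

    // Hypothetical Person POJO: public fields (or conventional getters/setters)
    // and a no-argument constructor are what make a class eligible for PojoTypeInfo.
    public class Person {
        public int id;
        public String name;

        public Person() {}  // no-argument constructor

        public Person(int id, String name) {
            this.id = id;
            this.name = name;
        }
    }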
 
@@ -220,13 +220,13 @@ However, this approach has a few notable drawbacks. First of all it is not trivi
 <img src="/img/blog/data-serialization.png" style="width:80%;margin:15px" />
 </center>
 
-<p>Flink’s type system can be easily extended by providing custom TypeInformations,
Serializers, and Comparators to improve the performance of serializing and comparing custom
data types.</p>
+<p>Flink’s type system can be easily extended by providing custom TypeInformations,
Serializers, and Comparators to improve the performance of serializing and comparing custom
data types. </p>
 
 <h2 id="how-does-flink-operate-on-binary-data">How does Flink operate on binary data?</h2>
 
 <p>Similar to many other data processing APIs (including SQL), Flink’s APIs provide
transformations to group, sort, and join data sets. These transformations operate on potentially
very large data sets. Relational database systems feature very efficient algorithms for these
purposes, developed over several decades, including external merge-sort, merge-join, and hybrid hash-join.
Flink builds on this technology, but generalizes it to handle arbitrary objects using its
custom serialization and comparison stack. In the following, we show how Flink operates with
binary data using the example of Flink’s in-memory sort algorithm.</p>
 
-<p>Flink assigns a memory budget to its data processing operators. Upon initialization,
a sort algorithm requests its memory budget from the MemoryManager and receives a corresponding
set of MemorySegments. The set of MemorySegments becomes the memory pool of a so-called sort
buffer which collects the data that is to be sorted. The following figure illustrates how data
objects are serialized into the sort buffer.</p>
+<p>Flink assigns a memory budget to its data processing operators. Upon initialization,
a sort algorithm requests its memory budget from the MemoryManager and receives a corresponding
set of MemorySegments. The set of MemorySegments becomes the memory pool of a so-called sort
buffer which collects the data that is to be sorted. The following figure illustrates how data
objects are serialized into the sort buffer. </p>
 
 <center>
 <img src="/img/blog/sorting-binary-data-1.png" style="width:90%;margin:15px" />
@@ -239,7 +239,7 @@ The following figure shows how two objects are compared.</p>
 <img src="/img/blog/sorting-binary-data-2.png" style="width:80%;margin:15px" />
 </center>
 
-<p>The sort buffer compares two elements by comparing their binary fixed-length sort
keys. The comparison is successful if either done on a full key (not a prefix key) or if the
binary prefix keys are not equal. If the prefix keys are equal (or the sort key data type
does not provide a binary prefix key), the sort buffer follows the pointers to the actual
object data, deserializes both objects and compares the objects. Depending on the result of
the comparison, the sort algorithm decides whether to swap the compared elements or not. The
sort buffer swaps two elements by moving their fixed-length keys and pointers. The actual data
is not moved. Once the sort algorithm finishes, the pointers in the sort buffer are correctly
ordered. The following figure shows how the sorted data is returned from the sort buffer.</p>
+<p>The sort buffer compares two elements by comparing their binary fixed-length sort
keys. The comparison is successful if either done on a full key (not a prefix key) or if the
binary prefix keys are not equal. If the prefix keys are equal (or the sort key data type
does not provide a binary prefix key), the sort buffer follows the pointers to the actual
object data, deserializes both objects and compares the objects. Depending on the result of
the comparison, the sort algorithm decides whether to swap the compared elements or not. The
sort buffer swaps two elements by moving their fixed-length keys and pointers. The actual data
is not moved. Once the sort algorithm finishes, the pointers in the sort buffer are correctly
ordered. The following figure shows how the sorted data is returned from the sort buffer.
</p>
 
 <center>
 <img src="/img/blog/sorting-binary-data-3.png" style="width:80%;margin:15px" />
@@ -257,7 +257,7 @@ The following figure shows how two objects are compared.</p>
   <li><strong>Kryo-serialized.</strong> The tuple fields are serialized
into a sort buffer of 600 MB size using Kryo serialization and sorted without binary sort
keys. This means that each pair-wise comparison requires two objects to be deserialized.</li>
 </ol>
 
-<p>All sort methods are implemented using a single thread. The reported times are averaged
over ten runs. After each run, we call <code>System.gc()</code> to request a garbage
collection run, which is not included in the measured execution time. The following figure shows
the time to store the input data in memory, sort it, and read it back as objects.</p>
+<p>All sort methods are implemented using a single thread. The reported times are averaged
over ten runs. After each run, we call <code>System.gc()</code> to request a garbage
collection run, which is not included in the measured execution time. The following figure shows
the time to store the input data in memory, sort it, and read it back as objects. </p>
 
 <center>
 <img src="/img/blog/sort-benchmark.png" style="width:90%;margin:15px" />
@@ -315,13 +315,13 @@ The following figure shows how two objects are compared.</p>
 
 <p><br /></p>
 
-<p>To summarize, the experiments verify the previously stated benefits of operating
on binary data.</p>
+<p>To summarize, the experiments verify the previously stated benefits of operating
on binary data. </p>
 
 <h2 id="were-not-done-yet">We’re not done yet!</h2>
 
-<p>Apache Flink features quite a few advanced techniques to safely and efficiently
process huge amounts of data with limited memory resources. However, there are a few points
that could make Flink even more efficient. The Flink community is working on moving the managed
memory to off-heap memory. This will allow for smaller JVMs, lower garbage collection overhead,
and also easier system configuration. With Flink’s Table API, the semantics of all operations
such as aggregations and projections are known (in contrast to black-box user-defined functions).
Hence we can generate code for Table API operations that directly operates on binary data.
Further improvements include serialization layouts which are tailored towards the operations
that are applied on the binary data and code generation for serializers and comparators.</p>
+<p>Apache Flink features quite a few advanced techniques to safely and efficiently
process huge amounts of data with limited memory resources. However, there are a few points
that could make Flink even more efficient. The Flink community is working on moving the managed
memory to off-heap memory. This will allow for smaller JVMs, lower garbage collection overhead,
and also easier system configuration. With Flink’s Table API, the semantics of all operations
such as aggregations and projections are known (in contrast to black-box user-defined functions).
Hence we can generate code for Table API operations that directly operates on binary data.
Further improvements include serialization layouts which are tailored towards the operations
that are applied on the binary data and code generation for serializers and comparators. </p>
 
-<p>The groundwork (and a lot more) for operating on binary data is done but there is
still some room for making Flink even better and faster. If you are crazy about performance
and like to juggle with a lot of bits and bytes, join the Flink community!</p>
+<p>The groundwork (and a lot more) for operating on binary data is done but there is
still some room for making Flink even better and faster. If you are crazy about performance
and like to juggle with a lot of bits and bytes, join the Flink community! </p>
 
 <h2 id="tldr-give-me-three-things-to-remember">TL;DR; Give me three things to remember!</h2>
 

http://git-wip-us.apache.org/repos/asf/flink-web/blob/630f2583/content/news/2015/05/14/Community-update-April.html
----------------------------------------------------------------------
diff --git a/content/news/2015/05/14/Community-update-April.html b/content/news/2015/05/14/Community-update-April.html
index 64b681f..fe9485a 100644
--- a/content/news/2015/05/14/Community-update-April.html
+++ b/content/news/2015/05/14/Community-update-April.html
@@ -147,7 +147,7 @@
       <article>
         <p>14 May 2015 by Kostas Tzoumas (<a href="https://twitter.com/kostas_tzoumas">@kostas_tzoumas</a>)</p>
 
-<p>April was a packed month for Apache Flink.</p>
+<p>April was a packed month for Apache Flink. </p>
 
 <h2 id="flink-090-milestone1-release">Flink 0.9.0-milestone1 release</h2>
 
@@ -163,7 +163,7 @@
 
 <h2 id="flink-on-the-web">Flink on the web</h2>
 
-<p>Fabian Hueske gave an <a href="http://www.infoq.com/news/2015/04/hueske-apache-flink?utm_campaign=infoq_content&amp;utm_source=infoq&amp;utm_medium=feed&amp;utm_term=global">interview
at InfoQ</a> on Apache Flink.</p>
+<p>Fabian Hueske gave an <a href="http://www.infoq.com/news/2015/04/hueske-apache-flink?utm_campaign=infoq_content&amp;utm_source=infoq&amp;utm_medium=feed&amp;utm_term=global">interview
at InfoQ</a> on Apache Flink. </p>
 
 <h2 id="upcoming-events">Upcoming events</h2>
 

http://git-wip-us.apache.org/repos/asf/flink-web/blob/630f2583/faq.md
----------------------------------------------------------------------
diff --git a/faq.md b/faq.md
index ec45656..a8c0c8e 100644
--- a/faq.md
+++ b/faq.md
@@ -159,6 +159,29 @@ Please refer to the [download page]({{ site.baseurl }}/downloads.html#maven) and
 the {% github README.md master "build instructions" %}
 for details on how to set up Flink for different Hadoop and HDFS versions.
 
+
+### My job fails with various exceptions from the HDFS/Hadoop code. What can I do?
+
+Flink ships with the Hadoop 2.2 binaries by default. These binaries are used
+to connect to HDFS or YARN.
+There seem to be some bugs in this version of the HDFS client which cause exceptions while
+writing to HDFS (in particular under high load).
+Among the exceptions are the following:
+
+- `HDFS client trying to connect to the standby Namenode "org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby"`
+- `java.io.IOException: Bad response ERROR for block BP-1335380477-172.22.5.37-1424696786673:blk_1107843111_34301064 from datanode 172.22.5.81:50010
+  at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:732)`
+- `Caused by: org.apache.hadoop.ipc.RemoteException(java.lang.ArrayIndexOutOfBoundsException): 0
+        at org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager.getDatanodeStorageInfos(DatanodeManager.java:478)
+        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.updatePipelineInternal(FSNamesystem.java:6039)
+        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.updatePipeline(FSNamesystem.java:6002)`
+
+If you are experiencing any of these exceptions, we recommend using a Flink build with a Hadoop
+version that matches your local HDFS version.
+You can also build Flink manually against the exact Hadoop version you are running (for example,
+when using a Hadoop distribution with a custom patch level).
+
 ### In Eclipse, I get compilation errors in the Scala projects
 
Flink uses a new feature of the Scala compiler (called "quasiquotes") that has not yet been properly

