flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject flink git commit: [FLINK-1197] [docs] Add information about types and type extraction
Date Fri, 09 Jan 2015 16:33:04 GMT
Repository: flink
Updated Branches:
  refs/heads/master 7e08fa1f9 -> 6b36fd20f

[FLINK-1197] [docs] Add information about types and type extraction

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6b36fd20
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6b36fd20
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6b36fd20

Branch: refs/heads/master
Commit: 6b36fd20f9aefaa75475f840b437229e351f401a
Parents: 7e08fa1
Author: Stephan Ewen <sewen@apache.org>
Authored: Fri Jan 9 17:20:06 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Fri Jan 9 17:25:12 2015 +0100

 docs/_includes/sidenav.html          |   3 +-
 docs/internal_job_scheduling.md      |   2 +-
 docs/internal_types_serialization.md | 228 ++++++++++++++++++++++++++++++
 3 files changed, 231 insertions(+), 2 deletions(-)

diff --git a/docs/_includes/sidenav.html b/docs/_includes/sidenav.html
index 8a6585f..a308c18 100644
--- a/docs/_includes/sidenav.html
+++ b/docs/_includes/sidenav.html
@@ -50,9 +50,10 @@ under the License.
   <li><div class="sidenav-item"><a href="cli.html">Command-Line Interface</a></div></li>
   <li><div class="sidenav-item-bottom"><a href="web_client.html">Web Interface</a></div></li>
-  <li><div class="sidenav-category">Internals</div></li>
+  <li><div class="sidenav-category">Advanced</div></li>
   <li><div class="sidenav-item"><a href="internal_general_arch.html">Architecture
and Process Model</a></div></li>
   <!-- <li><a href="internal_program_life_cycle.html">From Program to Execution</a></li>
+  <li><div class="sidenav-item"><a href="internal_types_serialization.html">Type
extraction and Serialization</a></div></li>
   <li><div class="sidenav-item"><a href="internal_distributed_akka.html">Distributed
Communication via Akka</a></div></li>
   <li><div class="sidenav-item"><a href="internal_job_scheduling.html">Jobs
and Scheduling</a></div></li>
   <!-- <li><a href="#">Types & Serialization</a></li> -->

diff --git a/docs/internal_job_scheduling.md b/docs/internal_job_scheduling.md
index 6cfde4d..0d08f76 100644
--- a/docs/internal_job_scheduling.md
+++ b/docs/internal_job_scheduling.md
@@ -35,7 +35,7 @@ each of which can run one pipeline of parallel tasks. A pipeline consists
of mul
 Note that Flink often executes successive tasks concurrently: For Streaming programs, that
happens in any case,
 but also for batch programs, it happens frequently.
-The figure below illustrates that. Consider a program with a data source, a *MapFunction*,
and a *ReduceFunctoin*.
+The figure below illustrates that. Consider a program with a data source, a *MapFunction*,
and a *ReduceFunction*.
 The source and MapFunction are executed with a parallelism of 4, while the ReduceFunction
is executed with a
 parallism of 3. A pipeline consists of the sequence Source - Map - Reduce. On a cluster with
2 TaskManagers with
 3 slots each, the program will be executed as described below.

diff --git a/docs/internal_types_serialization.md b/docs/internal_types_serialization.md
new file mode 100644
index 0000000..187364b
--- /dev/null
+++ b/docs/internal_types_serialization.md
@@ -0,0 +1,228 @@
+title:  "Type Extraction and Serialization"
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+Flink handles types in a unique way, containing its own type descriptors,
+generic type extraction, and type serialization framework.
+This document describes the concepts and the rationale behind them.
+There are fundamental differences in the way that the Scala API and
+the Java API handle type information, so most of the issues described 
+here relate only to one of the to APIs.
+* This will be replaced by the TOC
+## Type handling in Flink
+Flink tries to know as much information about what types enter and leave user functions as
+This stands in contrast to the approach to just assuming nothing and letting the
+programming language and serialization framework handle all types dynamically.
+* To allow using POJOs and grouping/joining them by referring to field names, Flink needs
the type
+  information to make checks (for typos and type compatibility) before the job is executed.
+* The more we know, the better serialization and data layout schemes the compiler/optimizer
can develop.
+  That is quite important for the memory usage paradigm in Flink (work on serialized data
+  inside/outside the heap and make serialization very cheap).
+* For the upcoming logical programs (see roadmap draft) we need this to know the "schema"
of functions.
+* Finally, it also spares users having to worry about serialization frameworks and having
to register
+  types at those frameworks.
+## Flink's TypeInformation class
+The class {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
"TypeInformation" %}
+is the base class for all type descriptors. It reveals some basic properties of the type
and can generate serializers
+and, in specializations, comparators for the types.
+(*Note that comparators in Flink do much more than defining an order - they are basically
the utility to handle keys*)
+Internally, Flink makes the following distinctions between types:
+* Basic types: All Java primitives and their boxed form, plus `void`, `String`, and `Date`.
+* Primitive arrays and Object arrays
+* Composite types 
+  * Flink Java Tuples (part of the Flink Java API)
+  * Scala *case classes* (including Scala tuples)
+  * POJOs: classes that follow a certain bean-like pattern
+* Scala auxiliary types (Option, Either, Lists, Maps, ...)
+* Generic types: These will not be serialized by Flink itself, but by Kryo.
+POJOs are of particular interest, because they support the creation of complex types and
the use of field
+names in the definition of keys: `dataSet.join(another).where("name").equalTo("personName")`.
+They are also transparent to the runtime and can be handled very efficiently by Flink.
+**Rules for POJO types**
+Flink recognizes a data type as a POJO type (and allows "by-name" field referencing) if the
+conditions are fulfilled:
+* The class is public and standalone (no non-static inner class)
+* The class has a public no-argument constructor
+* All fields in the class (and all superclasses) are either public or
+  or have a public getter and a setter method that follows the Java beans
+  naming conventions for getters and setters.
+## Type Information in the Scala API
+Scala has very elaborate concepts for runtime type information though *type manifests* and
*class tags*. In
+general, types and methods have access to the types of their generic parameters - thus, Scala
programs do
+not suffer from type erasure as Java programs do.
+In addition, Scala allows to run custom code in the Scala Compiler through Scala Macros -
that means that some Flink
+code gets executed whenever you compile a Scala program written against Flink's Scala API.
+We use the Macros to look at the parameter types and return types of all user functions during
compilation - that
+is the point in time when certainly all type information is perfectly available. Within the
macro, we create
+a *TypeInformation* for the function's return types (or parameter types) and make it part
of the operation.
+#### No Implicit Value for Evidence Parameter Error
+In the case where TypeInformation could not be created, programs fail to compile with an
+stating *"could not find implicit value for evidence parameter of type TypeInformation"*.
+A frequent reason if that the code that generates the TypeInformation has not been imported.
+Make sure to import the entire flink.api.scala package.
+{% highlight scala %}
+import org.apache.flink.api.scala._
+{% endhighlight %}
+Another common cause are generic methods, which can be fixed as described in the following
+#### Generic Methods
+Consider the following case below:
+{% highlight scala %}
+def[T] selectFirst(input: DataSet[(T, _)]) : DataSet[T] = {
+  input.map { v => v._1 }
+val data : DataSet[(String, Long) = ...
+val result = selectFirst(data)
+{% endhighlight %}
+For such generic methods, the data types of the function parameters and return type may not
be the same
+for every call and are not known at the site where the method is defined. The code above
will result
+in an error that not enough implicit evidence is available.
+In such cases, the type information has to be generated at the invocation site and passed
to the
+method. Scala offers *implicit parameters* for that. 
+The following code tells Scala to bring a type information for *T* into the function. The
+information will then be generated at the sites where the method is invoked, rather than
where the
+method is defined.
+{% highlight scala %}
+def[T : TypeInformation] selectFirst(input: DataSet[(T, _)]) : DataSet[T] = {
+  input.map { v => v._1 }
+{% endhighlight %}
+## Type Information in the Java API
+Java in general erases generic type information. Only for subclasses of generic classes,
the subclass
+stores the type to which the generic type variables bind.
+Flink uses reflection on the (anonymous) classes that implement the user functions to figure
out the types of
+the generic parameters of the function. This logic also contains some simple type inference
for cases where
+the return types of functions are dependent on input types, such as in the generic utility
method below:
+{% highlight java %}
+public class AppendOne<T> extends MapFunction<T, Tuple2<T, Long>> {
+    public Tuple2<T, Long> map(T value) {
+        return new Tuple2<T, Long>(value, 1L);
+    }
+{% endhighlight %}
+Not in all cases can Flink figure out the data types of functions reliably in Java.
+Some issues remain with generic lambdas (we are trying to solve this with the Java community,
+see below) and with generic type variables that we cannot infer.
+#### Type Hints in the Java API
+To help cases where Flink cannot reconstruct the erased generic type information, the Java
+offers so called *type hints* from version 0.9 on. The type hints tell the system the type
+the data set produced by a function. The following gives an example:
+{% highlight java %}
+DataSet<SomeType> result = dataSet
+    .map(new MyGenericNonInferrableFunction<Long, SomeType>())
+        .returns(SomeType.class);
+{% endhighlight %}
+The `returns` statement specifies the produced type, in this case via a class. The hints
+type definition through
+* Classes, for non-parameterized types (no generics)
+* Strings in the form of `returns("Tuple2<Integer, my.SomeType>")`, which are parsed
and converted
+  to a TypeInformation.
+* A TypeInformation directly
+#### Type extraction for Java 8 lambdas
+Type extraction for Java 8 lambdas works differently than for non-lambdas, because lambdas
are not associated
+with an implementing class that extends the function interface.
+Currently, Flink tries to figure out which method implements the lambda and uses Java's generic
signatures to
+determine the parameter types and the return type. However, these signatures are not generated
for lambdas
+by all compilers (as of writing this document only reliably by the Eclipse JDT compiler 4.5
from Milestone 2
+**Improving Type information for Java Lambdas**
+One of the Flink committers (Timo Walther) has actually become active in the Eclipse JDT
compiler community and
+in the OpenJDK community and submitted patches to the compiler to improve availability of
type information 
+available for Java 8 lambdas.
+The Eclipse JDT compiler has added support for this as of version 4.5 M4. Discussion about
the feature in the
+OpenJDK compiler is pending.

View raw message