flink-commits mailing list archives

From u..@apache.org
Subject [13/15] flink git commit: [FLINK-3132] [docs] Initial docs restructure
Date Fri, 15 Jan 2016 15:50:04 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/batch/fault_tolerance.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/fault_tolerance.md b/docs/apis/batch/fault_tolerance.md
new file mode 100644
index 0000000..51a6b41
--- /dev/null
+++ b/docs/apis/batch/fault_tolerance.md
@@ -0,0 +1,100 @@
+---
+title: "Fault Tolerance"
+
+# Sub-level navigation
+sub-nav-group: batch
+sub-nav-pos: 2
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Flink's fault tolerance mechanism recovers programs in the presence of failures and
+continues to execute them. Such failures include machine hardware failures, network failures,
+transient program failures, etc.
+
+* This will be replaced by the TOC
+{:toc}
+
+Batch Processing Fault Tolerance (DataSet API)
+----------------------------------------------
+
+Fault tolerance for programs in the *DataSet API* works by retrying failed executions.
+The number of times that Flink retries the execution before the job is declared as failed
+is configurable via the *execution retries* parameter. A value of *0* effectively means
+that fault tolerance is deactivated.
+
+To activate fault tolerance, set the *execution retries* to a value larger than zero. A
+common choice is a value of three.
+
+This example shows how to configure the execution retries for a Flink DataSet program.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+env.setNumberOfExecutionRetries(3);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment()
+env.setNumberOfExecutionRetries(3)
+{% endhighlight %}
+</div>
+</div>
+
+
+You can also define default values for the number of execution retries and the retry delay
+in the `flink-conf.yaml`:
+
+~~~
+execution-retries.default: 3
+~~~
+
+
+Retry Delays
+------------
+
+Execution retries can be configured to be delayed. Delaying the retry means that after a
+failed execution, the re-execution does not start immediately, but only after a certain delay.
+
+Delaying the retries can be helpful when the program interacts with external systems where,
+for example, connections or pending transactions should reach a timeout before re-execution
+is attempted.
+
+You can set the retry delay for each program as follows (the sample shows the DataStream
+API - the DataSet API works similarly):
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.getConfig().setExecutionRetryDelay(5000); // 5000 milliseconds delay
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.getConfig.setExecutionRetryDelay(5000) // 5000 milliseconds delay
+{% endhighlight %}
+</div>
+</div>
+
+You can also define the default value for the retry delay in the `flink-conf.yaml`:
+
+~~~
+execution-retries.delay: 10 s
+~~~
+
+{% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/batch/fig/LICENSE.txt
----------------------------------------------------------------------
diff --git a/docs/apis/batch/fig/LICENSE.txt b/docs/apis/batch/fig/LICENSE.txt
new file mode 100644
index 0000000..35b8673
--- /dev/null
+++ b/docs/apis/batch/fig/LICENSE.txt
@@ -0,0 +1,17 @@
+All image files in the folder and its subfolders are
+licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/batch/fig/iterations_delta_iterate_operator.png
----------------------------------------------------------------------
diff --git a/docs/apis/batch/fig/iterations_delta_iterate_operator.png b/docs/apis/batch/fig/iterations_delta_iterate_operator.png
new file mode 100644
index 0000000..470485a
Binary files /dev/null and b/docs/apis/batch/fig/iterations_delta_iterate_operator.png differ

http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/batch/fig/iterations_delta_iterate_operator_example.png
----------------------------------------------------------------------
diff --git a/docs/apis/batch/fig/iterations_delta_iterate_operator_example.png b/docs/apis/batch/fig/iterations_delta_iterate_operator_example.png
new file mode 100644
index 0000000..15f2b54
Binary files /dev/null and b/docs/apis/batch/fig/iterations_delta_iterate_operator_example.png differ

http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/batch/fig/iterations_iterate_operator.png
----------------------------------------------------------------------
diff --git a/docs/apis/batch/fig/iterations_iterate_operator.png b/docs/apis/batch/fig/iterations_iterate_operator.png
new file mode 100644
index 0000000..aaf4158
Binary files /dev/null and b/docs/apis/batch/fig/iterations_iterate_operator.png differ

http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/batch/fig/iterations_iterate_operator_example.png
----------------------------------------------------------------------
diff --git a/docs/apis/batch/fig/iterations_iterate_operator_example.png b/docs/apis/batch/fig/iterations_iterate_operator_example.png
new file mode 100644
index 0000000..be4841c
Binary files /dev/null and b/docs/apis/batch/fig/iterations_iterate_operator_example.png differ

http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/batch/fig/iterations_supersteps.png
----------------------------------------------------------------------
diff --git a/docs/apis/batch/fig/iterations_supersteps.png b/docs/apis/batch/fig/iterations_supersteps.png
new file mode 100644
index 0000000..331dbc7
Binary files /dev/null and b/docs/apis/batch/fig/iterations_supersteps.png differ

http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/batch/fig/plan_visualizer.png
----------------------------------------------------------------------
diff --git a/docs/apis/batch/fig/plan_visualizer.png b/docs/apis/batch/fig/plan_visualizer.png
new file mode 100644
index 0000000..85b8c55
Binary files /dev/null and b/docs/apis/batch/fig/plan_visualizer.png differ

http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/batch/hadoop_compatibility.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/hadoop_compatibility.md b/docs/apis/batch/hadoop_compatibility.md
new file mode 100644
index 0000000..187aa6b
--- /dev/null
+++ b/docs/apis/batch/hadoop_compatibility.md
@@ -0,0 +1,249 @@
+---
+title: "Hadoop Compatibility"
+is_beta: true
+# Sub-level navigation
+sub-nav-group: batch
+sub-nav-pos: 7
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Flink is compatible with Apache Hadoop MapReduce interfaces and therefore allows
+reusing code that was implemented for Hadoop MapReduce.
+
+You can:
+
+- use Hadoop's `Writable` [data types](programming_guide.html#data-types) in Flink programs.
+- use any Hadoop `InputFormat` as a [DataSource](programming_guide.html#data-sources).
+- use any Hadoop `OutputFormat` as a [DataSink](programming_guide.html#data-sinks).
+- use a Hadoop `Mapper` as [FlatMapFunction](dataset_transformations.html#flatmap).
+- use a Hadoop `Reducer` as [GroupReduceFunction](dataset_transformations.html#groupreduce-on-grouped-dataset).
+
+This document shows how to use existing Hadoop MapReduce code with Flink. Please refer to the
+[Connecting to other systems]({{ site.baseurl }}/apis/connectors.html) guide for reading from
+Hadoop-supported file systems.
+
+* This will be replaced by the TOC
+{:toc}
+
+### Project Configuration
+
+Support for Hadoop input/output formats is part of the `flink-java` and
+`flink-scala` Maven modules that are always required when writing Flink jobs.
+The code is located in `org.apache.flink.api.java.hadoop` and
+`org.apache.flink.api.scala.hadoop` in an additional sub-package for the
+`mapred` and `mapreduce` API.
+
+Support for Hadoop Mappers and Reducers is contained in the `flink-hadoop-compatibility`
+Maven module.
+This code resides in the `org.apache.flink.hadoopcompatibility`
+package.
+
+Add the following dependency to your `pom.xml` if you want to reuse Mappers
+and Reducers.
+
+~~~xml
+<dependency>
+	<groupId>org.apache.flink</groupId>
+	<artifactId>flink-hadoop-compatibility</artifactId>
+	<version>{{site.version}}</version>
+</dependency>
+~~~
+
+### Using Hadoop Data Types
+
+Flink supports all Hadoop `Writable` and `WritableComparable` data types
+out-of-the-box. You do not need to include the Hadoop Compatibility dependency
+if you only want to use your Hadoop data types. See the
+[Programming Guide](programming_guide.html#data-types) for more details.
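+
+As a minimal, hedged sketch (the element values here are made up for illustration),
+Writable types can be used directly as elements of a Flink `DataSet`:
+
+~~~java
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+// Hadoop Writable types work like any other Flink data type,
+// without the flink-hadoop-compatibility dependency.
+DataSet<Tuple2<Text, LongWritable>> counts = env.fromElements(
+    new Tuple2<Text, LongWritable>(new Text("hello"), new LongWritable(1L)),
+    new Tuple2<Text, LongWritable>(new Text("world"), new LongWritable(2L)));
+
+counts.print();
+~~~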
+
+### Using Hadoop InputFormats
+
+Hadoop input formats can be used to create a data source by using
+one of the methods `readHadoopFile` or `createHadoopInput` of the
+`ExecutionEnvironment`. The former is used for input formats derived
+from `FileInputFormat` while the latter has to be used for general purpose
+input formats.
+
+The resulting `DataSet` contains 2-tuples where the first field
+is the key and the second field is the value retrieved from the Hadoop
+InputFormat.
+
+The following example shows how to use Hadoop's `TextInputFormat`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+DataSet<Tuple2<LongWritable, Text>> input =
+    env.readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class, textPath);
+
+// Do something with the data.
+[...]
+~~~
+
+</div>
+<div data-lang="scala" markdown="1">
+
+~~~scala
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+val input: DataSet[(LongWritable, Text)] =
+  env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath)
+
+// Do something with the data.
+[...]
+~~~
+
+</div>
+
+</div>
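+
+The example above covers `readHadoopFile`. As a hedged sketch of `createHadoopInput`, the
+following uses Hadoop's general-purpose (non-file) `DBInputFormat`; the JDBC driver, connection
+URL, table and column names, and the `MyRecord` class (a `Writable` that implements
+`DBWritable`) are all hypothetical placeholders:
+
+~~~java
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
+import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
+
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+// The Job object carries the input format's configuration.
+Job job = Job.getInstance();
+DBConfiguration.configureDB(job.getConfiguration(),
+    "org.postgresql.Driver",             // hypothetical JDBC driver
+    "jdbc:postgresql://localhost/mydb"); // hypothetical connection URL
+// Read columns "id" and "value" from table "records", ordered by "id".
+DBInputFormat.setInput(job, MyRecord.class, "records", null, "id", "id", "value");
+
+// Wrap the general-purpose InputFormat as a Flink data source.
+DataSet<Tuple2<LongWritable, MyRecord>> input = env.createHadoopInput(
+    new DBInputFormat<MyRecord>(), LongWritable.class, MyRecord.class, job);
+~~~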
+
+### Using Hadoop OutputFormats
+
+Flink provides a compatibility wrapper for Hadoop `OutputFormats`. Any class
+that implements `org.apache.hadoop.mapred.OutputFormat` or extends
+`org.apache.hadoop.mapreduce.OutputFormat` is supported.
+The OutputFormat wrapper expects its input data to be a DataSet containing
+2-tuples of key and value. These are to be processed by the Hadoop OutputFormat.
+
+The following example shows how to use Hadoop's `TextOutputFormat`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
+// Obtain the result we want to emit
+DataSet<Tuple2<Text, IntWritable>> hadoopResult = [...]
+
+// Set up the Hadoop TextOutputFormat.
+Job job = Job.getInstance();
+HadoopOutputFormat<Text, IntWritable> hadoopOF =
+  // create the Flink wrapper.
+  new HadoopOutputFormat<Text, IntWritable>(
+    // set the Hadoop OutputFormat and specify the job.
+    new TextOutputFormat<Text, IntWritable>(), job
+  );
+hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
+TextOutputFormat.setOutputPath(job, new Path(outputPath));
+
+// Emit data using the Hadoop TextOutputFormat.
+hadoopResult.output(hadoopOF);
+~~~
+
+</div>
+<div data-lang="scala" markdown="1">
+
+~~~scala
+// Obtain your result to emit.
+val hadoopResult: DataSet[(Text, IntWritable)] = [...]
+
+val hadoopOF = new HadoopOutputFormat[Text,IntWritable](
+  new TextOutputFormat[Text, IntWritable],
+  new JobConf)
+
+hadoopOF.getJobConf.set("mapred.textoutputformat.separator", " ")
+FileOutputFormat.setOutputPath(hadoopOF.getJobConf, new Path(resultPath))
+
+hadoopResult.output(hadoopOF)
+~~~
+
+</div>
+
+</div>
+
+### Using Hadoop Mappers and Reducers
+
+Hadoop Mappers are semantically equivalent to Flink's [FlatMapFunctions](dataset_transformations.html#flatmap)
+and Hadoop Reducers are equivalent to Flink's [GroupReduceFunctions](dataset_transformations.html#groupreduce-on-grouped-dataset).
+Flink provides wrappers for implementations of Hadoop MapReduce's `Mapper` and `Reducer` interfaces,
+i.e., you can reuse your Hadoop Mappers and Reducers in regular Flink programs. At the moment,
+only the `Mapper` and `Reducer` interfaces of Hadoop's mapred API (`org.apache.hadoop.mapred`)
+are supported.
+
+The wrappers take a `DataSet<Tuple2<KEYIN,VALUEIN>>` as input and produce a `DataSet<Tuple2<KEYOUT,VALUEOUT>>`
+as output, where `KEYIN` and `KEYOUT` are the keys and `VALUEIN` and `VALUEOUT` are the values
+of the Hadoop key-value pairs that are processed by the Hadoop functions. For Reducers, Flink
+offers a wrapper for a GroupReduceFunction with a Combiner (`HadoopReduceCombineFunction`) and
+without a Combiner (`HadoopReduceFunction`). The wrappers accept an optional `JobConf` object
+to configure the Hadoop Mapper or Reducer.
+
+Flink's function wrappers are
+
+- `org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction`,
+- `org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction`, and
+- `org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction`;
+
+they can be used as regular Flink [FlatMapFunctions](dataset_transformations.html#flatmap)
+or [GroupReduceFunctions](dataset_transformations.html#groupreduce-on-grouped-dataset).
+
+The following example shows how to use Hadoop `Mapper` and `Reducer` functions.
+
+~~~java
+// Obtain data to process somehow.
+DataSet<Tuple2<Text, LongWritable>> text = [...]
+
+DataSet<Tuple2<Text, LongWritable>> result = text
+  // use Hadoop Mapper (Tokenizer) as MapFunction
+  .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
+    new Tokenizer()
+  ))
+  .groupBy(0)
+  // use Hadoop Reducer (Counter) as Reduce- and CombineFunction
+  .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
+    new Counter(), new Counter()
+  ));
+~~~
+
+**Please note:** The Reducer wrapper works on groups as defined by Flink's [groupBy()](dataset_transformations.html#transformations-on-grouped-dataset)
+operation. It does not consider any custom partitioners, sort, or grouping comparators you
+might have set in the `JobConf`.
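+
+The wrappers' optional `JobConf` (mentioned above) is how parameters reach the wrapped functions'
+`configure()` methods. A hedged sketch, reusing `text` and `Tokenizer` from the previous example
+and assuming a hypothetical `tokenizer.pattern` key that this `Tokenizer` reads:
+
+~~~java
+import org.apache.hadoop.mapred.JobConf;
+
+// Parameters set on the JobConf are visible to the Mapper via configure(JobConf).
+JobConf conf = new JobConf();
+conf.set("tokenizer.pattern", "\\W+"); // hypothetical key read by Tokenizer.configure()
+
+DataSet<Tuple2<Text, LongWritable>> tokens = text
+  .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
+    new Tokenizer(), conf
+  ));
+~~~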
+
+### Complete Hadoop WordCount Example
+
+The following example shows a complete WordCount implementation using Hadoop data types,
+InputFormats and OutputFormats, and Mapper and Reducer implementations.
+
+~~~java
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+// Set up the Hadoop TextInputFormat.
+Job job = Job.getInstance();
+HadoopInputFormat<LongWritable, Text> hadoopIF =
+  new HadoopInputFormat<LongWritable, Text>(
+    new TextInputFormat(), LongWritable.class, Text.class, job
+  );
+TextInputFormat.addInputPath(job, new Path(inputPath));
+
+// Read data using the Hadoop TextInputFormat.
+DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopIF);
+
+DataSet<Tuple2<Text, LongWritable>> result = text
+  // use Hadoop Mapper (Tokenizer) as MapFunction
+  .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
+    new Tokenizer()
+  ))
+  .groupBy(0)
+  // use Hadoop Reducer (Counter) as Reduce- and CombineFunction
+  .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
+    new Counter(), new Counter()
+  ));
+
+// Set up the Hadoop TextOutputFormat.
+HadoopOutputFormat<Text, LongWritable> hadoopOF =
+  new HadoopOutputFormat<Text, LongWritable>(
+    new TextOutputFormat<Text, LongWritable>(), job
+  );
+hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
+TextOutputFormat.setOutputPath(job, new Path(outputPath));
+
+// Emit data using the Hadoop TextOutputFormat.
+result.output(hadoopOF);
+
+// Execute Program
+env.execute("Hadoop WordCount");
+~~~

