flink-commits mailing list archives

From fhue...@apache.org
Subject [1/3] flink git commit: [FLINK-3641] Add documentation for DataSet distributed cache.
Date Sat, 18 Jun 2016 21:40:40 GMT
Repository: flink
Updated Branches:
  refs/heads/master bce355093 -> ba62df14a


[FLINK-3641] Add documentation for DataSet distributed cache.

This closes #2122


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ba62df14
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ba62df14
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ba62df14

Branch: refs/heads/master
Commit: ba62df14a52660cec85783b1070821acd144fd06
Parents: 5a0c268
Author: Fabian Hueske <fhueske@apache.org>
Authored: Wed Jun 15 14:19:43 2016 +0200
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Sat Jun 18 23:40:23 2016 +0200

----------------------------------------------------------------------
 docs/apis/batch/index.md | 106 +++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 105 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ba62df14/docs/apis/batch/index.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/index.md b/docs/apis/batch/index.md
index 993fb72..42c6c92 100644
--- a/docs/apis/batch/index.md
+++ b/docs/apis/batch/index.md
@@ -2019,6 +2019,110 @@ of a function, or use the `withParameters(...)` method to pass in
a configuratio
 
 {% top %}
 
+Distributed Cache
+-------------------
+
+Flink offers a distributed cache, similar to Apache Hadoop, to make files locally accessible
to parallel instances of user functions. This functionality can be used to share files that
contain static external data such as dictionaries or machine-learned regression models.
+
+The cache works as follows. A program registers a file or directory from a [local or remote
filesystem such as HDFS or S3]({{ site.baseurl }}/apis/batch/connectors.html#reading-from-file-systems)
under a specific name in its `ExecutionEnvironment` as a cached file. When the program is
executed, Flink automatically copies the file or directory to the local filesystems of all
workers. A user function can look up the file or directory under the specified name and access
it from the worker's local filesystem.
+
+The distributed cache is used as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+Register the file or directory in the `ExecutionEnvironment`.
+
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+// register a file from HDFS
+env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile");
+
+// register a local executable file (script, executable, ...)
+env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true);
+
+// define your program and execute
+...
+DataSet<String> input = ...
+DataSet<Integer> result = input.map(new MyMapper());
+...
+env.execute();
+{% endhighlight %}
+
+Access the cached file or directory in a user function (here a `MapFunction`). The function
must extend a [RichFunction]({{ site.baseurl }}/apis/common/index.html#rich-functions) class
because it needs access to the `RuntimeContext`.
+
+{% highlight java %}
+
+// extend a RichFunction to have access to the RuntimeContext
+public final class MyMapper extends RichMapFunction<String, Integer> {
+    
+    @Override
+    public void open(Configuration config) {
+      
+      // access cached file via RuntimeContext and DistributedCache
+      File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");
+      // read the file (or navigate the directory)
+      ...
+    }
+
+    @Override
+    public Integer map(String value) throws Exception {
+      // use content of cached file
+      ...
+    }
+}
+{% endhighlight %}
+
+</div>
+<div data-lang="scala" markdown="1">
+
+Register the file or directory in the `ExecutionEnvironment`.
+
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+// register a file from HDFS
+env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")
+
+// register a local executable file (script, executable, ...)
+env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)
+
+// define your program and execute
+...
+val input: DataSet[String] = ...
+val result: DataSet[Int] = input.map(new MyMapper())
+...
+env.execute()
+{% endhighlight %}
+
+Access the cached file in a user function (here a `MapFunction`). The function must extend
a [RichFunction]({{ site.baseurl }}/apis/common/index.html#rich-functions) class because it
needs access to the `RuntimeContext`.
+
+{% highlight scala %}
+
+// extend a RichFunction to have access to the RuntimeContext
+class MyMapper extends RichMapFunction[String, Int] {
+
+  override def open(config: Configuration): Unit = {
+
+    // access cached file via RuntimeContext and DistributedCache
+    val myFile: File = getRuntimeContext.getDistributedCache.getFile("hdfsFile")
+    // read the file (or navigate the directory)
+    ...
+  }
+
+  override def map(value: String): Int = {
+    // use content of cached file
+    ...
+  }
+}
+{% endhighlight %}
+
+</div>
+</div>
+
+{% top %}
+
 Passing Parameters to Functions
 -------------------
 
@@ -2067,7 +2171,7 @@ class MyFilter(limit: Int) extends FilterFunction[Int] {
 
 #### Via `withParameters(Configuration)`
 
-This method takes a Configuration object as an argument, which will be passed to the [rich
function](#rich-functions)'s `open()`
+This method takes a Configuration object as an argument, which will be passed to the [rich
function]({{ site.baseurl }}/apis/common/index.html#rich-functions)'s `open()`
 method. The Configuration object is a Map from String keys to different value types.
 
 <div class="codetabs" markdown="1">
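For readers skimming the diff without the rest of the page, a minimal, self-contained sketch of the pattern the new section documents could look like the following. The class name `DictionaryMatchCounter`, the one-entry-per-line dictionary format, and the token-counting logic are illustrative assumptions; only `registerCachedFile(...)`, `getRuntimeContext().getDistributedCache().getFile(...)`, and the `RichMapFunction` lifecycle come from the documentation added above.

{% highlight java %}
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.HashSet;
import java.util.Set;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Hypothetical mapper: loads the dictionary registered under "hdfsFile" once per
// parallel instance and counts how many tokens of each input line appear in it.
public final class DictionaryMatchCounter extends RichMapFunction<String, Integer> {

    private Set<String> dictionary;

    @Override
    public void open(Configuration config) throws Exception {
        // the distributed cache has already copied the file to the worker's local filesystem
        File dictFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");
        dictionary = new HashSet<>(Files.readAllLines(dictFile.toPath(), StandardCharsets.UTF_8));
    }

    @Override
    public Integer map(String value) {
        int hits = 0;
        for (String token : value.split("\\s+")) {
            if (dictionary.contains(token)) {
                hits++;
            }
        }
        return hits;
    }
}
{% endhighlight %}

Used together with `env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")` as in the registration snippet above, each parallel instance loads the dictionary once in `open()` rather than once per record.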

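The second hunk only fixes a link, but since the surrounding examples are not part of this diff, a rough sketch of the `withParameters(Configuration)` pattern that its context paragraph describes might look like this. The key name `"limit"`, the sample elements, and the filter logic are illustrative assumptions.

{% highlight java %}
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class WithParametersExample {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Integer> toFilter = env.fromElements(1, 2, 3, 4, 5);

        // values stored in the Configuration become visible in the rich function's open()
        Configuration config = new Configuration();
        config.setInteger("limit", 2);

        toFilter.filter(new LimitFilter())
                .withParameters(config)
                .print();
    }

    public static class LimitFilter extends RichFilterFunction<Integer> {

        private int limit;

        @Override
        public void open(Configuration parameters) {
            // read the parameter back, falling back to a default of 0
            limit = parameters.getInteger("limit", 0);
        }

        @Override
        public boolean filter(Integer value) {
            return value > limit;
        }
    }
}
{% endhighlight %}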
