flink-commits mailing list archives

From rmetz...@apache.org
Subject incubator-flink git commit: [FLINK-1230] Add documentation and an example for collection-based execution
Date Thu, 13 Nov 2014 09:10:57 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/release-0.7 44b24ae28 -> bd305593d


[FLINK-1230] Add documentation and an example for collection-based execution

This closes #195


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/bd305593
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/bd305593
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/bd305593

Branch: refs/heads/release-0.7
Commit: bd305593d4c1166c7db15626ea676e1d66227b41
Parents: 44b24ae
Author: Robert Metzger <rmetzger@apache.org>
Authored: Tue Nov 11 15:49:01 2014 +0100
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Thu Nov 13 10:10:11 2014 +0100

----------------------------------------------------------------------
 docs/local_execution.md                         |  79 +++++++-------
 .../java/misc/CollectionExecutionExample.java   | 103 +++++++++++++++++++
 2 files changed, 138 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd305593/docs/local_execution.md
----------------------------------------------------------------------
diff --git a/docs/local_execution.md b/docs/local_execution.md
index 48d9ae7..e0aa22a 100644
--- a/docs/local_execution.md
+++ b/docs/local_execution.md
@@ -2,20 +2,22 @@
 title:  "Local Execution"
 ---
 
-## Local Execution/Debugging
+## Local Execution
 
 Flink can run on a single machine, even in a single Java Virtual Machine. This allows users
to test and debug Flink programs locally. This section gives an overview of the local execution
mechanisms.
 
-**NOTE:** Please also refer to the [debugging section](java_api_guide.html#debugging) in
the Java API documentation for a guide to testing and local debugging utilities in the Java
API.
+The local environments and executors allow you to run Flink programs in a local Java Virtual
Machine, or within any JVM as part of existing programs. Most examples can be launched
locally by simply hitting the "Run" button of your IDE.
 
-The local environments and executors allow you to run Flink programs in local Java Virtual
Machine, or with within any JVM as part of existing programs. Most examples can be launched
locally by simply hitting the "Run" button of your IDE.
 
-If you are running Flink programs locally, you can also debug your program like any other
Java program. You can either use `System.out.println()` to write out some internal variables
or you can use the debugger. It is possible to set breakpoints within `map()`, `reduce()`
and all the other methods.
+There are two different kinds of local execution supported in Flink. The `LocalExecutionEnvironment`
starts the full Flink runtime, including a JobManager and a TaskManager. This includes
memory management and all the internal algorithms that are executed in the cluster mode.
 
-The `JobExecutionResult` object, which is returned after the execution finished, contains
the program runtime and the accumulator results.
+The `CollectionEnvironment` executes the Flink program on Java collections. This mode
does not start the full Flink runtime, so the execution is very low-overhead and lightweight.
For example, a `DataSet.map()` transformation is executed by applying the `map()` function
to all elements in a Java list.
 
-*Note:* The local execution environments do not start any web frontend to monitor the execution.
 
+## Debugging
+
+If you are running Flink programs locally, you can also debug your program like any other
Java program. You can either use `System.out.println()` to write out some internal variables
or you can use the debugger. It is possible to set breakpoints within `map()`, `reduce()`
and all the other methods.
+Please also refer to the [debugging section](programming_guide.html#debugging) in the Java
API documentation for a guide to testing and local debugging utilities in the Java API.
 
 ## Maven Dependency
 
@@ -51,56 +53,45 @@ public static void main(String[] args) throws Exception {
         })
         .writeAsText("file:///path/to/result");
 
-    env.execute();
+    JobExecutionResult res = env.execute();
 }
 ~~~
 
+The `JobExecutionResult` object, which is returned after the execution finished, contains
the program runtime and the accumulator results.
 
-## Local Executor
-
-The *LocalExecutor* is similar to the local environment, but it takes a *Plan* object, which
describes the program as a single executable unit. The *LocalExecutor* is typically used with
the Scala API. 
-
-The following code shows how you would use the `LocalExecutor` with the Wordcount example
for Scala Programs:
+*Note:* The local execution environments do not start any web frontend to monitor the execution.
 
-~~~scala
-public static void main(String[] args) throws Exception {
-    val input = TextFile("hdfs://path/to/file")
+## Collection Environment
 
-    val words = input flatMap { _.toLowerCase().split("""\W+""") filter { _ != "" } }
-    val counts = words groupBy { x => x } count()
+The execution on Java Collections using the `CollectionEnvironment` is a low-overhead approach
for executing Flink programs. Typical use-cases for this mode are automated tests, debugging
and code re-use.
 
-    val output = counts.write(wordsOutput, CsvOutputFormat())
-  
-    val plan = new ScalaPlan(Seq(output), "Word Count")
-    LocalExecutor.executePlan(p);
-}
-~~~
+Users can also use algorithms implemented for batch processing in more interactive cases.
A slightly changed variant of a Flink program could be used in a Java Application Server for
processing incoming requests.
 
-
-## LocalDistributedExecutor
-
-Flink also offers a `LocalDistributedExecutor` which starts multiple TaskManagers within
one JVM. The standard `LocalExecutor` starts one JobManager and one TaskManager in one JVM.
-With the `LocalDistributedExecutor` you can define the number of TaskManagers to start. This
is useful for debugging network related code and more of a developer tool than a user tool.
+**Skeleton for Collection-based execution**
 
 ~~~java
 public static void main(String[] args) throws Exception {
-    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-    DataSet<String> data = env.readTextFile("hdfs://path/to/file");
-
-    data
-        .filter(new FilterFunction<String>() {
-            public boolean filter(String value) {
-                return value.startsWith("http://");
-            }
-        })
-        .writeAsText("hdfs://path/to/result");
-
-    Plan p = env.createProgramPlan();
-    LocalDistributedExecutor lde = new LocalDistributedExecutor();
-    lde.startNephele(2); // start two TaskManagers
-    lde.run(p);
+    // initialize a new Collection-based execution environment
+    final ExecutionEnvironment env = new CollectionEnvironment();
+    
+    DataSet<User> users = env.fromCollection( /* get elements from a Java Collection
*/);
+
+    /* Data Set transformations ... */
+
+    // retrieve the resulting Tuple2 elements into an ArrayList.
+    Collection<...> result = new ArrayList<...>();
+    resultDataSet.output(new LocalCollectionOutputFormat<...>(result));
+    
+    // kick off execution.
+    env.execute();
+    
+    // Do some work with the resulting ArrayList (=Collection).
+    for(... t : result) {
+        System.err.println("Result = "+t);
+    }
 }
 ~~~
 
+The `flink-java-examples` module contains a full example, called `CollectionExecutionExample`.
 
+Please note that collection-based execution of Flink programs is only possible on
small data that fits into the JVM heap. The execution on collections is not multi-threaded;
only one thread is used.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd305593/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/CollectionExecutionExample.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/CollectionExecutionExample.java
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/CollectionExecutionExample.java
new file mode 100644
index 0000000..ff1a413
--- /dev/null
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/CollectionExecutionExample.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.java.misc;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.flink.api.java.CollectionEnvironment;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/** 
+ * This example shows how to use the collection based execution of Flink.
+ * 
+ * The collection based execution is a local mode that is not using the full Flink runtime.
+ * DataSet transformations are executed on Java collections.
+ * 
+ * See the "Local Execution" section in the documentation for more details: 
+ * 	http://flink.incubator.apache.org/docs/0.7-incubating/local_execution.html
+ * 
+ */
+public class CollectionExecutionExample {
+	
+	/**
+	 * POJO class representing a user
+	 */
+	public static class User {
+		public int userIdentifier;
+		public String name;
+		public User() {}
+		public User(int userIdentifier, String name) {
+			this.userIdentifier = userIdentifier; this.name = name;
+		}
+		public String toString() {
+			return "User{userIdentifier="+userIdentifier+" name="+name+"}";
+		}
+	}
+	
+	/**
+	 * POJO for an EMail.
+	 */
+	public static class EMail {
+		public int userId;
+		public String subject;
+		public String body;
+		public EMail() {}
+		public EMail(int userId, String subject, String body) {
+			this.userId = userId; this.subject = subject; this.body = body;
+		}
+		public String toString() {
+			return "eMail{userId="+userId+" subject="+subject+" body="+body+"}";
+		}
+		
+	}
+	public static void main(String[] args) throws Exception {
+		// initialize a new Collection-based execution environment
+		final ExecutionEnvironment env = new CollectionEnvironment();
+		
+		// create objects for users and emails
+		User[] usersArray = { new User(1, "Peter"), new User(2, "John"), new User(3, "Bill") };
+		EMail[] emailsArray = {new EMail(1, "Re: Meeting", "How about 1pm?"),
+							new EMail(1, "Re: Meeting", "Sorry, I'm not available"),
+							new EMail(3, "Re: Re: Project proposal", "Give me a few more days to think about it.")};
+		
+		// convert objects into a DataSet
+		DataSet<User> users = env.fromCollection(Arrays.asList(usersArray));
+		DataSet<EMail> emails = env.fromCollection(Arrays.asList(emailsArray));
+		
+		// join the two DataSets
+		DataSet<Tuple2<User,EMail>> joined = users.join(emails).where("userIdentifier").equalTo("userId");
+		
+		// retrieve the resulting Tuple2 elements into an ArrayList.
+		Collection<Tuple2<User,EMail>> result = new ArrayList<Tuple2<User,EMail>>(3);
+		joined.output(new LocalCollectionOutputFormat<Tuple2<User,EMail>>(result));
+		
+		// kick off execution.
+		env.execute();
+		
+		// Do some work with the resulting ArrayList (=Collection).
+		for(Tuple2<User, EMail> t : result) {
+			System.err.println("Result = "+t);
+		}
+	}
+}
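
For readers who want an intuition for what "executing on Java collections" means, the following is a minimal plain-Java sketch with no Flink dependency. It is not Flink's implementation: the class `CollectionJoinSketch` and the helpers `mapAll` and `join` are hypothetical names chosen for illustration, and the hash-join strategy is an assumption. It only mirrors the idea described in the commit, namely that a `map()` is applied to every element of a list and a join is evaluated in memory, on a single thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical illustration only; none of these names exist in Flink.
public class CollectionJoinSketch {

    static final class User {
        final int id;
        final String name;
        User(int id, String name) { this.id = id; this.name = name; }
    }

    static final class Email {
        final int userId;
        final String subject;
        Email(int userId, String subject) { this.userId = userId; this.subject = subject; }
    }

    /** Applies fn to every element of input, in order, on the calling thread. */
    static <I, O> List<O> mapAll(List<I> input, Function<I, O> fn) {
        List<O> out = new ArrayList<>(input.size());
        for (I element : input) {
            out.add(fn.apply(element));
        }
        return out;
    }

    /**
     * Joins users with emails on user id using an in-memory hash table.
     * Assumes user ids are unique; a duplicate id would overwrite the earlier user.
     */
    static List<String> join(List<User> users, List<Email> emails) {
        Map<Integer, User> usersById = new HashMap<>();
        for (User u : users) {
            usersById.put(u.id, u);
        }
        List<String> out = new ArrayList<>();
        for (Email e : emails) {
            User u = usersById.get(e.userId);
            if (u != null) { // emails without a matching user are dropped (inner join)
                out.add(u.name + " <- " + e.subject);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<User> users = Arrays.asList(
                new User(1, "Peter"), new User(2, "John"), new User(3, "Bill"));
        List<Email> emails = Arrays.asList(
                new Email(1, "Re: Meeting"),
                new Email(1, "Re: Meeting"),
                new Email(3, "Re: Re: Project proposal"));

        for (String row : join(users, emails)) {
            System.out.println("Result = " + row);
        }
        System.out.println(mapAll(Arrays.asList("a", "b"), s -> s.toUpperCase()));
    }
}
```

Because everything lives in ordinary lists and maps, the whole input and output must fit into the JVM heap, which matches the limitation stated in the documentation change above.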

