flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [4/5] flink git commit: Added support for Apache Tez as an execution environment
Date Thu, 02 Apr 2015 18:08:31 GMT
Added support for Apache Tez as an execution environment

This closes #189


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2cbbd328
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2cbbd328
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2cbbd328

Branch: refs/heads/master
Commit: 2cbbd3287252fd33472ae52cfa6a6b38b3b1c39e
Parents: 2ffefdf
Author: Kostas Tzoumas <kostas.tzoumas@gmail.com>
Authored: Tue Jan 6 15:01:03 2015 +0100
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Thu Apr 2 20:07:58 2015 +0200

----------------------------------------------------------------------
 .travis.yml                                     |   2 +-
 docs/_includes/sidenav.html                     |   3 +-
 docs/flink_on_tez_guide.md                      | 292 ++++++++++
 docs/img/flink_on_tez_translation.png           | Bin 0 -> 350867 bytes
 docs/img/flink_tez_vertex.png                   | Bin 0 -> 105544 bytes
 .../scala/graph/TransitiveClosureNaive.scala    |   2 +-
 flink-quickstart/flink-tez-quickstart/pom.xml   |  37 ++
 .../src/main/java/Dummy.java                    |  28 +
 .../META-INF/maven/archetype-metadata.xml       |  36 ++
 .../main/resources/archetype-resources/pom.xml  | 186 ++++++
 .../src/assembly/flink-fat-jar.xml              |  40 ++
 .../src/main/java/Driver.java                   | 115 ++++
 .../src/main/java/LocalJob.java                 |  72 +++
 .../src/main/java/LocalWordCount.java           |  96 ++++
 .../src/main/java/YarnJob.java                  |  75 +++
 .../src/main/java/YarnWordCount.java            | 124 ++++
 .../projects/testArtifact/archetype.properties  |  21 +
 .../resources/projects/testArtifact/goal.txt    |   1 +
 flink-quickstart/pom.xml                        |  10 +
 .../runtime/operators/util/TaskConfig.java      |   3 +-
 flink-staging/flink-tez/pom.xml                 | 224 ++++++++
 .../flink-tez/src/assembly/flink-fat-jar.xml    |  42 ++
 .../flink/tez/client/LocalTezEnvironment.java   |  71 +++
 .../flink/tez/client/RemoteTezEnvironment.java  |  78 +++
 .../apache/flink/tez/client/TezExecutor.java    | 198 +++++++
 .../flink/tez/client/TezExecutorTool.java       |  80 +++
 .../flink/tez/dag/FlinkBroadcastEdge.java       |  70 +++
 .../flink/tez/dag/FlinkDataSinkVertex.java      |  61 ++
 .../flink/tez/dag/FlinkDataSourceVertex.java    |  82 +++
 .../org/apache/flink/tez/dag/FlinkEdge.java     |  45 ++
 .../apache/flink/tez/dag/FlinkForwardEdge.java  |  71 +++
 .../flink/tez/dag/FlinkPartitionEdge.java       |  71 +++
 .../flink/tez/dag/FlinkProcessorVertex.java     |  61 ++
 .../apache/flink/tez/dag/FlinkUnionVertex.java  |  61 ++
 .../org/apache/flink/tez/dag/FlinkVertex.java   | 114 ++++
 .../apache/flink/tez/dag/TezDAGGenerator.java   | 460 +++++++++++++++
 .../tez/examples/ConnectedComponentsStep.java   | 203 +++++++
 .../flink/tez/examples/ExampleDriver.java       | 119 ++++
 .../flink/tez/examples/PageRankBasicStep.java   | 241 ++++++++
 .../apache/flink/tez/examples/TPCHQuery3.java   | 224 ++++++++
 .../examples/TransitiveClosureNaiveStep.java    | 135 +++++
 .../apache/flink/tez/examples/WordCount.java    | 129 +++++
 .../flink/tez/runtime/DataSinkProcessor.java    | 228 ++++++++
 .../flink/tez/runtime/DataSourceProcessor.java  | 190 +++++++
 .../flink/tez/runtime/RegularProcessor.java     | 128 +++++
 .../tez/runtime/TezRuntimeEnvironment.java      |  50 ++
 .../org/apache/flink/tez/runtime/TezTask.java   | 570 +++++++++++++++++++
 .../apache/flink/tez/runtime/TezTaskConfig.java | 163 ++++++
 .../flink/tez/runtime/UnionProcessor.java       | 106 ++++
 .../flink/tez/runtime/input/FlinkInput.java     | 139 +++++
 .../runtime/input/FlinkInputSplitGenerator.java |  94 +++
 .../tez/runtime/input/TezReaderIterator.java    |  66 +++
 .../tez/runtime/output/SimplePartitioner.java   |  35 ++
 .../tez/runtime/output/TezChannelSelector.java  |  36 ++
 .../tez/runtime/output/TezOutputCollector.java  |  72 +++
 .../tez/runtime/output/TezOutputEmitter.java    | 190 +++++++
 .../apache/flink/tez/util/DummyInvokable.java   |  51 ++
 .../apache/flink/tez/util/EncodingUtils.java    |  64 +++
 .../flink/tez/util/FlinkSerialization.java      | 310 ++++++++++
 .../src/main/resources/log4j.properties         |  30 +
 .../tez/test/ConnectedComponentsStepITCase.java |  83 +++
 .../flink/tez/test/PageRankBasicStepITCase.java |  54 ++
 .../flink/tez/test/TezProgramTestBase.java      | 104 ++++
 .../flink/tez/test/WebLogAnalysisITCase.java    |  48 ++
 .../apache/flink/tez/test/WordCountITCase.java  |  47 ++
 .../src/test/resources/log4j-test.properties    |  30 +
 .../src/test/resources/logback-test.xml         |  37 ++
 flink-staging/pom.xml                           |  12 +-
 .../flink/test/testdata/WebLogAnalysisData.java | 149 +++++
 .../flink/test/testdata/WebLogAnalysisData.java | 149 -----
 pom.xml                                         | 111 ++--
 71 files changed, 7119 insertions(+), 210 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index bf3e0b4..2e6a041 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -23,7 +23,7 @@ matrix:
     - jdk: "openjdk6" # we must use openjdk6 here to deploy a java6 compatible uber-jar for YARN
       env: PROFILE="-Dhadoop.version=2.2.0"
     - jdk: "oraclejdk8"
-      env: PROFILE="-Dhadoop.version=2.6.0 -Dscala-2.11 -Dmaven.javadoc.skip=true"
+      env: PROFILE="-Dhadoop.version=2.6.0 -Dscala-2.11 -Pinclude-tez -Dmaven.javadoc.skip=true"
 
 
 git:

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/docs/_includes/sidenav.html
----------------------------------------------------------------------
diff --git a/docs/_includes/sidenav.html b/docs/_includes/sidenav.html
index 66243fd..37bba25 100644
--- a/docs/_includes/sidenav.html
+++ b/docs/_includes/sidenav.html
@@ -41,7 +41,8 @@ under the License.
   <li><div class="sidenav-item"><a href="{{ site.baseurl }}/spargel_guide.html">Spargel Graph API</a></div></li>
   <li><div class="sidenav-item"><a href="{{ site.baseurl }}/gelly_guide.html">Gelly Graph API</a></div></li>
   <li><div class="sidenav-item"><a href="{{ site.baseurl }}/linq.html">Language-Integrated Queries</a></div></li>
-  <li><div class="sidenav-item-bottom"><a href="{{ site.baseurl }}/hadoop_compatibility.html">Hadoop Compatibility</a></div></li>
+  <li><div class="sidenav-item"><a href="{{ site.baseurl }}/hadoop_compatibility.html">Hadoop Compatibility</a></div></li>
+    <li><div class="sidenav-item-bottom"><a href="{{ site.baseurl }}/flink_on_tez_guide.html">Running Flink on Tez</a></div></li>
 
   <li><div class="sidenav-category">Examples</div></li>
   <li><div class="sidenav-item"><a href="{{ site.baseurl }}/examples.html">Bundled Examples</a></div></li>

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/docs/flink_on_tez_guide.md
----------------------------------------------------------------------
diff --git a/docs/flink_on_tez_guide.md b/docs/flink_on_tez_guide.md
new file mode 100644
index 0000000..08aed28
--- /dev/null
+++ b/docs/flink_on_tez_guide.md
@@ -0,0 +1,292 @@
+---
+title: "Running Flink on YARN leveraging Tez"
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+
+<a href="#top"></a>
+
+## Introduction
+
+You can run Flink using Tez as an execution environment. Flink on Tez 
+is currently included in *flink-staging* in alpha. All classes are
+located in the *org.apache.flink.tez* package.
+
+## Why Flink on Tez
+
+[Apache Tez](http://tez.apache.org) is a scalable data processing
+platform. Tez provides an API for specifying a directed acyclic
+graph (DAG), and functionality for placing the DAG vertices in YARN
+containers, as well as data shuffling.  In Flink's architecture,
+Tez is at about the same level as Flink's network stack. While Flink's
+network stack focuses heavily on low latency in order to support 
+pipelining, data streaming, and iterative algorithms, Tez
+focuses on scalability and elastic resource usage.
+
+Thus, by replacing Flink's network stack with Tez, users can get scalability
+and elastic resource usage in shared clusters while retaining Flink's 
+APIs, optimizer, and runtime algorithms (local sorts, hash tables, etc).
+
+Flink programs can run almost unmodified using Tez as an execution
+environment. Tez supports local execution (e.g., for debugging), and 
+remote execution on YARN.
+
+
+## Local execution
+
+The `LocalTezEnvironment` can be used to run programs using the local
+mode provided by Tez. The following example shows WordCount using Tez local mode.
+It is identical to a normal Flink WordCount, except that the `LocalTezEnvironment` is used.
+To run in local Tez mode, you can simply run a Flink on Tez program
+from your IDE (e.g., right click and run).
+  
+{% highlight java %}
+public class WordCountExample {
+    public static void main(String[] args) throws Exception {
+        final LocalTezEnvironment env = LocalTezEnvironment.create();
+
+	    DataSet<String> text = env.fromElements(
+            "Who's there?",
+            "I think I hear them. Stand, ho! Who's there?");
+
+        DataSet<Tuple2<String, Integer>> wordCounts = text
+            .flatMap(new LineSplitter())
+            .groupBy(0)
+            .sum(1);
+
+        wordCounts.print();
+
+        env.execute("Word Count Example");
+    }
+
+    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
+        @Override
+        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
+            for (String word : line.split(" ")) {
+                out.collect(new Tuple2<String, Integer>(word, 1));
+            }
+        }
+    }
+}
+{% endhighlight %}
+
+## YARN execution
+
+### Setup
+
+- Install Tez on your Hadoop 2 cluster following the instructions from the
+  [Apache Tez website](http://tez.apache.org/install.html). If you are able to run 
+  the examples that ship with Tez, then Tez has been successfully installed.
+  
+- Currently, you need to build Flink yourself to obtain Flink on Tez
+  (the reason is Hadoop version compatibility: Tez releases artifacts
+  on Maven central with a Hadoop 2.6.0 dependency). Build Flink
+  using `mvn -DskipTests clean package -Pinclude-tez -Dhadoop.version=X.X.X -Dtez.version=X.X.X`.
+  Make sure that the Hadoop version matches the version that Tez uses.
+  Obtain the jar file contained in the Flink distribution under
+  `flink-staging/flink-tez/target/flink-tez-x.y.z-flink-fat-jar.jar` 
+  and upload it to some directory in HDFS. E.g., to upload the file
+  to the directory `/apps`, execute
+  {% highlight bash %}
+  $ hadoop fs -put /path/to/flink-tez-x.y.z-flink-fat-jar.jar /apps
+  {% endhighlight %}  
+ 
+- Edit the tez-site.xml configuration file, adding an entry that points to the
+  location of the file. E.g., assuming that the file is in the directory `/apps/`, 
+  add the following entry to tez-site.xml
+    ~~~<property>
+      <name>tez.aux.uris</name>
+      <value>${fs.default.name}/apps/flink-tez-x.y.z-flink-fat-jar.jar</value>
+    </property>
+    ~~~
+    
+- At this point, you should be able to run the pre-packaged examples, e.g., run WordCount as:
+  {% highlight bash %}
+  $ hadoop jar /path/to/flink-tez-x.y.z-flink-fat-jar.jar wc hdfs:/path/to/text hdfs:/path/to/output
+  {% endhighlight %}  
+
+
+### Packaging your program
+
+Application packaging is currently a bit different than in Flink standalone mode.
+  Flink programs that run on Tez need to be packaged in a "fat jar"
+  file that contains the Flink client and is executed via the `hadoop jar` command.
+  An easy way to do that is to use the provided `flink-tez-quickstart` maven archetype.
+  Create a new project as
+  
+  {% highlight bash %}
+  $ mvn archetype:generate                             \
+    -DarchetypeGroupId=org.apache.flink              \
+    -DarchetypeArtifactId=flink-tez-quickstart           \
+    -DarchetypeVersion={{site.FLINK_VERSION_SHORT}}
+  {% endhighlight %}
+  
+  and specify the group id, artifact id, version, and package of your project. For example,
+  let us assume the following options: `org.myorganization`, `flink-on-tez`, `0.1`, and `org.myorganization`.
+  You should see the following output on your terminal:
+  
+  {% highlight bash %}
+  $ mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-tez-quickstart
+  [INFO] Scanning for projects...
+  [INFO]
+  [INFO] ------------------------------------------------------------------------
+  [INFO] Building Maven Stub Project (No POM) 1
+  [INFO] ------------------------------------------------------------------------
+  [INFO]
+  [INFO] >>> maven-archetype-plugin:2.2:generate (default-cli) > generate-sources @ standalone-pom >>>
+  [INFO]
+  [INFO] <<< maven-archetype-plugin:2.2:generate (default-cli) < generate-sources @ standalone-pom <<<
+  [INFO]
+  [INFO] --- maven-archetype-plugin:2.2:generate (default-cli) @ standalone-pom ---
+  [INFO] Generating project in Interactive mode
+  [INFO] Archetype [org.apache.flink:flink-tez-quickstart:0.9-SNAPSHOT] found in catalog local
+  Define value for property 'groupId': : org.myorganization
+  Define value for property 'artifactId': : flink-on-tez
+  Define value for property 'version':  1.0-SNAPSHOT: : 0.1
+  Define value for property 'package':  org.myorganization: :
+  Confirm properties configuration:
+  groupId: org.myorganization
+  artifactId: flink-on-tez
+  version: 0.1
+  package: org.myorganization
+   Y: : Y
+  [INFO] ----------------------------------------------------------------------------
+  [INFO] Using following parameters for creating project from Archetype: flink-tez-quickstart:0.9-SNAPSHOT
+  [INFO] ----------------------------------------------------------------------------
+  [INFO] Parameter: groupId, Value: org.myorganization
+  [INFO] Parameter: artifactId, Value: flink-on-tez
+  [INFO] Parameter: version, Value: 0.1
+  [INFO] Parameter: package, Value: org.myorganization
+  [INFO] Parameter: packageInPathFormat, Value: org/myorganization
+  [INFO] Parameter: package, Value: org.myorganization
+  [INFO] Parameter: version, Value: 0.1
+  [INFO] Parameter: groupId, Value: org.myorganization
+  [INFO] Parameter: artifactId, Value: flink-on-tez
+  [INFO] project created from Archetype in dir: /Users/kostas/Dropbox/flink-tez-quickstart-test/flink-on-tez
+  [INFO] ------------------------------------------------------------------------
+  [INFO] BUILD SUCCESS
+  [INFO] ------------------------------------------------------------------------
+  [INFO] Total time: 44.130 s
+  [INFO] Finished at: 2015-02-26T17:59:45+01:00
+  [INFO] Final Memory: 15M/309M
+  [INFO] ------------------------------------------------------------------------
+  {% endhighlight %}
+  
+  The project contains an example called `YarnJob.java` that provides the skeleton 
+  for a Flink-on-Tez job. Program execution is currently done via Hadoop's `ProgramDriver`, 
+  see the `Driver.java` class for an example. Create the fat jar using 
+  `mvn -DskipTests clean package`. The resulting jar will be located in the `target/` directory. 
+  You can now execute a job as follows:
+  
+  {% highlight bash %}
+$ mvn -DskipTests clean package
+$ hadoop jar flink-on-tez/target/flink-on-tez-0.1-flink-fat-jar.jar yarnjob [command-line parameters]
+  {% endhighlight %}
+  
+  Flink programs that run on YARN using Tez as an execution engine need to use the `RemoteTezEnvironment` and 
+  register the class that contains the `main` method with that environment:
+  {% highlight java %}
+  public class WordCountExample {
+      public static void main(String[] args) throws Exception {
+          final RemoteTezEnvironment env = RemoteTezEnvironment.create();
+  
+  	    DataSet<String> text = env.fromElements(
+              "Who's there?",
+              "I think I hear them. Stand, ho! Who's there?");
+  
+          DataSet<Tuple2<String, Integer>> wordCounts = text
+              .flatMap(new LineSplitter())
+              .groupBy(0)
+              .sum(1);
+  
+          wordCounts.print();
+      
+          env.registerMainClass(WordCountExample.class);
+          env.execute("Word Count Example");
+      }
+  
+      public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
+          @Override
+          public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
+              for (String word : line.split(" ")) {
+                  out.collect(new Tuple2<String, Integer>(word, 1));
+              }
+          }
+      }
+  }
+  {% endhighlight %}
+
+
+## How it works
+
+Flink on Tez reuses the Flink APIs, the Flink optimizer,
+and the Flink local runtime, including Flink's hash table and sort implementations. Tez
+replaces Flink's network stack and control plane, and is responsible for scheduling and
+network shuffles.
+
+The figure below shows how a Flink program passes through the Flink stack and generates
+a Tez DAG (instead of a JobGraph that would be created using normal Flink execution).
+
+<div style="text-align: center;">
+<img src="img/flink_on_tez_translation.png" alt="Translation of a Flink program to a Tez DAG." height="600px" vspace="20px" style="text-align: center;"/>
+</div>
+
+All local processing, including memory management, sorting, and hashing is performed by
+Flink as usual. Local processing is encapsulated in Tez vertices, as seen in the figure
+below. Tez vertices are connected by edges. Tez is currently based on a key-value data
+model. In the current implementation, the elements that are processed by Flink operators
+are wrapped inside Tez values, and the Tez key field is used to indicate the index of the target task
+that the elements are destined to.
+
+<div style="text-align: center;">
+<img src="img/flink_tez_vertex.png" alt="Encapsulation of Flink runtime inside Tez vertices." height="200px" vspace="20px" style="text-align: center;"/>
+</div>
+
+## Limitations
+
+Currently, Flink on Tez does not support all features of the Flink API. We are working
+to enable all of the missing features listed below. In the meantime, if your project depends on these features, we suggest
+using [Flink on YARN]({{site.baseurl}}/yarn_setup.html) or [Flink standalone]({{site.baseurl}}/setup_quickstart.html).
+
+The following features are currently missing.
+
+- Dedicated client: jobs need to be submitted via Hadoop's command-line client
+
+- Self-joins: currently binary operators that receive the same input are not supported due to 
+  [TEZ-1190](https://issues.apache.org/jira/browse/TEZ-1190).
+
+- Iterative programs are currently not supported.
+
+- Broadcast variables are currently not supported.
+
+- Accumulators and counters are currently not supported.
+
+- Performance: The current implementation has not been heavily tested for performance, and misses several optimizations,
+  including task chaining.
+
+- Streaming API: Streaming programs will not currently compile to Tez DAGs.
+
+- Scala API: The current implementation has only been tested with the Java API.
+
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/docs/img/flink_on_tez_translation.png
----------------------------------------------------------------------
diff --git a/docs/img/flink_on_tez_translation.png b/docs/img/flink_on_tez_translation.png
new file mode 100644
index 0000000..88fa4d5
Binary files /dev/null and b/docs/img/flink_on_tez_translation.png differ

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/docs/img/flink_tez_vertex.png
----------------------------------------------------------------------
diff --git a/docs/img/flink_tez_vertex.png b/docs/img/flink_tez_vertex.png
new file mode 100644
index 0000000..b469862
Binary files /dev/null and b/docs/img/flink_tez_vertex.png differ

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
index d171611..727cb47 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
@@ -21,7 +21,7 @@ import org.apache.flink.api.scala._
 import org.apache.flink.examples.java.graph.util.ConnectedComponentsData
 import org.apache.flink.util.Collector
 
-object TransitiveClosureNaive {
+object  TransitiveClosureNaive {
 
   def main (args: Array[String]): Unit = {
     if (!parseParameters(args)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-quickstart/flink-tez-quickstart/pom.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-tez-quickstart/pom.xml b/flink-quickstart/flink-tez-quickstart/pom.xml
new file mode 100644
index 0000000..3e41e54
--- /dev/null
+++ b/flink-quickstart/flink-tez-quickstart/pom.xml
@@ -0,0 +1,37 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-quickstart</artifactId>
+        <version>0.9-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>flink-tez-quickstart</artifactId>
+    <packaging>maven-archetype</packaging>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-quickstart/flink-tez-quickstart/src/main/java/Dummy.java
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-tez-quickstart/src/main/java/Dummy.java b/flink-quickstart/flink-tez-quickstart/src/main/java/Dummy.java
new file mode 100644
index 0000000..c7749ff
--- /dev/null
+++ b/flink-quickstart/flink-tez-quickstart/src/main/java/Dummy.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.quickstart;
+
+/**
+ * This class solely exists to generate
+ * javadocs for the "flink-tez-quickstart" project.
+ **/
+public class Dummy {
+	//
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-quickstart/flink-tez-quickstart/src/main/resources/META-INF/maven/archetype-metadata.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-tez-quickstart/src/main/resources/META-INF/maven/archetype-metadata.xml b/flink-quickstart/flink-tez-quickstart/src/main/resources/META-INF/maven/archetype-metadata.xml
new file mode 100644
index 0000000..adb3b83
--- /dev/null
+++ b/flink-quickstart/flink-tez-quickstart/src/main/resources/META-INF/maven/archetype-metadata.xml
@@ -0,0 +1,36 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<archetype-descriptor
+		xmlns="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0"
+		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		xsi:schemaLocation="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0 http://maven.apache.org/xsd/archetype-descriptor-1.0.0.xsd"
+		name="flink-tez-quickstart">
+	<fileSets>
+		<fileSet filtered="true" packaged="true" encoding="UTF-8">
+			<directory>src/main/java</directory>
+			<includes>
+				<include>**/*.java</include>
+			</includes>
+		</fileSet>
+		<fileSet encoding="UTF-8">
+			<directory>src/assembly</directory>
+		</fileSet>
+	</fileSets>
+</archetype-descriptor>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/pom.xml
new file mode 100644
index 0000000..180077d
--- /dev/null
+++ b/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/pom.xml
@@ -0,0 +1,186 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>${groupId}</groupId>
+    <artifactId>${artifactId}</artifactId>
+    <version>${version}</version>
+    <packaging>jar</packaging>
+
+    <name>Your Job's Name</name>
+    <url>http://www.myorganization.org</url>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+    <repositories>
+        <repository>
+            <id>apache.snapshots</id>
+            <name>Apache Development Snapshot Repository</name>
+            <url>https://repository.apache.org/content/repositories/snapshots/</url>
+            <releases>
+                <enabled>false</enabled>
+            </releases>
+            <snapshots>
+                <enabled>true</enabled>
+            </snapshots>
+        </repository>
+    </repositories>
+
+    <!-- These dependencies are the minimum needed to develop and run Flink programs on Tez.
+        You can add others like <artifactId>flink-scala</artifactId> for Scala! -->
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+            <version>0.9-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients</artifactId>
+            <version>0.9-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-tez</artifactId>
+            <version>0.9-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+
+            <!-- We use the maven-assembly plugin to create a fat jar that contains the project and
+            its dependencies, except those excluded in src/assembly/flink-fat-jar.xml (Tez and Hadoop).
+            The resulting fat jar can be executed on a cluster. Change the value of mainClass if your
+            program entry point changes. -->
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>2.4.1</version>
+                <configuration>
+                    <descriptors>
+                        <descriptor>src/assembly/flink-fat-jar.xml</descriptor>
+                    </descriptors>
+                    <archive>
+                        <manifest>
+                            <mainClass>${package}.Driver</mainClass>
+                        </manifest>
+                    </archive>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <!-- Configure the jar plugin to add the main class as a manifest entry -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <configuration>
+                    <archive>
+                        <manifestEntries>
+                            <Main-Class>${package}.Driver</Main-Class>
+                        </manifestEntries>
+                    </archive>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.1</version>
+                <configuration>
+                    <source>1.6</source> <!-- If you want to use Java 8, change this to "1.8" -->
+                    <target>1.6</target> <!-- If you want to use Java 8, change this to "1.8" -->
+                </configuration>
+            </plugin>
+        </plugins>
+
+
+        <!-- If you want to use Java 8 Lambda Expressions uncomment the following lines -->
+        <!--
+        <pluginManagement>
+            <plugins>
+                <plugin>
+                    <artifactId>maven-compiler-plugin</artifactId>
+                    <configuration>
+                        <source>1.8</source>
+                        <target>1.8</target>
+                        <compilerId>jdt</compilerId>
+                    </configuration>
+                    <dependencies>
+                        <dependency>
+                            <groupId>org.eclipse.tycho</groupId>
+                            <artifactId>tycho-compiler-jdt</artifactId>
+                            <version>0.21.0</version>
+                        </dependency>
+                    </dependencies>
+                </plugin>
+
+                <plugin>
+                    <groupId>org.eclipse.m2e</groupId>
+                    <artifactId>lifecycle-mapping</artifactId>
+                    <version>1.0.0</version>
+                    <configuration>
+                        <lifecycleMappingMetadata>
+                            <pluginExecutions>
+                                <pluginExecution>
+                                    <pluginExecutionFilter>
+                                        <groupId>org.apache.maven.plugins</groupId>
+                                        <artifactId>maven-assembly-plugin</artifactId>
+                                        <versionRange>[2.4,)</versionRange>
+                                        <goals>
+                                            <goal>single</goal>
+                                        </goals>
+                                    </pluginExecutionFilter>
+                                    <action>
+                                        <ignore/>
+                                    </action>
+                                </pluginExecution>
+                                <pluginExecution>
+                                    <pluginExecutionFilter>
+                                        <groupId>org.apache.maven.plugins</groupId>
+                                        <artifactId>maven-compiler-plugin</artifactId>
+                                        <versionRange>[3.1,)</versionRange>
+                                        <goals>
+                                            <goal>testCompile</goal>
+                                        </goals>
+                                    </pluginExecutionFilter>
+                                    <action>
+                                        <ignore/>
+                                    </action>
+                                </pluginExecution>
+                            </pluginExecutions>
+                        </lifecycleMappingMetadata>
+                    </configuration>
+                </plugin>
+            </plugins>
+        </pluginManagement>
+        -->
+
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/assembly/flink-fat-jar.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/assembly/flink-fat-jar.xml b/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/assembly/flink-fat-jar.xml
new file mode 100644
index 0000000..f9f27e5
--- /dev/null
+++ b/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/assembly/flink-fat-jar.xml
@@ -0,0 +1,40 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~ http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+	<id>flink-fat-jar</id>
+	<formats>
+		<format>jar</format>
+	</formats>
+	<includeBaseDirectory>false</includeBaseDirectory>
+	<dependencySets>
+		<dependencySet>
+			<outputDirectory>/</outputDirectory>
+			<useProjectArtifact>true</useProjectArtifact>
+			<excludes>
+				<exclude>org.apache.tez:*</exclude>
+                <exclude>org.apache.hadoop:*</exclude>
+			</excludes>
+			<useTransitiveFiltering>true</useTransitiveFiltering>
+			<unpack>true</unpack>
+			<scope>runtime</scope>
+		</dependencySet>
+	</dependencySets>
+</assembly>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/Driver.java
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/Driver.java b/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/Driver.java
new file mode 100644
index 0000000..5431849
--- /dev/null
+++ b/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/Driver.java
@@ -0,0 +1,115 @@
+#set($hash = '#')
+
+package ${package};
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.hadoop.util.ProgramDriver;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.Progress;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.dag.api.client.VertexStatus;
+
+import java.io.IOException;
+import java.text.DecimalFormat;
+import java.util.EnumSet;
+import java.util.Set;
+
+/**
+ * Command-line entry point of the generated Flink on Tez quickstart project.
+ *
+ * Registers the example programs with a Hadoop ProgramDriver and offers helpers
+ * that print the progress of a Tez DAG and of its vertices to standard output.
+ */
+public class Driver {
+
+	// Percentage pattern. The hash placeholders are substituted when the archetype
+	// is instantiated; the literal symbol is deliberately not written in this template.
+	private static final DecimalFormat formatter = new DecimalFormat("${hash}${hash}${hash}.${hash}${hash}%");
+
+	/**
+	 * Registers the programs "yarnjob" and "wc" and hands the arguments to the
+	 * ProgramDriver. Terminates the JVM with the code returned by the driver, or
+	 * with -1 if registration or execution throws.
+	 *
+	 * @param args command-line arguments, passed to the ProgramDriver unchanged
+	 */
+	public static void main(String [] args){
+		int exitCode = -1;
+		ProgramDriver pgd = new ProgramDriver();
+		try {
+			pgd.addClass("yarnjob", YarnJob.class,
+					"Yarn Job");
+			pgd.addClass("wc", YarnWordCount.class,
+					"Word Count");
+			exitCode = pgd.run(args);
+		} catch(Throwable e){
+			// Best effort: report the failure and fall through to exit with -1.
+			e.printStackTrace();
+		}
+		System.exit(exitCode);
+	}
+
+	/**
+	 * Prints the DAG status and the status of the given vertices without any counters.
+	 *
+	 * @param dagClient client used to query the DAG
+	 * @param vertexNames names of the vertices whose status is printed
+	 * @throws IOException if a status query fails
+	 * @throws TezException if a status query fails
+	 */
+	public static void printDAGStatus(DAGClient dagClient, String[] vertexNames)
+			throws IOException, TezException {
+		printDAGStatus(dagClient, vertexNames, false, false);
+	}
+
+	/**
+	 * Prints the state and progress of the DAG, followed by the progress of each
+	 * given vertex, and optionally the DAG and vertex counters.
+	 *
+	 * Vertex lines are only printed when the DAG reports a progress object.
+	 *
+	 * @param dagClient client used to query the DAG
+	 * @param vertexNames names of the vertices whose status is printed
+	 * @param displayDAGCounters whether DAG counters are requested and printed
+	 * @param displayVertexCounters whether vertex counters are requested and printed
+	 * @throws IOException if a status query fails
+	 * @throws TezException if a status query fails
+	 */
+	public static void printDAGStatus(DAGClient dagClient, String[] vertexNames, boolean displayDAGCounters, boolean displayVertexCounters)
+			throws IOException, TezException {
+		Set<StatusGetOpts> opts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
+		DAGStatus dagStatus = dagClient.getDAGStatus(
+				(displayDAGCounters ? opts : null));
+		Progress progress = dagStatus.getDAGProgress();
+		if (progress != null) {
+			// A total of zero must be guarded as well as a negative one: dividing by
+			// zero in double arithmetic yields NaN, which would be printed as the progress.
+			double dagProgress = 0.0;
+			if (progress.getTotalTaskCount() > 0) {
+				dagProgress = (double) progress.getSucceededTaskCount()
+						/ progress.getTotalTaskCount();
+			}
+			System.out.println("");
+			System.out.println("DAG: State: "
+					+ dagStatus.getState()
+					+ " Progress: "
+					+ formatter.format(dagProgress));
+			for (String vertexName : vertexNames) {
+				VertexStatus vStatus = dagClient.getVertexStatus(vertexName,
+						(displayVertexCounters ? opts : null));
+				if (vStatus == null) {
+					System.out.println("Could not retrieve status for vertex: "
+							+ vertexName);
+					continue;
+				}
+				Progress vProgress = vStatus.getProgress();
+				if (vProgress != null) {
+					// A vertex without tasks counts as complete; a negative total as not started.
+					double vProgressFloat = 0.0;
+					if (vProgress.getTotalTaskCount() == 0) {
+						vProgressFloat = 1.0;
+					} else if (vProgress.getTotalTaskCount() > 0) {
+						vProgressFloat = (double)vProgress.getSucceededTaskCount()
+								/vProgress.getTotalTaskCount();
+					}
+					System.out.println("VertexStatus:"
+							+ " VertexName: "
+							+ (vertexName.equals("ivertex1") ? "intermediate-reducer"
+							: vertexName)
+							+ " Progress: " + formatter.format(vProgressFloat));
+				}
+				if (displayVertexCounters) {
+					TezCounters counters = vStatus.getVertexCounters();
+					if (counters != null) {
+						System.out.println("Vertex Counters for " + vertexName + ": "
+								+ counters);
+					}
+				}
+			}
+		}
+		if (displayDAGCounters) {
+			TezCounters counters = dagStatus.getDAGCounters();
+			if (counters != null) {
+				System.out.println("DAG Counters: " + counters);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/LocalJob.java
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/LocalJob.java b/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/LocalJob.java
new file mode 100644
index 0000000..cf7474e
--- /dev/null
+++ b/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/LocalJob.java
@@ -0,0 +1,72 @@
+package ${package};
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.tez.client.LocalTezEnvironment;
+
+/**
+ * Skeleton for a Flink on Tez job running in Tez local mode.
+ *
+ * For a full example of a Flink on Tez job, see the LocalWordCount.java file in the
+ * same package/directory or have a look at the website.
+ *
+ * You can also generate a .jar file that you can submit to your cluster.
+ * Just type
+ * 		mvn clean package
+ * in the project's root directory.
+ * You will find the fat jar in the target directory; its name is derived from the
+ * project's artifactId and version and ends in
+ * 		-flink-fat-jar.jar
+ *
+ */
+public class LocalJob {
+
+	public static void main(String[] args) throws Exception {
+		// set up the execution environment
+
+		// This skeleton uses the local Tez environment
+		// (YarnJob.java shows the RemoteTezEnvironment variant)
+		final LocalTezEnvironment env = LocalTezEnvironment.create();
+
+		/**
+		 * Here, you can start creating your execution plan for Flink.
+		 *
+		 * Start with getting some data from the environment, like
+		 * 	env.readTextFile(textPath);
+		 *
+		 * then, transform the resulting DataSet<String> using operations
+		 * like
+		 * 	.filter()
+		 * 	.flatMap()
+		 * 	.join()
+		 * 	.coGroup()
+		 * and many more.
+		 * Have a look at the programming guide for the Java API:
+		 *
+		 * http://flink.apache.org/docs/latest/programming_guide.html
+		 *
+		 * and the examples
+		 *
+		 * http://flink.apache.org/docs/latest/examples.html
+		 *
+		 */
+
+		// execute program
+		env.execute("Flink Java API Skeleton");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/LocalWordCount.java
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/LocalWordCount.java b/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/LocalWordCount.java
new file mode 100644
index 0000000..dbe81a7
--- /dev/null
+++ b/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/LocalWordCount.java
@@ -0,0 +1,96 @@
+package ${package};
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.tez.client.LocalTezEnvironment;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence histogram
+ * over some sample data
+ *
+ * <p>
+ * This example shows how to:
+ * <ul>
+ * <li>write a simple Flink program.
+ * <li>use Tuple data types.
+ * <li>write and use user-defined functions.
+ * </ul>
+ *
+ */
+public class LocalWordCount {
+
+	//
+	//	Program
+	//
+
+	/**
+	 * Counts the words of four built-in text lines using the local Tez
+	 * environment and prints the resulting (word, count) pairs.
+	 */
+	public static void main(String[] args) throws Exception {
+
+		// the environment in which the program is built and run
+		final LocalTezEnvironment tezEnv = LocalTezEnvironment.create();
+
+		// built-in input lines
+		DataSet<String> lines = tezEnv.fromElements(
+				"To be, or not to be,--that is the question:--",
+				"Whether 'tis nobler in the mind to suffer",
+				"The slings and arrows of outrageous fortune",
+				"Or to take arms against a sea of troubles,"
+				);
+
+		// every line becomes a series of (word,1) pairs ...
+		DataSet<Tuple2<String, Integer>> wordOnes = lines.flatMap(new LineSplitter());
+
+		// ... which are grouped on field "0" (the word) and summed on field "1"
+		DataSet<Tuple2<String, Integer>> wordCounts = wordOnes.groupBy(0).sum(1);
+
+		// register the output and run the program
+		wordCounts.print();
+		tezEnv.execute("WordCount Example");
+	}
+
+	//
+	// 	User Functions
+	//
+
+	/**
+	 * User-defined FlatMapFunction that tokenizes a line: the line is lower-cased,
+	 * split on runs of non-word characters, and each non-empty token is emitted
+	 * as a "(word,1)" pair (Tuple2<String, Integer>).
+	 */
+	public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
+
+		@Override
+		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
+			final String normalized = value.toLowerCase();
+
+			for (String word : normalized.split("\\W+")) {
+				// splitting can yield an empty leading token; skip it
+				if (word.length() == 0) {
+					continue;
+				}
+				out.collect(new Tuple2<String, Integer>(word, 1));
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnJob.java
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnJob.java b/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnJob.java
new file mode 100644
index 0000000..51627d5
--- /dev/null
+++ b/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnJob.java
@@ -0,0 +1,75 @@
+package ${package};
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.tez.client.RemoteTezEnvironment;
+
+/**
+ * Skeleton for a Flink on Tez program running on Yarn.
+ *
+ * For a full example of a Flink on Tez program, see the YarnWordCount.java file in the
+ * same package/directory or have a look at the website.
+ *
+ * You can also generate a .jar file that you can submit to your cluster.
+ * Just type
+ * 		mvn clean package
+ * in the project's root directory.
+ * You will find the fat jar in the target directory; its name is derived from the
+ * project's artifactId and version and ends in
+ * 		-flink-fat-jar.jar
+ *
+ */
+public class YarnJob {
+
+	public static void main(String[] args) throws Exception {
+		// set up the execution environment
+		
+		// To use Tez YARN execution, use
+		final RemoteTezEnvironment env = RemoteTezEnvironment.create();
+		// hard-coded degree of parallelism; adjust it to your cluster
+		env.setDegreeOfParallelism(8);
+
+		/**
+		 * Here, you can start creating your execution plan for Flink.
+		 *
+		 * Start with getting some data from the environment, like
+		 * 	env.readTextFile(textPath);
+		 *
+		 * then, transform the resulting DataSet<String> using operations
+		 * like
+		 * 	.filter()
+		 * 	.flatMap()
+		 * 	.join()
+		 * 	.coGroup()
+		 * and many more.
+		 * Have a look at the programming guide for the Java API:
+		 *
+		 * http://flink.apache.org/docs/latest/programming_guide.html
+		 *
+		 * and the examples
+		 *
+		 * http://flink.apache.org/docs/latest/examples.html
+		 *
+		 */
+		
+		
+		// execute program
+		// NOTE(review): presumably the registered class lets the environment locate
+		// the jar to ship to the cluster - confirm against RemoteTezEnvironment
+		env.registerMainClass(YarnJob.class);
+		env.execute("Flink Java API Skeleton");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnWordCount.java
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnWordCount.java b/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnWordCount.java
new file mode 100644
index 0000000..e97dc0b
--- /dev/null
+++ b/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnWordCount.java
@@ -0,0 +1,124 @@
+package ${package};
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+		import org.apache.flink.api.java.DataSet;
+		import org.apache.flink.tez.client.RemoteTezEnvironment;
+		import org.apache.flink.api.common.functions.FlatMapFunction;
+		import org.apache.flink.api.java.tuple.Tuple2;
+		import org.apache.flink.util.Collector;
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence histogram
+ * over a text file, using the remote Tez environment.
+ *
+ * <p>
+ * Expects exactly three arguments: the input text path, the result path and the
+ * degree of parallelism.
+ *
+ * <p>
+ * This example shows how to:
+ * <ul>
+ * <li>write a simple Flink program.
+ * <li>use Tuple data types.
+ * <li>write and use user-defined functions.
+ * </ul>
+ *
+ */
+public class YarnWordCount {
+
+	//
+	//	Program
+	//
+
+	/**
+	 * Parses the arguments, builds the word count plan and executes it.
+	 * Returns without running anything if the arguments are invalid.
+	 *
+	 * @param args text path, result path and parallelism, in that order
+	 */
+	public static void main(String[] args) throws Exception {
+
+		if(!parseParameters(args)) {
+			return;
+		}
+
+		// set up the execution environment
+		final RemoteTezEnvironment env = RemoteTezEnvironment.create();
+		env.setDegreeOfParallelism(parallelism);
+
+		// get input data
+		DataSet<String> text = env.readTextFile(textPath);
+
+		DataSet<Tuple2<String, Integer>> counts =
+				// split up the lines in pairs (2-tuples) containing: (word,1)
+				text.flatMap(new LineSplitter())
+						// group by the tuple field "0" and sum up tuple field "1"
+						.groupBy(0)
+						.sum(1);
+
+		// emit result: one record per line, fields separated by a space
+		counts.writeAsCsv(outputPath, "\n", " ");
+
+		// execute program
+		env.registerMainClass(YarnWordCount.class);
+		env.execute("WordCount Example");
+	}
+
+	//
+	// 	User Functions
+	//
+
+	/**
+	 * Implements the string tokenizer that splits sentences into words as a user-defined
+	 * FlatMapFunction. The function takes a line (String) and splits it into
+	 * multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>).
+	 */
+	public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
+
+		@Override
+		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
+			// normalize and split the line
+			String[] tokens = value.toLowerCase().split("\\W+");
+
+			// emit the pairs
+			for (String token : tokens) {
+				if (token.length() > 0) {
+					out.collect(new Tuple2<String, Integer>(token, 1));
+				}
+			}
+		}
+	}
+
+	//
+	// 	Parameters
+	//
+
+	private static String textPath;
+	private static String outputPath;
+	private static int parallelism;
+
+	/**
+	 * Reads the text path, result path and parallelism from the arguments.
+	 *
+	 * This program has no built-in input data, so a missing or incomplete argument
+	 * list is rejected instead of letting the job start with unset paths.
+	 *
+	 * @param args the command-line arguments
+	 * @return true if all three parameters were parsed, false if usage was printed
+	 */
+	private static boolean parseParameters(String[] args) {
+
+		if(args.length != 3) {
+			System.err.println("Usage: YarnWordCount <text path> <result path> <parallelism>");
+			return false;
+		}
+
+		final int parsedParallelism;
+		try {
+			parsedParallelism = Integer.parseInt(args[2]);
+		} catch(NumberFormatException e) {
+			System.err.println("Parallelism must be an integer, but was: " + args[2]);
+			System.err.println("Usage: YarnWordCount <text path> <result path> <parallelism>");
+			return false;
+		}
+
+		textPath = args[0];
+		outputPath = args[1];
+		parallelism = parsedParallelism;
+		return true;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-quickstart/flink-tez-quickstart/src/test/resources/projects/testArtifact/archetype.properties
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-tez-quickstart/src/test/resources/projects/testArtifact/archetype.properties b/flink-quickstart/flink-tez-quickstart/src/test/resources/projects/testArtifact/archetype.properties
new file mode 100644
index 0000000..bfce480
--- /dev/null
+++ b/flink-quickstart/flink-tez-quickstart/src/test/resources/projects/testArtifact/archetype.properties
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+groupId=org.apache.flink.archetypetest
+artifactId=testArtifact
+version=0.1
+package=org.apache.flink.archetypetest

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-quickstart/flink-tez-quickstart/src/test/resources/projects/testArtifact/goal.txt
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-tez-quickstart/src/test/resources/projects/testArtifact/goal.txt b/flink-quickstart/flink-tez-quickstart/src/test/resources/projects/testArtifact/goal.txt
new file mode 100644
index 0000000..f8808ba
--- /dev/null
+++ b/flink-quickstart/flink-tez-quickstart/src/test/resources/projects/testArtifact/goal.txt
@@ -0,0 +1 @@
+compile
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-quickstart/pom.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/pom.xml b/flink-quickstart/pom.xml
index 4f3b4b2..dd9621c 100644
--- a/flink-quickstart/pom.xml
+++ b/flink-quickstart/pom.xml
@@ -74,4 +74,14 @@ under the License.
 			</plugin>
 		</plugins>
 	</build>
+
+    <!-- See main pom.xml for explanation of profiles -->
+    <profiles>
+        <profile>
+            <id>include-tez</id>
+            <modules>
+                <module>flink-tez-quickstart</module>
+            </modules>
+        </profile>
+    </profiles>
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
index 89cf98a..6c97097 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
@@ -24,6 +24,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -56,7 +57,7 @@ import org.apache.flink.util.InstantiationUtil;
 /**
  * Configuration class which stores all relevant parameters required to set up the Pact tasks.
  */
-public class TaskConfig {
+public class TaskConfig implements Serializable {
 	
 	private static final String TASK_NAME = "taskname";
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/pom.xml b/flink-staging/flink-tez/pom.xml
new file mode 100644
index 0000000..51bc551
--- /dev/null
+++ b/flink-staging/flink-tez/pom.xml
@@ -0,0 +1,224 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-staging</artifactId>
+        <version>0.9-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>flink-tez</artifactId>
+    <name>flink-tez</name>
+
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.flink</groupId>
+                    <artifactId>${shading-artifact.name}</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.flink</groupId>
+                    <artifactId>${shading-artifact.name}</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.flink</groupId>
+                    <artifactId>${shading-artifact.name}</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-optimizer</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.flink</groupId>
+                    <artifactId>${shading-artifact.name}</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java-examples</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.flink</groupId>
+                    <artifactId>${shading-artifact.name}</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.flink</groupId>
+                    <artifactId>${shading-artifact.name}</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-tests</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.flink</groupId>
+                    <artifactId>${shading-artifact.name}</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.tez</groupId>
+            <artifactId>tez-api</artifactId>
+            <version>${tez.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.tez</groupId>
+            <artifactId>tez-common</artifactId>
+            <version>${tez.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.tez</groupId>
+            <artifactId>tez-dag</artifactId>
+            <version>${tez.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.tez</groupId>
+            <artifactId>tez-runtime-library</artifactId>
+            <version>${tez.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-yarn-client</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-yarn-common</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+        
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-yarn-api</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>commons-codec</groupId>
+            <artifactId>commons-codec</artifactId>
+            <version>1.4</version>
+        </dependency>
+
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>${guava.version}</version>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>2.4.1</version>
+                <configuration>
+                    <descriptors>
+                        <descriptor>${basedir}/src/assembly/flink-fat-jar.xml</descriptor>
+                    </descriptors>
+                    <archive>
+                        <manifest>
+                            <mainClass>org.apache.flink.tez.examples.ExampleDriver</mainClass>
+                        </manifest>
+                    </archive>
+                </configuration>
+                <executions>
+                    <execution>
+                        <!-- Binds the assembly "single" goal to the package phase; the alternative id "assemble-all" is disabled. -->
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/assembly/flink-fat-jar.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/assembly/flink-fat-jar.xml b/flink-staging/flink-tez/src/assembly/flink-fat-jar.xml
new file mode 100644
index 0000000..504761a
--- /dev/null
+++ b/flink-staging/flink-tez/src/assembly/flink-fat-jar.xml
@@ -0,0 +1,42 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~ http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+	<id>flink-fat-jar</id>
+	<formats>
+		<format>jar</format>
+	</formats>
+	<includeBaseDirectory>false</includeBaseDirectory>
+	<dependencySets>
+		<dependencySet>
+			<outputDirectory>/</outputDirectory>
+			<useProjectArtifact>true</useProjectArtifact>
+			<!-- Disabled exclude, kept for reference: <exclude>org.apache.flink:*</exclude>.
+			     With it disabled, org.apache.flink artifacts are not filtered out here;
+			     the only active exclude is the guava one at the end of this dependencySet. -->
+			<useTransitiveFiltering>true</useTransitiveFiltering>
+			<unpack>true</unpack>
+			<scope>runtime</scope>
+            <excludes>
+                <exclude>com.google.guava:guava</exclude>
+            </excludes>
+		</dependencySet>
+	</dependencySets>
+</assembly>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezEnvironment.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezEnvironment.java
new file mode 100644
index 0000000..6d1b1c7
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezEnvironment.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.client;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironmentFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.costs.DefaultCostEstimator;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+
+/**
+ * {@link ExecutionEnvironment} that hands its program plan to a {@link TezExecutor}
+ * configured with Tez local mode and the local file system as default file system.
+ */
+public class LocalTezEnvironment extends ExecutionEnvironment {
+
+	/** Executor that receives the program plan; created together with {@link #compiler}. */
+	TezExecutor executor;
+
+	/** Optimizer that is passed to the executor at construction time. */
+	Optimizer compiler;
+
+	/** Not public: instances are obtained through {@link #create()}. */
+	private LocalTezEnvironment() {
+		compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), new Configuration());
+		executor = new TezExecutor(compiler, getParallelism());
+	}
+
+	/**
+	 * Factory method for this environment.
+	 *
+	 * @return a newly constructed {@code LocalTezEnvironment}
+	 */
+	public static LocalTezEnvironment create() {
+		return new LocalTezEnvironment();
+	}
+
+	/**
+	 * Creates the program plan for the given job name and executes it with a Tez
+	 * configuration that enables local mode, uses {@code file:///} as default file
+	 * system and turns on the local-fetch optimization of the Tez runtime.
+	 *
+	 * @param jobName name used when creating the program plan
+	 * @return the result reported by the executor
+	 * @throws Exception if plan creation or execution fails
+	 */
+	@Override
+	public JobExecutionResult execute(String jobName) throws Exception {
+		final TezConfiguration localConf = new TezConfiguration();
+		localConf.set("fs.defaultFS", "file:///");
+		localConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+		localConf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
+		executor.setConfiguration(localConf);
+
+		final Plan plan = createProgramPlan(jobName);
+		return executor.executePlan(plan);
+	}
+
+	/**
+	 * Returns the optimizer plan of the current program as JSON, as produced by the executor.
+	 *
+	 * @return JSON dump of the optimized plan
+	 * @throws Exception if the plan cannot be created or optimized
+	 */
+	@Override
+	public String getExecutionPlan() throws Exception {
+		return executor.getOptimizerPlanAsJSON(createProgramPlan(null, false));
+	}
+
+	/**
+	 * Initializes the context environment with a factory that always returns this instance.
+	 */
+	public void setAsContext() {
+		initializeContextEnvironment(new ExecutionEnvironmentFactory() {
+			@Override
+			public ExecutionEnvironment createExecutionEnvironment() {
+				return LocalTezEnvironment.this;
+			}
+		});
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java
new file mode 100644
index 0000000..b155527
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.client;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.costs.DefaultCostEstimator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ClassUtil;
+import org.apache.hadoop.util.ToolRunner;
+
+
+/**
+ * {@link ExecutionEnvironment} that runs its program plan through a {@link TezExecutorTool}
+ * started via Hadoop's {@link ToolRunner}.
+ */
+public class RemoteTezEnvironment extends ExecutionEnvironment {
+
+	private static final Log LOG = LogFactory.getLog(RemoteTezEnvironment.class);
+
+	private final Optimizer compiler;
+	private final TezExecutor executor;
+
+	/** Jar that contains the registered main class; {@code null} until {@link #registerMainClass} is called. */
+	private Path jarPath = null;
+
+	/**
+	 * Creates the environment together with its optimizer and Tez executor.
+	 */
+	public RemoteTezEnvironment() {
+		compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), new org.apache.flink.configuration.Configuration());
+		// Same accessor as LocalTezEnvironment uses, to keep both environments consistent.
+		executor = new TezExecutor(compiler, this.getParallelism());
+	}
+
+	/**
+	 * Factory method for this environment.
+	 *
+	 * @return a newly constructed {@code RemoteTezEnvironment}
+	 */
+	public static RemoteTezEnvironment create () {
+		return new RemoteTezEnvironment();
+	}
+
+	/**
+	 * Remembers the jar that contains the given class so that it is handed to the
+	 * executor tool as the job jar on the next {@link #execute(String)}.
+	 *
+	 * @param mainClass class whose containing jar is looked up
+	 * @throws IllegalArgumentException if no containing jar can be determined for the class
+	 */
+	public void registerMainClass (Class<?> mainClass) {
+		String containingJar = ClassUtil.findContainingJar(mainClass);
+		if (containingJar == null) {
+			// new Path(null) would fail with a message that does not mention the class.
+			throw new IllegalArgumentException(
+					"Cannot determine the jar file that contains class " + mainClass.getName());
+		}
+		jarPath = new Path(containingJar);
+		LOG.info ("Registering main class " + mainClass.getName() + " contained in " + jarPath.toString());
+	}
+
+	/**
+	 * Creates the program plan for the given job name and runs it with a {@link TezExecutorTool}.
+	 * Failures of the tool are propagated to the caller instead of being discarded.
+	 *
+	 * @param jobName name used for the program plan; also passed to the tool as its only argument
+	 * @return a placeholder result (no accumulators, runtime {@code -1})
+	 * @throws Exception if plan creation fails, the tool throws, or the tool exits with a non-zero code
+	 */
+	@Override
+	public JobExecutionResult execute(String jobName) throws Exception {
+		TezExecutorTool tool = new TezExecutorTool(executor, createProgramPlan(jobName));
+		if (jarPath != null) {
+			tool.setJobJar(jarPath);
+		}
+
+		// No try/finally-return here: returning from finally would swallow every exception.
+		int exitCode = ToolRunner.run(new Configuration(), tool, new String[]{jobName});
+		if (exitCode != 0) {
+			throw new IllegalStateException("Tez job " + jobName + " exited with code " + exitCode);
+		}
+		return new JobExecutionResult(null, -1, null);
+	}
+
+	/**
+	 * Returns the optimizer plan of the current program as JSON, as produced by the executor.
+	 *
+	 * @return JSON dump of the optimized plan
+	 * @throws Exception if the plan cannot be created or optimized
+	 */
+	@Override
+	public String getExecutionPlan() throws Exception {
+		Plan p = createProgramPlan(null, false);
+		return executor.getOptimizerPlanAsJSON(p);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutor.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutor.java
new file mode 100644
index 0000000..a54724f
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutor.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.client;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.PlanExecutor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
+import org.apache.flink.tez.dag.TezDAGGenerator;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.client.TezClientUtils;
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * {@link PlanExecutor} that optimizes a Flink {@link Plan}, turns the optimized plan into a
+ * Tez {@link DAG} with a {@link TezDAGGenerator} and submits it through a {@link TezClient}.
+ */
+public class TezExecutor extends PlanExecutor {
+
+	private static final Log LOG = LogFactory.getLog(TezExecutor.class);
+
+	/** Tez configuration used for execution; {@code null} until set via constructor or {@link #setConfiguration}. */
+	private TezConfiguration tezConf;
+	private final Optimizer compiler;
+
+	/** Optional job jar that is added to the DAG as a task-local file; {@code null} means none. */
+	private Path jarPath;
+
+	private long runTime = -1; //TODO get DAG execution time from Tez
+	private final int parallelism;
+
+	/**
+	 * @param tezConf Tez configuration to execute with; may be {@code null} if set later
+	 * @param compiler optimizer used to compile plans
+	 * @param parallelism default parallelism applied to plans that do not define one
+	 */
+	public TezExecutor(TezConfiguration tezConf, Optimizer compiler, int parallelism) {
+		this.tezConf = tezConf;
+		this.compiler = compiler;
+		this.parallelism = parallelism;
+	}
+
+	/**
+	 * Creates an executor without a Tez configuration; {@link #setConfiguration} must be
+	 * called before a plan is executed.
+	 *
+	 * @param compiler optimizer used to compile plans
+	 * @param parallelism default parallelism applied to plans that do not define one
+	 */
+	public TezExecutor(Optimizer compiler, int parallelism) {
+		this(null, compiler, parallelism);
+	}
+
+	/**
+	 * Sets the Tez configuration used by subsequent executions.
+	 *
+	 * @param tezConf configuration to use
+	 */
+	public void setConfiguration (TezConfiguration tezConf) {
+		this.tezConf = tezConf;
+	}
+
+	/**
+	 * Optimizes the plan, generates the DAG, submits it and blocks until it completes.
+	 * The Tez client is stopped in all cases.
+	 *
+	 * @param tezConf configuration for the Tez client and the DAG generator
+	 * @param plan plan to execute
+	 * @return result carrying {@link #runTime} and no accumulators
+	 * @throws IllegalStateException if no Tez configuration is available
+	 * @throws RuntimeException if the DAG does not finish in state {@code SUCCEEDED}
+	 * @throws Exception if optimization, DAG generation or submission fails
+	 */
+	private JobExecutionResult executePlanWithConf (TezConfiguration tezConf, Plan plan) throws Exception {
+		if (tezConf == null) {
+			// Reachable via the two-argument constructor when setConfiguration() was never called.
+			throw new IllegalStateException(
+					"No Tez configuration available; call setConfiguration() before executing a plan");
+		}
+
+		String jobName = plan.getJobName();
+
+		TezClient tezClient = TezClient.create(jobName, tezConf);
+		tezClient.start();
+		try {
+			OptimizedPlan optPlan = getOptimizedPlan(plan, parallelism);
+			TezDAGGenerator dagGenerator = new TezDAGGenerator(tezConf, new Configuration());
+			DAG dag = dagGenerator.createDAG(optPlan);
+
+			if (jarPath != null) {
+				addLocalResource(tezConf, jarPath, dag);
+			}
+
+			tezClient.waitTillReady();
+			LOG.info("Submitting DAG to Tez Client");
+			DAGClient dagClient = tezClient.submitDAG(dag);
+
+			LOG.info("Submitted DAG to Tez Client");
+
+			// monitoring
+			DAGStatus dagStatus = dagClient.waitForCompletion();
+
+			if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
+				LOG.error (jobName + " failed with diagnostics: " + dagStatus.getDiagnostics());
+				throw new RuntimeException(jobName + " failed with diagnostics: " + dagStatus.getDiagnostics());
+			}
+			LOG.info(jobName + " finished successfully");
+
+			return new JobExecutionResult(null, runTime, null);
+
+		}
+		finally {
+			tezClient.stop();
+		}
+	}
+
+	/**
+	 * Executes the plan with the currently set Tez configuration.
+	 *
+	 * @param plan plan to execute
+	 * @return result of the execution
+	 * @throws Exception if the execution fails
+	 */
+	@Override
+	public JobExecutionResult executePlan(Plan plan) throws Exception {
+		return executePlanWithConf(tezConf, plan);
+	}
+
+	/**
+	 * Copies the job jar into the Tez staging directory and registers the copy as a
+	 * task-local file of the DAG.
+	 *
+	 * @param tezConf configuration used to resolve the file system and the staging directory
+	 * @param jarPath local path of the job jar
+	 * @param dag DAG that receives the local resource
+	 * @throws java.io.IOException if the jar cannot be staged or registered; the original
+	 *         failure is attached as cause
+	 */
+	private static void addLocalResource (TezConfiguration tezConf, Path jarPath, DAG dag) throws java.io.IOException {
+
+		try {
+			org.apache.hadoop.fs.FileSystem fs = org.apache.hadoop.fs.FileSystem.get(tezConf);
+
+			LOG.info("Jar path received is " + jarPath.toString());
+
+			String jarFile = jarPath.getName();
+
+			Path stagingDir = TezCommonUtils.getTezBaseStagingPath(tezConf);
+			LOG.info("Tez staging path is " + stagingDir);
+			TezClientUtils.ensureStagingDirExists(tezConf, stagingDir);
+			LOG.info("Tez staging dir exists");
+
+			Path remoteJarPath = fs.makeQualified(new Path(stagingDir, jarFile));
+			LOG.info("Copying " + jarPath.toString() + " to " + remoteJarPath.toString());
+			fs.copyFromLocalFile(jarPath, remoteJarPath);
+
+			FileStatus remoteJarStatus = fs.getFileStatus(remoteJarPath);
+			// NOTE(review): these credentials are not attached to the DAG or the client anywhere
+			// in this method - confirm whether they have to be passed on for secure clusters.
+			Credentials credentials = new Credentials();
+			TokenCache.obtainTokensForNamenodes(credentials, new Path[]{remoteJarPath}, tezConf);
+
+			Map<String, LocalResource> localResources = new TreeMap<String, LocalResource>();
+			LocalResource jobJar = LocalResource.newInstance(
+					ConverterUtils.getYarnUrlFromPath(remoteJarPath),
+					LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
+					remoteJarStatus.getLen(), remoteJarStatus.getModificationTime());
+			localResources.put(jarFile, jobJar);
+
+			dag.addTaskLocalFiles(localResources);
+
+			LOG.info("Added job jar as local resource.");
+		}
+		catch (Exception e) {
+			// Propagate instead of System.exit(): library code must not terminate the JVM.
+			throw new java.io.IOException(
+					"Could not add job jar " + jarPath + " as a Tez local resource", e);
+		}
+	}
+
+	/**
+	 * Sets the job jar that is added to the DAG on the next execution.
+	 *
+	 * @param jarPath local path of the job jar, or {@code null} for none
+	 */
+	public void setJobJar (Path jarPath) {
+		this.jarPath = jarPath;
+	}
+
+	/**
+	 * Optimizes the plan and dumps the optimized plan as JSON.
+	 *
+	 * @param plan plan to optimize
+	 * @return JSON representation of the optimized plan
+	 * @throws Exception if optimization fails
+	 */
+	@Override
+	public String getOptimizerPlanAsJSON(Plan plan) throws Exception {
+		OptimizedPlan optPlan = getOptimizedPlan(plan, parallelism);
+		PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
+		return jsonGen.getOptimizerPlanAsJSON(optPlan);
+	}
+
+	/**
+	 * Compiles the plan with this executor's optimizer. The given parallelism is applied as
+	 * the plan's default only when it is positive and the plan has no positive default itself.
+	 *
+	 * @param p plan to compile; its default parallelism may be modified
+	 * @param parallelism fallback default parallelism
+	 * @return the optimized plan
+	 * @throws CompilerException if the optimizer rejects the plan
+	 */
+	public OptimizedPlan getOptimizedPlan(Plan p, int parallelism) throws CompilerException {
+		if (parallelism > 0 && p.getDefaultParallelism() <= 0) {
+			p.setDefaultParallelism(parallelism);
+		}
+		return this.compiler.compile(p);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutorTool.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutorTool.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutorTool.java
new file mode 100644
index 0000000..09289fb
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutorTool.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.client;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.Plan;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Tool;
+import org.apache.tez.dag.api.TezConfiguration;
+
+
+/**
+ * Hadoop {@link Tool} that executes a Flink {@link Plan} with a {@link TezExecutor}, using a
+ * {@link TezConfiguration} derived from the tool's Hadoop configuration.
+ */
+public class TezExecutorTool extends Configured implements Tool {
+
+	private static final Log LOG = LogFactory.getLog(TezExecutorTool.class);
+
+	private final TezExecutor executor;
+	Plan plan;
+
+	/** Optional job jar forwarded to the executor before execution; {@code null} means none. */
+	private Path jarPath = null;
+
+	/**
+	 * @param executor executor that runs the plan
+	 * @param plan plan to execute
+	 */
+	public TezExecutorTool(TezExecutor executor, Plan plan) {
+		this.executor = executor;
+		this.plan = plan;
+	}
+
+	/**
+	 * Sets the job jar that is forwarded to the executor in {@link #run(String[])}.
+	 *
+	 * @param jarPath local path of the job jar, or {@code null} for none
+	 */
+	public void setJobJar (Path jarPath) {
+		this.jarPath = jarPath;
+	}
+
+	/**
+	 * Builds the Tez configuration, installs it for {@link UserGroupInformation} and on the
+	 * executor, and executes the plan.
+	 *
+	 * @param args command-line arguments; not read by this method
+	 * @return {@code 0} when the plan was executed without an exception
+	 * @throws RuntimeException if execution fails; the original exception is attached as cause
+	 */
+	@Override
+	public int run(String[] args) throws Exception {
+
+		Configuration conf = getConf();
+
+		TezConfiguration tezConf;
+		if (conf != null) {
+			tezConf = new TezConfiguration(conf);
+		} else {
+			tezConf = new TezConfiguration();
+		}
+
+		UserGroupInformation.setConfiguration(tezConf);
+
+		executor.setConfiguration(tezConf);
+
+		try {
+			if (jarPath != null) {
+				executor.setJobJar(jarPath);
+			}
+			executor.executePlan(plan);
+		}
+		catch (Exception e) {
+			// Keep the cause so the original stack trace is not lost.
+			LOG.error("Job execution failed due to: " + e.getMessage(), e);
+			throw new RuntimeException(e.getMessage(), e);
+		}
+		return 0;
+	}
+
+
+}


Mime
View raw message