flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [3/5] flink git commit: Added support for Apache Tez as an execution environment
Date Thu, 02 Apr 2015 18:08:30 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkBroadcastEdge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkBroadcastEdge.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkBroadcastEdge.java
new file mode 100644
index 0000000..6597733
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkBroadcastEdge.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.dag;
+
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.tez.util.EncodingUtils;
+import org.apache.flink.tez.util.FlinkSerialization;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.serializer.WritableSerialization;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class FlinkBroadcastEdge extends FlinkEdge {
+
+	public FlinkBroadcastEdge(FlinkVertex source, FlinkVertex target, TypeSerializer<?> typeSerializer) {
+		super(source, target, typeSerializer);
+	}
+
+	@Override
+	public Edge createEdge(TezConfiguration tezConf) {
+
+		Map<String,String> serializerMap = new HashMap<String,String>();
+		serializerMap.put("io.flink.typeserializer", EncodingUtils.encodeObjectToString(this.typeSerializer));
+
+		try {
+			UnorderedKVEdgeConfig edgeConfig =
+					(UnorderedKVEdgeConfig
+							.newBuilder(IntWritable.class.getName(), typeSerializer.createInstance().getClass().getName())
+							.setFromConfiguration(tezConf)
+							.setKeySerializationClass(WritableSerialization.class.getName(), null)
+							.setValueSerializationClass(FlinkSerialization.class.getName(), serializerMap)
+							.configureInput()
+							.setAdditionalConfiguration("io.flink.typeserializer", EncodingUtils.encodeObjectToString(this.typeSerializer))
+							)
+							.done()
+							.build();
+
+			EdgeProperty property = edgeConfig.createDefaultBroadcastEdgeProperty();
+			this.cached = Edge.create(source.getVertex(), target.getVertex(), property);
+			return cached;
+
+		} catch (Exception e) {
+			throw new CompilerException(
+					"An error occurred while creating a Tez Forward Edge: " + e.getMessage(), e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSinkVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSinkVertex.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSinkVertex.java
new file mode 100644
index 0000000..e3ddb9e
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSinkVertex.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.dag;
+
+
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.tez.runtime.DataSinkProcessor;
+import org.apache.flink.tez.runtime.TezTaskConfig;
+import org.apache.flink.tez.util.EncodingUtils;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
+
+import java.io.IOException;
+
+public class FlinkDataSinkVertex extends FlinkVertex {
+
+	public FlinkDataSinkVertex(String taskName, int parallelism, TezTaskConfig taskConfig) {
+		super(taskName, parallelism, taskConfig);
+	}
+
+	@Override
+	public Vertex createVertex(TezConfiguration conf) {
+		try {
+			this.writeInputPositionsToConfig();
+			this.writeSubTasksInOutputToConfig();
+
+			conf.set(TezTaskConfig.TEZ_TASK_CONFIG, EncodingUtils.encodeObjectToString(taskConfig));
+
+			ProcessorDescriptor descriptor = ProcessorDescriptor.create(
+					DataSinkProcessor.class.getName());
+
+			descriptor.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
+
+			cached = Vertex.create(this.getUniqueName(), descriptor, getParallelism());
+
+			return cached;
+		}
+		catch (IOException e) {
+			throw new CompilerException(
+					"An error occurred while creating a Tez Vertex: " + e.getMessage(), e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSourceVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSourceVertex.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSourceVertex.java
new file mode 100644
index 0000000..913b854
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSourceVertex.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.dag;
+
+
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.tez.runtime.DataSourceProcessor;
+import org.apache.flink.tez.runtime.TezTaskConfig;
+import org.apache.flink.tez.runtime.input.FlinkInput;
+import org.apache.flink.tez.runtime.input.FlinkInputSplitGenerator;
+import org.apache.flink.tez.util.EncodingUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.DataSourceDescriptor;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.InputInitializerDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
+
+import java.io.IOException;
+
+public class FlinkDataSourceVertex extends FlinkVertex {
+
+	public FlinkDataSourceVertex(String taskName, int parallelism, TezTaskConfig taskConfig) {
+		super(taskName, parallelism, taskConfig);
+	}
+
+
+	@Override
+	public Vertex createVertex (TezConfiguration conf) {
+		try {
+			this.writeInputPositionsToConfig();
+			this.writeSubTasksInOutputToConfig();
+
+			taskConfig.setDatasourceProcessorName(this.getUniqueName());
+			conf.set(TezTaskConfig.TEZ_TASK_CONFIG, EncodingUtils.encodeObjectToString(taskConfig));
+
+			ProcessorDescriptor descriptor = ProcessorDescriptor.create(
+					DataSourceProcessor.class.getName());
+
+			descriptor.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
+
+			InputDescriptor inputDescriptor = InputDescriptor.create(FlinkInput.class.getName());
+
+			InputInitializerDescriptor inputInitializerDescriptor =
+					InputInitializerDescriptor.create(FlinkInputSplitGenerator.class.getName()).setUserPayload(TezUtils.createUserPayloadFromConf(conf));
+
+			DataSourceDescriptor dataSourceDescriptor = DataSourceDescriptor.create(
+					inputDescriptor,
+					inputInitializerDescriptor,
+					new Credentials()
+			);
+
+			cached = Vertex.create(this.getUniqueName(), descriptor, getParallelism());
+
+			cached.addDataSource("Input " + this.getUniqueName(), dataSourceDescriptor);
+
+			return cached;
+		}
+		catch (IOException e) {
+			throw new CompilerException(
+					"An error occurred while creating a Tez Vertex: " + e.getMessage(), e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkEdge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkEdge.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkEdge.java
new file mode 100644
index 0000000..181e675
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkEdge.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.dag;
+
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.TezConfiguration;
+
+public abstract class FlinkEdge {
+
+	protected FlinkVertex source;
+	protected FlinkVertex target;
+	protected TypeSerializer<?> typeSerializer;
+	protected Edge cached;
+
+	protected FlinkEdge(FlinkVertex source, FlinkVertex target, TypeSerializer<?> typeSerializer) {
+		this.source = source;
+		this.target = target;
+		this.typeSerializer = typeSerializer;
+	}
+
+	public abstract Edge createEdge(TezConfiguration tezConf);
+
+	public Edge getEdge () {
+		return cached;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkForwardEdge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkForwardEdge.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkForwardEdge.java
new file mode 100644
index 0000000..4602e96
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkForwardEdge.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.dag;
+
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.tez.util.EncodingUtils;
+import org.apache.flink.tez.util.FlinkSerialization;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.serializer.WritableSerialization;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class FlinkForwardEdge extends FlinkEdge {
+
+	public FlinkForwardEdge(FlinkVertex source, FlinkVertex target, TypeSerializer<?> typeSerializer) {
+		super(source, target, typeSerializer);
+	}
+
+	@Override
+	public Edge createEdge(TezConfiguration tezConf) {
+
+		Map<String,String> serializerMap = new HashMap<String,String>();
+		serializerMap.put("io.flink.typeserializer", EncodingUtils.encodeObjectToString(this.typeSerializer));
+
+		try {
+			UnorderedKVEdgeConfig edgeConfig =
+					(UnorderedKVEdgeConfig
+							.newBuilder(IntWritable.class.getName(), typeSerializer.createInstance().getClass().getName())
+							.setFromConfiguration(tezConf)
+							.setKeySerializationClass(WritableSerialization.class.getName(), null)
+							.setValueSerializationClass(FlinkSerialization.class.getName(), serializerMap)
+							.configureInput()
+							.setAdditionalConfiguration("io.flink.typeserializer", EncodingUtils.encodeObjectToString(
+									this.typeSerializer
+							)))
+							.done()
+							.build();
+
+			EdgeProperty property = edgeConfig.createDefaultOneToOneEdgeProperty();
+			this.cached = Edge.create(source.getVertex(), target.getVertex(), property);
+			return cached;
+
+		} catch (Exception e) {
+			throw new CompilerException(
+					"An error occurred while creating a Tez Forward Edge: " + e.getMessage(), e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkPartitionEdge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkPartitionEdge.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkPartitionEdge.java
new file mode 100644
index 0000000..b5f8c2e
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkPartitionEdge.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.dag;
+
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.tez.util.EncodingUtils;
+import org.apache.flink.tez.util.FlinkSerialization;
+import org.apache.flink.tez.runtime.output.SimplePartitioner;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.serializer.WritableSerialization;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class FlinkPartitionEdge extends FlinkEdge {
+
+	public FlinkPartitionEdge(FlinkVertex source, FlinkVertex target, TypeSerializer<?> typeSerializer) {
+		super(source, target, typeSerializer);
+	}
+
+	@Override
+	public Edge createEdge(TezConfiguration tezConf) {
+
+		Map<String,String> serializerMap = new HashMap<String,String>();
+		serializerMap.put("io.flink.typeserializer", EncodingUtils.encodeObjectToString(this.typeSerializer));
+
+		try {
+			UnorderedPartitionedKVEdgeConfig edgeConfig =
+					(UnorderedPartitionedKVEdgeConfig
+						.newBuilder(IntWritable.class.getName(), typeSerializer.createInstance().getClass().getName(), SimplePartitioner.class.getName())
+					.setFromConfiguration(tezConf)
+					.setKeySerializationClass(WritableSerialization.class.getName(), null)
+					.setValueSerializationClass(FlinkSerialization.class.getName(), serializerMap)
+					.configureInput()
+					.setAdditionalConfiguration("io.flink.typeserializer", EncodingUtils.encodeObjectToString(this.typeSerializer)))
+					.done()
+					.build();
+
+
+			EdgeProperty property = edgeConfig.createDefaultEdgeProperty();
+			this.cached = Edge.create(source.getVertex(), target.getVertex(), property);
+			return cached;
+
+		} catch (Exception e) {
+			throw new CompilerException(
+					"An error occurred while creating a Tez Shuffle Edge: " + e.getMessage(), e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkProcessorVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkProcessorVertex.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkProcessorVertex.java
new file mode 100644
index 0000000..2fbba36
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkProcessorVertex.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.dag;
+
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.tez.runtime.RegularProcessor;
+import org.apache.flink.tez.runtime.TezTaskConfig;
+import org.apache.flink.tez.util.EncodingUtils;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
+
+import java.io.IOException;
+
+
+public class FlinkProcessorVertex extends FlinkVertex {
+
+	public FlinkProcessorVertex(String taskName, int parallelism, TezTaskConfig taskConfig) {
+		super(taskName, parallelism, taskConfig);
+	}
+
+	@Override
+	public Vertex createVertex(TezConfiguration conf) {
+		try {
+			this.writeInputPositionsToConfig();
+			this.writeSubTasksInOutputToConfig();
+
+			conf.set(TezTaskConfig.TEZ_TASK_CONFIG, EncodingUtils.encodeObjectToString(taskConfig));
+
+			ProcessorDescriptor descriptor = ProcessorDescriptor.create(
+					RegularProcessor.class.getName());
+
+			descriptor.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
+
+			cached = Vertex.create(this.getUniqueName(), descriptor, getParallelism());
+
+			return cached;
+		} catch (IOException e) {
+			throw new CompilerException(
+					"An error occurred while creating a Tez Vertex: " + e.getMessage(), e);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkUnionVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkUnionVertex.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkUnionVertex.java
new file mode 100644
index 0000000..0cf9990
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkUnionVertex.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.dag;
+
+
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.tez.runtime.UnionProcessor;
+import org.apache.flink.tez.runtime.TezTaskConfig;
+import org.apache.flink.tez.util.EncodingUtils;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
+
+import java.io.IOException;
+
+public class FlinkUnionVertex extends FlinkVertex {
+
+	public FlinkUnionVertex(String taskName, int parallelism, TezTaskConfig taskConfig) {
+		super(taskName, parallelism, taskConfig);
+	}
+
+	@Override
+	public Vertex createVertex(TezConfiguration conf) {
+		try {
+			this.writeInputPositionsToConfig();
+			this.writeSubTasksInOutputToConfig();
+
+			conf.set(TezTaskConfig.TEZ_TASK_CONFIG, EncodingUtils.encodeObjectToString(taskConfig));
+
+			ProcessorDescriptor descriptor = ProcessorDescriptor.create(
+					UnionProcessor.class.getName());
+
+			descriptor.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
+
+			cached = Vertex.create(this.getUniqueName(), descriptor, getParallelism());
+
+			return cached;
+		}
+		catch (IOException e) {
+			throw new CompilerException(
+					"An error occurred while creating a Tez Vertex: " + e.getMessage(), e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkVertex.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkVertex.java
new file mode 100644
index 0000000..883acc6
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkVertex.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.dag;
+
+
+import org.apache.flink.tez.runtime.TezTaskConfig;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+public abstract class FlinkVertex {
+
+	protected Vertex cached;
+	private String taskName;
+	private int parallelism;
+	protected TezTaskConfig taskConfig;
+
+	// Tez-specific bookkeeping
+	protected String uniqueName; //Unique name in DAG
+	private Map<FlinkVertex,ArrayList<Integer>> inputPositions;
+	private ArrayList<Integer> numberOfSubTasksInOutputs;
+
+	public TezTaskConfig getConfig() {
+		return taskConfig;
+	}
+
+	public FlinkVertex(String taskName, int parallelism, TezTaskConfig taskConfig) {
+		this.cached = null;
+		this.taskName = taskName;
+		this.parallelism = parallelism;
+		this.taskConfig = taskConfig;
+		this.uniqueName = taskName + UUID.randomUUID().toString();
+		this.inputPositions = new HashMap<FlinkVertex, ArrayList<Integer>>();
+		this.numberOfSubTasksInOutputs = new ArrayList<Integer>();
+	}
+
+	public int getParallelism () {
+		return parallelism;
+	}
+
+	public void setParallelism (int parallelism) {
+		this.parallelism = parallelism;
+	}
+
+	public abstract Vertex createVertex (TezConfiguration conf);
+
+	public Vertex getVertex () {
+		return cached;
+	}
+
+	protected String getUniqueName () {
+		return uniqueName;
+	}
+
+	public void addInput (FlinkVertex vertex, int position) {
+		if (inputPositions.containsKey(vertex)) {
+			inputPositions.get(vertex).add(position);
+		}
+		else {
+			ArrayList<Integer> lst = new ArrayList<Integer>();
+			lst.add(position);
+			inputPositions.put(vertex,lst);
+		}
+	}
+
+	public void addNumberOfSubTasksInOutput (int subTasks, int position) {
+		if (numberOfSubTasksInOutputs.isEmpty()) {
+			numberOfSubTasksInOutputs.add(-1);
+		}
+		int currSize = numberOfSubTasksInOutputs.size();
+		for (int i = currSize; i <= position; i++) {
+			numberOfSubTasksInOutputs.add(i, -1);
+		}
+		numberOfSubTasksInOutputs.set(position, subTasks);
+	}
+
+	// Must be called before taskConfig is written to Tez configuration
+	protected void writeInputPositionsToConfig () {
+		HashMap<String,ArrayList<Integer>> toWrite = new HashMap<String, ArrayList<Integer>>();
+		for (FlinkVertex v: inputPositions.keySet()) {
+			String name = v.getUniqueName();
+			List<Integer> positions = inputPositions.get(v);
+			toWrite.put(name, new ArrayList<Integer>(positions));
+		}
+		this.taskConfig.setInputPositions(toWrite);
+	}
+
+	// Must be called before taskConfig is written to Tez configuration
+	protected void writeSubTasksInOutputToConfig () {
+		this.taskConfig.setNumberSubtasksInOutput(this.numberOfSubTasksInOutputs);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/TezDAGGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/TezDAGGenerator.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/TezDAGGenerator.java
new file mode 100644
index 0000000..52f39be
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/TezDAGGenerator.java
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.dag;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.dag.TempMode;
+import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.NAryUnionPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plan.SourcePlanNode;
+import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.tez.runtime.TezTaskConfig;
+import org.apache.flink.util.Visitor;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.TezConfiguration;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+
+public class TezDAGGenerator implements Visitor<PlanNode> {
+
+	private static final Log LOG = LogFactory.getLog(TezDAGGenerator.class);
+	
+	private Map<PlanNode, FlinkVertex> vertices; // a map from optimizer nodes to Tez vertices
+	private List<FlinkEdge> edges;
+	private final int defaultMaxFan;
+	private final TezConfiguration tezConf;
+
+	private final float defaultSortSpillingThreshold;
+
+	public TezDAGGenerator (TezConfiguration tezConf, Configuration config) {
+		this.defaultMaxFan = config.getInteger(ConfigConstants.DEFAULT_SPILLING_MAX_FAN_KEY,
+				ConfigConstants.DEFAULT_SPILLING_MAX_FAN);
+		this.defaultSortSpillingThreshold = config.getFloat(ConfigConstants.DEFAULT_SORT_SPILLING_THRESHOLD_KEY,
+				ConfigConstants.DEFAULT_SORT_SPILLING_THRESHOLD);
+		this.tezConf = tezConf;
+	}
+
+	public DAG createDAG (OptimizedPlan program) throws Exception {
+		LOG.info ("Creating Tez DAG");
+		this.vertices = new HashMap<PlanNode, FlinkVertex>();
+		this.edges = new ArrayList<FlinkEdge>();
+		program.accept(this);
+
+		DAG dag = DAG.create(program.getJobName());
+		for (FlinkVertex v : vertices.values()) {
+			dag.addVertex(v.createVertex(new TezConfiguration(tezConf)));
+		}
+		for (FlinkEdge e: edges) {
+			dag.addEdge(e.createEdge(new TezConfiguration(tezConf)));
+		}
+
+		/*
+		 * Temporarily throw an error until TEZ-1190 has been fixed or a workaround has been created
+		 */
+		if (containsSelfJoins()) {
+			throw new CompilerException("Dual-input operators with the same input (self-joins) are not yet supported");
+		}
+
+		this.vertices = null;
+		this.edges = null;
+
+		LOG.info ("Tez DAG created");
+		return dag;
+	}
+
+
+	@Override
+	public boolean preVisit(PlanNode node) {
+		if (this.vertices.containsKey(node)) {
+			// return false to prevent further descend
+			return false;
+		}
+
+		if ((node instanceof BulkIterationPlanNode) || (node instanceof WorksetIterationPlanNode)) {
+			throw new CompilerException("Iterations are not yet supported by the Tez execution environment");
+		}
+
+		if ( (node.getBroadcastInputs() != null) && (!node.getBroadcastInputs().isEmpty())) {
+			throw new CompilerException("Broadcast inputs are not yet supported by the Tez execution environment");
+		}
+
+		FlinkVertex vertex = null;
+
+		try {
+			if (node instanceof SourcePlanNode) {
+				vertex = createDataSourceVertex ((SourcePlanNode) node);
+			}
+			else if (node instanceof SinkPlanNode) {
+				vertex = createDataSinkVertex ((SinkPlanNode) node);
+			}
+			else if ((node instanceof SingleInputPlanNode)) {
+				vertex = createSingleInputVertex((SingleInputPlanNode) node);
+			}
+			else if (node instanceof DualInputPlanNode) {
+				vertex = createDualInputVertex((DualInputPlanNode) node);
+			}
+			else if (node instanceof NAryUnionPlanNode) {
+				vertex = createUnionVertex ((NAryUnionPlanNode) node);
+			}
+			else {
+				throw new CompilerException("Unrecognized node type: " + node.getClass().getName());
+			}
+
+		}
+		catch (Exception e) {
+			throw new CompilerException("Error translating node '" + node + "': " + e.getMessage(), e);
+		}
+
+		if (vertex != null) {
+			this.vertices.put(node, vertex);
+		}
+		return true;
+	}
+
+	@Override
+	public void postVisit (PlanNode node) {
+		try {
+			if (node instanceof SourcePlanNode) {
+				return;
+			}
+			final Iterator<Channel> inConns = node.getInputs().iterator();
+			if (!inConns.hasNext()) {
+				throw new CompilerException("Bug: Found a non-source task with no input.");
+			}
+			int inputIndex = 0;
+
+			FlinkVertex targetVertex = this.vertices.get(node);
+			TezTaskConfig targetVertexConfig = targetVertex.getConfig();
+
+
+			while (inConns.hasNext()) {
+				Channel input = inConns.next();
+				inputIndex += translateChannel(input, inputIndex, targetVertex, targetVertexConfig, false);
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			throw new CompilerException(
+					"An error occurred while translating the optimized plan to a Tez DAG: " + e.getMessage(), e);
+		}
+	}
+
+	private FlinkVertex createSingleInputVertex(SingleInputPlanNode node) throws CompilerException, IOException {
+
+		final String taskName = node.getNodeName();
+		final DriverStrategy ds = node.getDriverStrategy();
+		final int dop = node.getParallelism();
+
+		final TezTaskConfig config= new TezTaskConfig(new Configuration());
+
+		config.setDriver(ds.getDriverClass());
+		config.setDriverStrategy(ds);
+		config.setStubWrapper(node.getProgramOperator().getUserCodeWrapper());
+		config.setStubParameters(node.getProgramOperator().getParameters());
+
+		for(int i=0;i<ds.getNumRequiredComparators();i++) {
+			config.setDriverComparator(node.getComparator(i), i);
+		}
+		assignDriverResources(node, config);
+
+		return new FlinkProcessorVertex(taskName, dop, config);
+	}
+
+	private FlinkVertex createDualInputVertex(DualInputPlanNode node) throws CompilerException, IOException {
+		final String taskName = node.getNodeName();
+		final DriverStrategy ds = node.getDriverStrategy();
+		final int dop = node.getParallelism();
+
+		final TezTaskConfig config= new TezTaskConfig(new Configuration());
+
+		config.setDriver(ds.getDriverClass());
+		config.setDriverStrategy(ds);
+		config.setStubWrapper(node.getProgramOperator().getUserCodeWrapper());
+		config.setStubParameters(node.getProgramOperator().getParameters());
+
+		if (node.getComparator1() != null) {
+			config.setDriverComparator(node.getComparator1(), 0);
+		}
+		if (node.getComparator2() != null) {
+			config.setDriverComparator(node.getComparator2(), 1);
+		}
+		if (node.getPairComparator() != null) {
+			config.setDriverPairComparator(node.getPairComparator());
+		}
+
+		assignDriverResources(node, config);
+
+		LOG.info("Creating processor vertex " + taskName + " with parallelism " + dop);
+
+		return new FlinkProcessorVertex(taskName, dop, config);
+	}
+
+	private FlinkVertex createDataSinkVertex(SinkPlanNode node) throws CompilerException, IOException {
+		final String taskName = node.getNodeName();
+		final int dop = node.getParallelism();
+
+		final TezTaskConfig config = new TezTaskConfig(new Configuration());
+
+		// set user code
+		config.setStubWrapper(node.getProgramOperator().getUserCodeWrapper());
+		config.setStubParameters(node.getProgramOperator().getParameters());
+
+		LOG.info("Creating data sink vertex " + taskName + " with parallelism " + dop);
+		
+		return new FlinkDataSinkVertex(taskName, dop, config);
+	}
+
+	private FlinkVertex createDataSourceVertex(SourcePlanNode node) throws CompilerException, IOException {
+		final String taskName = node.getNodeName();
+		int dop = node.getParallelism();
+
+		final TezTaskConfig config= new TezTaskConfig(new Configuration());
+
+		config.setStubWrapper(node.getProgramOperator().getUserCodeWrapper());
+		config.setStubParameters(node.getProgramOperator().getParameters());
+
+		InputFormat format = node.getDataSourceNode().getOperator().getFormatWrapper().getUserCodeObject();
+
+		config.setInputFormat(format);
+
+		// Create as many data sources as input splits
+		InputSplit[] splits = format.createInputSplits((dop > 0) ? dop : 1);
+		dop = splits.length;
+
+		LOG.info("Creating data source vertex " + taskName + " with parallelism " + dop);
+		
+		return new FlinkDataSourceVertex(taskName, dop, config);
+	}
+
+	private FlinkVertex createUnionVertex(NAryUnionPlanNode node) throws CompilerException, IOException {
+		final String taskName = node.getNodeName();
+		final int dop = node.getParallelism();
+		final TezTaskConfig config= new TezTaskConfig(new Configuration());
+
+		LOG.info("Creating union vertex " + taskName + " with parallelism " + dop);
+		
+		return new FlinkUnionVertex (taskName, dop, config);
+	}
+
+
+	private void assignDriverResources(PlanNode node, TaskConfig config) {
+		final double relativeMem = node.getRelativeMemoryPerSubTask();
+		if (relativeMem > 0) {
+			config.setRelativeMemoryDriver(relativeMem);
+			config.setFilehandlesDriver(this.defaultMaxFan);
+			config.setSpillingThresholdDriver(this.defaultSortSpillingThreshold);
+		}
+	}
+
+	private void assignLocalStrategyResources(Channel c, TaskConfig config, int inputNum) {
+		if (c.getRelativeMemoryLocalStrategy() > 0) {
+			config.setRelativeMemoryInput(inputNum, c.getRelativeMemoryLocalStrategy());
+			config.setFilehandlesInput(inputNum, this.defaultMaxFan);
+			config.setSpillingThresholdInput(inputNum, this.defaultSortSpillingThreshold);
+		}
+	}
+
+	private int translateChannel(Channel input, int inputIndex, FlinkVertex targetVertex,
+								TezTaskConfig targetVertexConfig, boolean isBroadcast) throws Exception
+	{
+		final PlanNode inputPlanNode = input.getSource();
+		final Iterator<Channel> allInChannels;
+
+
+		allInChannels = Collections.singletonList(input).iterator();
+
+
+		// check that the type serializer is consistent
+		TypeSerializerFactory<?> typeSerFact = null;
+
+		while (allInChannels.hasNext()) {
+			final Channel inConn = allInChannels.next();
+
+			if (typeSerFact == null) {
+				typeSerFact = inConn.getSerializer();
+			} else if (!typeSerFact.equals(inConn.getSerializer())) {
+				throw new CompilerException("Conflicting types in union operator.");
+			}
+
+			final PlanNode sourceNode = inConn.getSource();
+			FlinkVertex sourceVertex = this.vertices.get(sourceNode);
+			TezTaskConfig sourceVertexConfig = sourceVertex.getConfig(); //TODO ??? need to create a new TezConfig ???
+
+			connectJobVertices(
+					inConn, inputIndex, sourceVertex, sourceVertexConfig, targetVertex, targetVertexConfig, isBroadcast);
+		}
+
+		// the local strategy is added only once. in non-union case that is the actual edge,
+		// in the union case, it is the edge between union and the target node
+		addLocalInfoFromChannelToConfig(input, targetVertexConfig, inputIndex, isBroadcast);
+		return 1;
+	}
+
+	private void connectJobVertices(Channel channel, int inputNumber,
+							final FlinkVertex sourceVertex, final TezTaskConfig sourceConfig,
+							final FlinkVertex targetVertex, final TezTaskConfig targetConfig, boolean isBroadcast)
+			throws CompilerException {
+
+		// -------------- configure the source task's ship strategy strategies in task config --------------
+		final int outputIndex = sourceConfig.getNumOutputs();
+		sourceConfig.addOutputShipStrategy(channel.getShipStrategy());
+		if (outputIndex == 0) {
+			sourceConfig.setOutputSerializer(channel.getSerializer());
+		}
+		if (channel.getShipStrategyComparator() != null) {
+			sourceConfig.setOutputComparator(channel.getShipStrategyComparator(), outputIndex);
+		}
+
+		if (channel.getShipStrategy() == ShipStrategyType.PARTITION_RANGE) {
+
+			final DataDistribution dataDistribution = channel.getDataDistribution();
+			if(dataDistribution != null) {
+				sourceConfig.setOutputDataDistribution(dataDistribution, outputIndex);
+			} else {
+				throw new RuntimeException("Range partitioning requires data distribution");
+				// TODO: inject code and configuration for automatic histogram generation
+			}
+		}
+
+		// ---------------- configure the receiver -------------------
+		if (isBroadcast) {
+			targetConfig.addBroadcastInputToGroup(inputNumber);
+		} else {
+			targetConfig.addInputToGroup(inputNumber);
+		}
+
+		//----------------- connect source and target with edge ------------------------------
+
+		FlinkEdge edge;
+		ShipStrategyType shipStrategy = channel.getShipStrategy();
+		TypeSerializer<?> serializer = channel.getSerializer().getSerializer();
+		if ((shipStrategy == ShipStrategyType.FORWARD) || (shipStrategy == ShipStrategyType.NONE)) {
+			edge = new FlinkForwardEdge(sourceVertex, targetVertex, serializer);
+			// For forward edges, create as many tasks in upstream operator as in source operator
+			targetVertex.setParallelism(sourceVertex.getParallelism());
+		}
+		else if (shipStrategy == ShipStrategyType.BROADCAST) {
+			edge = new FlinkBroadcastEdge(sourceVertex, targetVertex, serializer);
+		}
+		else if (shipStrategy == ShipStrategyType.PARTITION_HASH) {
+			edge = new FlinkPartitionEdge(sourceVertex, targetVertex, serializer);
+		}
+		else {
+			throw new CompilerException("Ship strategy between nodes " + sourceVertex.getVertex().getName() + " and " + targetVertex.getVertex().getName() + " currently not supported");
+		}
+
+		// Tez-specific bookkeeping
+		// TODO: This probably will not work for vertices with multiple outputs
+		sourceVertex.addNumberOfSubTasksInOutput(targetVertex.getParallelism(), outputIndex);
+		targetVertex.addInput(sourceVertex, inputNumber);
+
+
+		edges.add(edge);
+	}
+
+	private void addLocalInfoFromChannelToConfig(Channel channel, TaskConfig config, int inputNum, boolean isBroadcastChannel) {
+		// serializer
+		if (isBroadcastChannel) {
+			config.setBroadcastInputSerializer(channel.getSerializer(), inputNum);
+
+			if (channel.getLocalStrategy() != LocalStrategy.NONE || (channel.getTempMode() != null && channel.getTempMode() != TempMode.NONE)) {
+				throw new CompilerException("Found local strategy or temp mode on a broadcast variable channel.");
+			} else {
+				return;
+			}
+		} else {
+			config.setInputSerializer(channel.getSerializer(), inputNum);
+		}
+
+		// local strategy
+		if (channel.getLocalStrategy() != LocalStrategy.NONE) {
+			config.setInputLocalStrategy(inputNum, channel.getLocalStrategy());
+			if (channel.getLocalStrategyComparator() != null) {
+				config.setInputComparator(channel.getLocalStrategyComparator(), inputNum);
+			}
+		}
+
+		assignLocalStrategyResources(channel, config, inputNum);
+
+		// materialization / caching
+		if (channel.getTempMode() != null) {
+			final TempMode tm = channel.getTempMode();
+
+			boolean needsMemory = false;
+			if (tm.breaksPipeline()) {
+				config.setInputAsynchronouslyMaterialized(inputNum, true);
+				needsMemory = true;
+			}
+			if (tm.isCached()) {
+				config.setInputCached(inputNum, true);
+				needsMemory = true;
+			}
+
+			if (needsMemory) {
+				// sanity check
+				if (tm == null || tm == TempMode.NONE || channel.getRelativeTempMemory() <= 0) {
+					throw new CompilerException("Bug in compiler: Inconsistent description of input materialization.");
+				}
+				config.setRelativeInputMaterializationMemory(inputNum, channel.getRelativeTempMemory());
+			}
+		}
+	}
+
+	private boolean containsSelfJoins () {
+		for (FlinkVertex v : vertices.values()) {
+			ArrayList<FlinkVertex> predecessors = new ArrayList<FlinkVertex>();
+			for (FlinkEdge e : edges) {
+				if (e.target == v) {
+					if (predecessors.contains(e.source)) {
+						return true;
+					}
+					predecessors.add(e.source);
+				}
+			}
+		}
+		return false;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/ConnectedComponentsStep.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/ConnectedComponentsStep.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/ConnectedComponentsStep.java
new file mode 100644
index 0000000..707fd47
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/ConnectedComponentsStep.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.examples;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.examples.java.graph.util.ConnectedComponentsData;
+import org.apache.flink.util.Collector;
+
+
+public class ConnectedComponentsStep implements ProgramDescription {
+
+	// *************************************************************************
+	//     PROGRAM
+	// *************************************************************************
+
+	public static void main(String... args) throws Exception {
+
+		if(!parseParameters(args)) {
+			return;
+		}
+
+		// set up execution environment
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		// read vertex and edge data
+		DataSet<Long> vertices = getVertexDataSet(env);
+		DataSet<Tuple2<Long, Long>> edges = getEdgeDataSet(env).flatMap(new UndirectEdge());
+
+		// assign the initial components (equal to the vertex id)
+		DataSet<Tuple2<Long, Long>> verticesWithInitialId = vertices.map(new DuplicateValue<Long>());
+
+		DataSet<Tuple2<Long,Long>> nextComponenets = verticesWithInitialId
+				.join(edges)
+				.where(0).equalTo(0)
+				.with(new NeighborWithComponentIDJoin())
+				.groupBy(0).aggregate(Aggregations.MIN, 1)
+				.join(verticesWithInitialId)
+				.where(0).equalTo(0)
+				.with(new ComponentIdFilter());
+
+
+		// emit result
+		if(fileOutput) {
+			nextComponenets.writeAsCsv(outputPath, "\n", " ");
+		} else {
+			nextComponenets.print();
+		}
+
+		// execute program
+		env.execute("Connected Components Example");
+	}
+
+	// *************************************************************************
+	//     USER FUNCTIONS
+	// *************************************************************************
+
+	/**
+	 * Function that turns a value into a 2-tuple where both fields are that value.
+	 */
+	@ForwardedFields("*->f0")
+	public static final class DuplicateValue<T> implements MapFunction<T, Tuple2<T, T>> {
+
+		@Override
+		public Tuple2<T, T> map(T vertex) {
+			return new Tuple2<T, T>(vertex, vertex);
+		}
+	}
+
+	/**
+	 * Undirected edges by emitting for each input edge the input edges itself and an inverted version.
+	 */
+	public static final class UndirectEdge implements FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+		Tuple2<Long, Long> invertedEdge = new Tuple2<Long, Long>();
+
+		@Override
+		public void flatMap(Tuple2<Long, Long> edge, Collector<Tuple2<Long, Long>> out) {
+			invertedEdge.f0 = edge.f1;
+			invertedEdge.f1 = edge.f0;
+			out.collect(edge);
+			out.collect(invertedEdge);
+		}
+	}
+
+	/**
+	 * UDF that joins a (Vertex-ID, Component-ID) pair that represents the current component that
+	 * a vertex is associated with, with a (Source-Vertex-ID, Target-VertexID) edge. The function
+	 * produces a (Target-vertex-ID, Component-ID) pair.
+	 */
+	@ForwardedFieldsFirst("f1->f1")
+	@ForwardedFieldsSecond("f1->f0")
+	public static final class NeighborWithComponentIDJoin implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
+		@Override
+		public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
+			return new Tuple2<Long, Long>(edge.f1, vertexWithComponent.f1);
+		}
+	}
+
+
+
+	@ForwardedFieldsFirst("*")
+	public static final class ComponentIdFilter implements FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
+		@Override
+		public void join(Tuple2<Long, Long> candidate, Tuple2<Long, Long> old, Collector<Tuple2<Long, Long>> out) {
+			if (candidate.f1 < old.f1) {
+				out.collect(candidate);
+			}
+		}
+	}
+
+
+
+	@Override
+	public String getDescription() {
+		return "Parameters: <vertices-path> <edges-path> <result-path> <max-number-of-iterations>";
+	}
+
+	// *************************************************************************
+	//     UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String verticesPath = null;
+	private static String edgesPath = null;
+	private static String outputPath = null;
+	private static int maxIterations = 10;
+
+	private static boolean parseParameters(String[] programArguments) {
+
+		if(programArguments.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if(programArguments.length == 4) {
+				verticesPath = programArguments[0];
+				edgesPath = programArguments[1];
+				outputPath = programArguments[2];
+				maxIterations = Integer.parseInt(programArguments[3]);
+			} else {
+				System.err.println("Usage: ConnectedComponents <vertices path> <edges path> <result path> <max number of iterations>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing Connected Components example with default parameters and built-in default data.");
+			System.out.println("  Provide parameters to read input data from files.");
+			System.out.println("  See the documentation for the correct format of input files.");
+			System.out.println("  Usage: ConnectedComponents <vertices path> <edges path> <result path> <max number of iterations>");
+		}
+		return true;
+	}
+
+	private static DataSet<Long> getVertexDataSet(ExecutionEnvironment env) {
+
+		if(fileOutput) {
+			return env.readCsvFile(verticesPath).types(Long.class)
+					.map(
+							new MapFunction<Tuple1<Long>, Long>() {
+								public Long map(Tuple1<Long> value) { return value.f0; }
+							});
+		} else {
+			return ConnectedComponentsData.getDefaultVertexDataSet(env);
+		}
+	}
+
+	private static DataSet<Tuple2<Long, Long>> getEdgeDataSet(ExecutionEnvironment env) {
+
+		if(fileOutput) {
+			return env.readCsvFile(edgesPath).fieldDelimiter(' ').types(Long.class, Long.class);
+		} else {
+			return ConnectedComponentsData.getDefaultEdgeDataSet(env);
+		}
+	}
+
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/ExampleDriver.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/ExampleDriver.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/ExampleDriver.java
new file mode 100644
index 0000000..c65fb69
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/ExampleDriver.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.examples;
+
+import org.apache.hadoop.util.ProgramDriver;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.Progress;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.dag.api.client.VertexStatus;
+
+import java.io.IOException;
+import java.text.DecimalFormat;
+import java.util.EnumSet;
+import java.util.Set;
+
+public class ExampleDriver {
+
+	private static final DecimalFormat formatter = new DecimalFormat("###.##%");
+
+	public static void main(String [] args){
+		int exitCode = -1;
+		ProgramDriver pgd = new ProgramDriver();
+		try {
+			pgd.addClass("wc", WordCount.class,
+					"Wordcount");
+			pgd.addClass("tpch3", TPCHQuery3.class,
+					"Modified TPC-H 3 query");
+			pgd.addClass("tc", TransitiveClosureNaiveStep.class,
+					"One step of transitive closure");
+			pgd.addClass("pr", PageRankBasicStep.class,
+					"One step of PageRank");
+			pgd.addClass("cc", ConnectedComponentsStep.class,
+					"One step of connected components");
+			exitCode = pgd.run(args);
+		} catch(Throwable e){
+			e.printStackTrace();
+		}
+		System.exit(exitCode);
+	}
+
+	public static void printDAGStatus(DAGClient dagClient, String[] vertexNames)
+			throws IOException, TezException {
+		printDAGStatus(dagClient, vertexNames, false, false);
+	}
+
+	public static void printDAGStatus(DAGClient dagClient, String[] vertexNames, boolean displayDAGCounters, boolean displayVertexCounters)
+			throws IOException, TezException {
+		Set<StatusGetOpts> opts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
+		DAGStatus dagStatus = dagClient.getDAGStatus(
+				(displayDAGCounters ? opts : null));
+		Progress progress = dagStatus.getDAGProgress();
+		double vProgressFloat = 0.0f;
+		if (progress != null) {
+			System.out.println("");
+			System.out.println("DAG: State: "
+					+ dagStatus.getState()
+					+ " Progress: "
+					+ (progress.getTotalTaskCount() < 0 ? formatter.format(0.0f) :
+					formatter.format((double)(progress.getSucceededTaskCount())
+							/progress.getTotalTaskCount())));
+			for (String vertexName : vertexNames) {
+				VertexStatus vStatus = dagClient.getVertexStatus(vertexName,
+						(displayVertexCounters ? opts : null));
+				if (vStatus == null) {
+					System.out.println("Could not retrieve status for vertex: "
+							+ vertexName);
+					continue;
+				}
+				Progress vProgress = vStatus.getProgress();
+				if (vProgress != null) {
+					vProgressFloat = 0.0f;
+					if (vProgress.getTotalTaskCount() == 0) {
+						vProgressFloat = 1.0f;
+					} else if (vProgress.getTotalTaskCount() > 0) {
+						vProgressFloat = (double)vProgress.getSucceededTaskCount()
+								/vProgress.getTotalTaskCount();
+					}
+					System.out.println("VertexStatus:"
+							+ " VertexName: "
+							+ (vertexName.equals("ivertex1") ? "intermediate-reducer"
+							: vertexName)
+							+ " Progress: " + formatter.format(vProgressFloat));
+				}
+				if (displayVertexCounters) {
+					TezCounters counters = vStatus.getVertexCounters();
+					if (counters != null) {
+						System.out.println("Vertex Counters for " + vertexName + ": "
+								+ counters);
+					}
+				}
+			}
+		}
+		if (displayDAGCounters) {
+			TezCounters counters = dagStatus.getDAGCounters();
+			if (counters != null) {
+				System.out.println("DAG Counters: " + counters);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/PageRankBasicStep.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/PageRankBasicStep.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/PageRankBasicStep.java
new file mode 100644
index 0000000..031893d
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/PageRankBasicStep.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.examples;
+
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.examples.java.graph.util.PageRankData;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+
+import static org.apache.flink.api.java.aggregation.Aggregations.SUM;
+
+public class PageRankBasicStep {
+
+	private static final double DAMPENING_FACTOR = 0.85;
+	private static final double EPSILON = 0.0001;
+
+	// *************************************************************************
+	//     PROGRAM
+	// *************************************************************************
+
+	public static void main(String[] args) throws Exception {
+
+		if(!parseParameters(args)) {
+			return;
+		}
+
+		// set up execution environment
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		// get input data
+		DataSet<Long> pagesInput = getPagesDataSet(env);
+		DataSet<Tuple2<Long, Long>> linksInput = getLinksDataSet(env);
+
+		// assign initial rank to pages
+		DataSet<Tuple2<Long, Double>> pagesWithRanks = pagesInput.
+				map(new RankAssigner((1.0d / numPages)));
+
+		// build adjacency list from link input
+		DataSet<Tuple2<Long, Long[]>> adjacencyListInput =
+				linksInput.groupBy(0).reduceGroup(new BuildOutgoingEdgeList());
+
+		DataSet<Tuple2<Long, Double>> newRanks = pagesWithRanks
+				.join(adjacencyListInput).where(0).equalTo(0)
+				.flatMap(new JoinVertexWithEdgesMatch())
+				.groupBy(0).aggregate(SUM, 1)
+				.map(new Dampener(DAMPENING_FACTOR, numPages));
+
+
+		// emit result
+		if(fileOutput) {
+			newRanks.writeAsCsv(outputPath, "\n", " ");
+		} else {
+			newRanks.print();
+		}
+
+		// execute program
+		env.execute("Basic Page Rank Example");
+
+	}
+
+	// *************************************************************************
+	//     USER FUNCTIONS
+	// *************************************************************************
+
+	/**
+	 * A map function that assigns an initial rank to all pages.
+	 */
+	public static final class RankAssigner implements MapFunction<Long, Tuple2<Long, Double>> {
+		Tuple2<Long, Double> outPageWithRank;
+
+		public RankAssigner(double rank) {
+			this.outPageWithRank = new Tuple2<Long, Double>(-1l, rank);
+		}
+
+		@Override
+		public Tuple2<Long, Double> map(Long page) {
+			outPageWithRank.f0 = page;
+			return outPageWithRank;
+		}
+	}
+
+	/**
+	 * A reduce function that takes a sequence of edges and builds the adjacency list for the vertex where the edges
+	 * originate. Run as a pre-processing step.
+	 */
+	@ForwardedFields("0")
+	public static final class BuildOutgoingEdgeList implements GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long[]>> {
+
+		private final ArrayList<Long> neighbors = new ArrayList<Long>();
+
+		@Override
+		public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long[]>> out) {
+			neighbors.clear();
+			Long id = 0L;
+
+			for (Tuple2<Long, Long> n : values) {
+				id = n.f0;
+				neighbors.add(n.f1);
+			}
+			out.collect(new Tuple2<Long, Long[]>(id, neighbors.toArray(new Long[neighbors.size()])));
+		}
+	}
+
+	/**
+	 * Join function that distributes a fraction of a vertex's rank to all neighbors.
+	 */
+	public static final class JoinVertexWithEdgesMatch implements FlatMapFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Long[]>>, Tuple2<Long, Double>> {
+
+		@Override
+		public void flatMap(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Long[]>> value, Collector<Tuple2<Long, Double>> out){
+			Long[] neigbors = value.f1.f1;
+			double rank = value.f0.f1;
+			double rankToDistribute = rank / ((double) neigbors.length);
+
+			for (int i = 0; i < neigbors.length; i++) {
+				out.collect(new Tuple2<Long, Double>(neigbors[i], rankToDistribute));
+			}
+		}
+	}
+
+	/**
+	 * The function that applies the page rank dampening formula
+	 */
+	@ForwardedFields("0")
+	public static final class Dampener implements MapFunction<Tuple2<Long,Double>, Tuple2<Long,Double>> {
+
+		private final double dampening;
+		private final double randomJump;
+
+		public Dampener(double dampening, double numVertices) {
+			this.dampening = dampening;
+			this.randomJump = (1 - dampening) / numVertices;
+		}
+
+		@Override
+		public Tuple2<Long, Double> map(Tuple2<Long, Double> value) {
+			value.f1 = (value.f1 * dampening) + randomJump;
+			return value;
+		}
+	}
+
+	/**
+	 * Filter that filters vertices where the rank difference is below a threshold.
+	 */
+	public static final class EpsilonFilter implements FilterFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> {
+
+		@Override
+		public boolean filter(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>> value) {
+			return Math.abs(value.f0.f1 - value.f1.f1) > EPSILON;
+		}
+	}
+
+	// *************************************************************************
+	//     UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String pagesInputPath = null;
+	private static String linksInputPath = null;
+	private static String outputPath = null;
+	private static long numPages = 0;
+	private static int maxIterations = 10;
+
+	private static boolean parseParameters(String[] args) {
+
+		if(args.length > 0) {
+			if(args.length == 5) {
+				fileOutput = true;
+				pagesInputPath = args[0];
+				linksInputPath = args[1];
+				outputPath = args[2];
+				numPages = Integer.parseInt(args[3]);
+				maxIterations = Integer.parseInt(args[4]);
+			} else {
+				System.err.println("Usage: PageRankBasic <pages path> <links path> <output path> <num pages> <num iterations>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing PageRank Basic example with default parameters and built-in default data.");
+			System.out.println("  Provide parameters to read input data from files.");
+			System.out.println("  See the documentation for the correct format of input files.");
+			System.out.println("  Usage: PageRankBasic <pages path> <links path> <output path> <num pages> <num iterations>");
+
+			numPages = PageRankData.getNumberOfPages();
+		}
+		return true;
+	}
+
+	private static DataSet<Long> getPagesDataSet(ExecutionEnvironment env) {
+		if(fileOutput) {
+			return env
+					.readCsvFile(pagesInputPath)
+					.fieldDelimiter(' ')
+					.lineDelimiter("\n")
+					.types(Long.class)
+					.map(new MapFunction<Tuple1<Long>, Long>() {
+						@Override
+						public Long map(Tuple1<Long> v) { return v.f0; }
+					});
+		} else {
+			return PageRankData.getDefaultPagesDataSet(env);
+		}
+	}
+
+	private static DataSet<Tuple2<Long, Long>> getLinksDataSet(ExecutionEnvironment env) {
+		if(fileOutput) {
+			return env.readCsvFile(linksInputPath)
+					.fieldDelimiter(' ')
+					.lineDelimiter("\n")
+					.types(Long.class, Long.class);
+		} else {
+			return PageRankData.getDefaultEdgeDataSet(env);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/TPCHQuery3.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/TPCHQuery3.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/TPCHQuery3.java
new file mode 100644
index 0000000..d61f80e
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/TPCHQuery3.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.examples;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.tez.client.RemoteTezEnvironment;
+
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+public class TPCHQuery3 {
+
+	// *************************************************************************
+	//     PROGRAM
+	// *************************************************************************
+
+	public static void main(String[] args) throws Exception {
+
+		if(!parseParameters(args)) {
+			return;
+		}
+
+		final RemoteTezEnvironment env = RemoteTezEnvironment.create();
+		env.setParallelism(400);
+
+
+		// get input data
+		DataSet<Lineitem> lineitems = getLineitemDataSet(env);
+		DataSet<Order> orders = getOrdersDataSet(env);
+		DataSet<Customer> customers = getCustomerDataSet(env);
+
+		// Filter market segment "AUTOMOBILE"
+		customers = customers.filter(
+				new FilterFunction<Customer>() {
+					@Override
+					public boolean filter(Customer c) {
+						return c.getMktsegment().equals("AUTOMOBILE");
+					}
+				});
+
+		// Filter all Orders with o_orderdate < 12.03.1995
+		orders = orders.filter(
+				new FilterFunction<Order>() {
+					private final DateFormat format = new SimpleDateFormat("yyyy-MM-dd");
+					private final Date date = format.parse("1995-03-12");
+
+					@Override
+					public boolean filter(Order o) throws ParseException {
+						return format.parse(o.getOrderdate()).before(date);
+					}
+				});
+
+		// Filter all Lineitems with l_shipdate > 12.03.1995
+		lineitems = lineitems.filter(
+				new FilterFunction<Lineitem>() {
+					private final DateFormat format = new SimpleDateFormat("yyyy-MM-dd");
+					private final Date date = format.parse("1995-03-12");
+
+					@Override
+					public boolean filter(Lineitem l) throws ParseException {
+						return format.parse(l.getShipdate()).after(date);
+					}
+				});
+
+		// Join customers with orders and package them into a ShippingPriorityItem
+		DataSet<ShippingPriorityItem> customerWithOrders =
+				customers.join(orders).where(0).equalTo(1)
+						.with(
+								new JoinFunction<Customer, Order, ShippingPriorityItem>() {
+									@Override
+									public ShippingPriorityItem join(Customer c, Order o) {
+										return new ShippingPriorityItem(o.getOrderKey(), 0.0, o.getOrderdate(),
+												o.getShippriority());
+									}
+								});
+
+		// Join the last join result with Lineitems
+		DataSet<ShippingPriorityItem> result =
+				customerWithOrders.join(lineitems).where(0).equalTo(0)
+						.with(
+								new JoinFunction<ShippingPriorityItem, Lineitem, ShippingPriorityItem>() {
+									@Override
+									public ShippingPriorityItem join(ShippingPriorityItem i, Lineitem l) {
+										i.setRevenue(l.getExtendedprice() * (1 - l.getDiscount()));
+										return i;
+									}
+								})
+								// Group by l_orderkey, o_orderdate and o_shippriority and compute revenue sum
+						.groupBy(0, 2, 3)
+						.aggregate(Aggregations.SUM, 1);
+
+		// emit result
+		result.writeAsCsv(outputPath, "\n", "|");
+
+		// execute program
+		env.registerMainClass(TPCHQuery3.class);
+		env.execute("TPCH Query 3 Example");
+
+	}
+
+	// *************************************************************************
+	//     DATA TYPES
+	// *************************************************************************
+
+	public static class Lineitem extends Tuple4<Integer, Double, Double, String> {
+
+		public Integer getOrderkey() { return this.f0; }
+		public Double getDiscount() { return this.f2; }
+		public Double getExtendedprice() { return this.f1; }
+		public String getShipdate() { return this.f3; }
+	}
+
+	public static class Customer extends Tuple2<Integer, String> {
+
+		public Integer getCustKey() { return this.f0; }
+		public String getMktsegment() { return this.f1; }
+	}
+
+	public static class Order extends Tuple4<Integer, Integer, String, Integer> {
+
+		public Integer getOrderKey() { return this.f0; }
+		public Integer getCustKey() { return this.f1; }
+		public String getOrderdate() { return this.f2; }
+		public Integer getShippriority() { return this.f3; }
+	}
+
+	public static class ShippingPriorityItem extends Tuple4<Integer, Double, String, Integer> {
+
+		public ShippingPriorityItem() { }
+
+		public ShippingPriorityItem(Integer o_orderkey, Double revenue,
+									String o_orderdate, Integer o_shippriority) {
+			this.f0 = o_orderkey;
+			this.f1 = revenue;
+			this.f2 = o_orderdate;
+			this.f3 = o_shippriority;
+		}
+
+		public Integer getOrderkey() { return this.f0; }
+		public void setOrderkey(Integer orderkey) { this.f0 = orderkey; }
+		public Double getRevenue() { return this.f1; }
+		public void setRevenue(Double revenue) { this.f1 = revenue; }
+
+		public String getOrderdate() { return this.f2; }
+		public Integer getShippriority() { return this.f3; }
+	}
+
+	// *************************************************************************
+	//     UTIL METHODS
+	// *************************************************************************
+
+	private static String lineitemPath;
+	private static String customerPath;
+	private static String ordersPath;
+	private static String outputPath;
+
+	private static boolean parseParameters(String[] programArguments) {
+
+		if(programArguments.length > 0) {
+			if(programArguments.length == 4) {
+				lineitemPath = programArguments[0];
+				customerPath = programArguments[1];
+				ordersPath = programArguments[2];
+				outputPath = programArguments[3];
+			} else {
+				System.err.println("Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path> <orders-csv path> <result path>");
+				return false;
+			}
+		} else {
+			System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
+					"  Due to legal restrictions, we can not ship generated data.\n" +
+					"  You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" +
+					"  Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path> <orders-csv path> <result path>");
+			return false;
+		}
+		return true;
+	}
+
+	private static DataSet<Lineitem> getLineitemDataSet(ExecutionEnvironment env) {
+		return env.readCsvFile(lineitemPath)
+				.fieldDelimiter('|')
+				.includeFields("1000011000100000")
+				.tupleType(Lineitem.class);
+	}
+
+	private static DataSet<Customer> getCustomerDataSet(ExecutionEnvironment env) {
+		return env.readCsvFile(customerPath)
+				.fieldDelimiter('|')
+				.includeFields("10000010")
+				.tupleType(Customer.class);
+	}
+
+	private static DataSet<Order> getOrdersDataSet(ExecutionEnvironment env) {
+		return env.readCsvFile(ordersPath)
+				.fieldDelimiter('|')
+				.includeFields("110010010")
+				.tupleType(Order.class);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/TransitiveClosureNaiveStep.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/TransitiveClosureNaiveStep.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/TransitiveClosureNaiveStep.java
new file mode 100644
index 0000000..b014c3e
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/TransitiveClosureNaiveStep.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.examples;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.examples.java.graph.util.ConnectedComponentsData;
+import org.apache.flink.util.Collector;
+
+
+/*
+ * NOTE:
+ * This program is currently supposed to throw a Compiler Exception due to TEZ-1190
+ */
+
+public class TransitiveClosureNaiveStep implements ProgramDescription {
+
+
+	public static void main (String... args) throws Exception{
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		// set up execution environment
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple2<Long, Long>> edges = getEdgeDataSet(env);
+
+		DataSet<Tuple2<Long,Long>> nextPaths = edges
+				.join(edges)
+				.where(1)
+				.equalTo(0)
+				.with(new JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
+					@Override
+					/**
+					 left: Path (z,x) - x is reachable by z
+					 right: Edge (x,y) - edge x-->y exists
+					 out: Path (z,y) - y is reachable by z
+					 */
+					public Tuple2<Long, Long> join(Tuple2<Long, Long> left, Tuple2<Long, Long> right) throws Exception {
+						return new Tuple2<Long, Long>(
+								new Long(left.f0),
+								new Long(right.f1));
+					}
+				})
+				.union(edges)
+				.groupBy(0, 1)
+				.reduceGroup(new GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
+					@Override
+					public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) throws Exception {
+						out.collect(values.iterator().next());
+					}
+				});
+
+		// emit result
+		if (fileOutput) {
+			nextPaths.writeAsCsv(outputPath, "\n", " ");
+		} else {
+			nextPaths.print();
+		}
+
+		// execute program
+		env.execute("Transitive Closure Example");
+
+	}
+
+	@Override
+	public String getDescription() {
+		return "Parameters: <edges-path> <result-path> <max-number-of-iterations>";
+	}
+
+	// *************************************************************************
+	//     UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String edgesPath = null;
+	private static String outputPath = null;
+	private static int maxIterations = 10;
+
+	private static boolean parseParameters(String[] programArguments) {
+
+		if (programArguments.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if (programArguments.length == 3) {
+				edgesPath = programArguments[0];
+				outputPath = programArguments[1];
+				maxIterations = Integer.parseInt(programArguments[2]);
+			} else {
+				System.err.println("Usage: TransitiveClosure <edges path> <result path> <max number of iterations>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing TransitiveClosure example with default parameters and built-in default data.");
+			System.out.println("  Provide parameters to read input data from files.");
+			System.out.println("  See the documentation for the correct format of input files.");
+			System.out.println("  Usage: TransitiveClosure <edges path> <result path> <max number of iterations>");
+		}
+		return true;
+	}
+
+
+	private static DataSet<Tuple2<Long, Long>> getEdgeDataSet(ExecutionEnvironment env) {
+
+		if(fileOutput) {
+			return env.readCsvFile(edgesPath).fieldDelimiter(' ').types(Long.class, Long.class);
+		} else {
+			return ConnectedComponentsData.getDefaultEdgeDataSet(env);
+		}
+	}
+
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/WordCount.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/WordCount.java
new file mode 100644
index 0000000..e758156
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/WordCount.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.examples;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.tez.client.RemoteTezEnvironment;
+import org.apache.flink.util.Collector;
+
+public class WordCount {
+
+	// *************************************************************************
+	//     PROGRAM
+	// *************************************************************************
+
+	public static void main(String[] args) throws Exception {
+
+		if(!parseParameters(args)) {
+			return;
+		}
+
+		// set up the execution environment
+		final RemoteTezEnvironment env = RemoteTezEnvironment.create();
+		env.setParallelism(8);
+
+		// get input data
+		DataSet<String> text = getTextDataSet(env);
+
+		DataSet<Tuple2<String, Integer>> counts =
+				// split up the lines in pairs (2-tuples) containing: (word,1)
+				text.flatMap(new Tokenizer())
+						// group by the tuple field "0" and sum up tuple field "1"
+						.groupBy(0)
+						.sum(1);
+
+		// emit result
+		if(fileOutput) {
+			counts.writeAsCsv(outputPath, "\n", " ");
+		} else {
+			counts.print();
+		}
+
+		// execute program
+		env.registerMainClass(WordCount.class);
+		env.execute("WordCount Example");
+	}
+
+	// *************************************************************************
+	//     USER FUNCTIONS
+	// *************************************************************************
+
+	/**
+	 * Implements the string tokenizer that splits sentences into words as a user-defined
+	 * FlatMapFunction. The function takes a line (String) and splits it into
+	 * multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>).
+	 */
+	public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
+
+		@Override
+		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
+			// normalize and split the line
+			String[] tokens = value.toLowerCase().split("\\W+");
+
+			// emit the pairs
+			for (String token : tokens) {
+				if (token.length() > 0) {
+					out.collect(new Tuple2<String, Integer>(token, 1));
+				}
+			}
+		}
+	}
+
+	// *************************************************************************
+	//     UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String textPath;
+	private static String outputPath;
+
+	private static boolean parseParameters(String[] args) {
+
+		if(args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if(args.length == 2) {
+				textPath = args[0];
+				outputPath = args[1];
+			} else {
+				System.err.println("Usage: WordCount <text path> <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing WordCount example with built-in default data.");
+			System.out.println("  Provide parameters to read input data from a file.");
+			System.out.println("  Usage: WordCount <text path> <result path>");
+		}
+		return true;
+	}
+
+	private static DataSet<String> getTextDataSet(ExecutionEnvironment env) {
+		if(fileOutput) {
+			// read the text file from given input path
+			return env.readTextFile(textPath);
+		} else {
+			// get default test text data
+			return WordCountData.getDefaultTextLineDataSet(env);
+		}
+	}
+}


Mime
View raw message