flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [17/50] flink git commit: [FLINK-1712] Remove "flink-staging" module
Date Thu, 14 Jan 2016 16:16:14 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutor.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutor.java
deleted file mode 100644
index 60449db..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutor.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.client;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.PlanExecutor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.tez.dag.TezDAGGenerator;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.tez.client.TezClient;
-import org.apache.tez.client.TezClientUtils;
-import org.apache.tez.common.TezCommonUtils;
-import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.DAGStatus;
-
-import java.util.Map;
-import java.util.TreeMap;
-
-public class TezExecutor extends PlanExecutor {
-
-	private static final Log LOG = LogFactory.getLog(TezExecutor.class);
-
-	private TezConfiguration tezConf;
-	private Optimizer compiler;
-	
-	private Path jarPath;
-
-	private long runTime = -1; //TODO get DAG execution time from Tez
-	private int parallelism;
-
-	public TezExecutor(TezConfiguration tezConf, Optimizer compiler, int parallelism) {
-		this.tezConf = tezConf;
-		this.compiler = compiler;
-		this.parallelism = parallelism;
-	}
-
-	public TezExecutor(Optimizer compiler, int parallelism) {
-		this.tezConf = null;
-		this.compiler = compiler;
-		this.parallelism = parallelism;
-	}
-
-	public void setConfiguration (TezConfiguration tezConf) {
-		this.tezConf = tezConf;
-	}
-
-	private JobExecutionResult executePlanWithConf (TezConfiguration tezConf, Plan plan) throws Exception {
-
-		String jobName = plan.getJobName();
-
-		TezClient tezClient = TezClient.create(jobName, tezConf);
-		tezClient.start();
-		try {
-			OptimizedPlan optPlan = getOptimizedPlan(plan, parallelism);
-			TezDAGGenerator dagGenerator = new TezDAGGenerator(tezConf, new Configuration());
-			DAG dag = dagGenerator.createDAG(optPlan);
-
-			if (jarPath != null) {
-				addLocalResource(tezConf, jarPath, dag);
-			}
-
-			tezClient.waitTillReady();
-			LOG.info("Submitting DAG to Tez Client");
-			DAGClient dagClient = tezClient.submitDAG(dag);
-
-			LOG.info("Submitted DAG to Tez Client");
-
-			// monitoring
-			DAGStatus dagStatus = dagClient.waitForCompletion();
-
-			if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
-				LOG.error (jobName + " failed with diagnostics: " + dagStatus.getDiagnostics());
-				throw new RuntimeException(jobName + " failed with diagnostics: " + dagStatus.getDiagnostics());
-			}
-			LOG.info(jobName + " finished successfully");
-
-			return new JobExecutionResult(null, runTime, null);
-
-		}
-		finally {
-			tezClient.stop();
-		}
-	}
-
-	@Override
-	public void start() throws Exception {
-		throw new IllegalStateException("Session management is not supported in the TezExecutor.");
-	}
-
-	@Override
-	public void stop() throws Exception {
-		throw new IllegalStateException("Session management is not supported in the TezExecutor.");
-	}
-
-	@Override
-	public void endSession(JobID jobID) throws Exception {
-		throw new IllegalStateException("Session management is not supported in the TezExecutor.");
-	}
-
-	@Override
-	public boolean isRunning() {
-		return false;
-	}
-
-	@Override
-	public JobExecutionResult executePlan(Plan plan) throws Exception {
-		return executePlanWithConf(tezConf, plan);
-	}
-	
-	private static void addLocalResource (TezConfiguration tezConf, Path jarPath, DAG dag) {
-		
-		try {
-			org.apache.hadoop.fs.FileSystem fs = org.apache.hadoop.fs.FileSystem.get(tezConf);
-
-			LOG.info("Jar path received is " + jarPath.toString());
-
-			String jarFile = jarPath.getName();
-
-			Path remoteJarPath = null;
-			
-			/*
-			if (tezConf.get(TezConfiguration.TEZ_AM_STAGING_DIR) == null) {
-				LOG.info("Tez staging directory is null, setting it.");
-				Path stagingDir = new Path(fs.getWorkingDirectory(), UUID.randomUUID().toString());
-				LOG.info("Setting Tez staging directory to " + stagingDir.toString());
-				tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir.toString());
-				LOG.info("Set Tez staging directory to " + stagingDir.toString());
-			}
-			Path stagingDir = new Path(tezConf.get(TezConfiguration.TEZ_AM_STAGING_DIR));
-			LOG.info("Ensuring that Tez staging directory exists");
-			TezClientUtils.ensureStagingDirExists(tezConf, stagingDir);
-			LOG.info("Tez staging directory exists and is " + stagingDir.toString());
-			*/
-
-
-			Path stagingDir = TezCommonUtils.getTezBaseStagingPath(tezConf);
-			LOG.info("Tez staging path is " + stagingDir);
-			TezClientUtils.ensureStagingDirExists(tezConf, stagingDir);
-			LOG.info("Tez staging dir exists");
-			
-			remoteJarPath = fs.makeQualified(new Path(stagingDir, jarFile));
-			LOG.info("Copying " + jarPath.toString() + " to " + remoteJarPath.toString());
-			fs.copyFromLocalFile(jarPath, remoteJarPath);
-
-
-			FileStatus remoteJarStatus = fs.getFileStatus(remoteJarPath);
-			Credentials credentials = new Credentials();
-			TokenCache.obtainTokensForNamenodes(credentials, new Path[]{remoteJarPath}, tezConf);
-
-			Map<String, LocalResource> localResources = new TreeMap<String, LocalResource>();
-			LocalResource jobJar = LocalResource.newInstance(
-					ConverterUtils.getYarnUrlFromPath(remoteJarPath),
-					LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
-					remoteJarStatus.getLen(), remoteJarStatus.getModificationTime());
-			localResources.put(jarFile.toString(), jobJar);
-
-			dag.addTaskLocalFiles(localResources);
-
-			LOG.info("Added job jar as local resource.");
-		}
-		catch (Exception e) {
-			System.out.println(e.getMessage());
-			e.printStackTrace();
-			System.exit(-1);
-		}
-	}
-	
-	public void setJobJar (Path jarPath) {
-		this.jarPath = jarPath;
-	}
-
-
-	@Override
-	public String getOptimizerPlanAsJSON(Plan plan) throws Exception {
-		OptimizedPlan optPlan = getOptimizedPlan(plan, parallelism);
-		PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
-		return jsonGen.getOptimizerPlanAsJSON(optPlan);
-	}
-
-	public OptimizedPlan getOptimizedPlan(Plan p, int parallelism) throws CompilerException {
-		if (parallelism > 0 && p.getDefaultParallelism() <= 0) {
-			p.setDefaultParallelism(parallelism);
-		}
-		return this.compiler.compile(p);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutorTool.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutorTool.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutorTool.java
deleted file mode 100644
index 09289fb..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutorTool.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.client;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.Plan;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.Tool;
-import org.apache.tez.dag.api.TezConfiguration;
-
-
-public class TezExecutorTool extends Configured implements Tool {
-
-	private static final Log LOG = LogFactory.getLog(TezExecutorTool.class);
-
-	private TezExecutor executor;
-	Plan plan;
-	private Path jarPath = null;
-
-	public TezExecutorTool(TezExecutor executor, Plan plan) {
-		this.executor = executor;
-		this.plan = plan;
-	}
-
-	public void setJobJar (Path jarPath) {
-		this.jarPath = jarPath;
-	}
-
-	@Override
-	public int run(String[] args) throws Exception {
-		
-		Configuration conf = getConf();
-		
-		TezConfiguration tezConf;
-		if (conf != null) {
-			tezConf = new TezConfiguration(conf);
-		} else {
-			tezConf = new TezConfiguration();
-		}
-
-		UserGroupInformation.setConfiguration(tezConf);
-
-		executor.setConfiguration(tezConf);
-
-		try {
-			if (jarPath != null) {
-				executor.setJobJar(jarPath);
-			}
-			JobExecutionResult result = executor.executePlan(plan);
-		}
-		catch (Exception e) {
-			LOG.error("Job execution failed due to: " + e.getMessage());
-			throw new RuntimeException(e.getMessage());
-		}
-		return 0;
-	}
-
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkBroadcastEdge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkBroadcastEdge.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkBroadcastEdge.java
deleted file mode 100644
index 6597733..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkBroadcastEdge.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.dag;
-
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.tez.util.EncodingUtils;
-import org.apache.flink.tez.util.FlinkSerialization;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.serializer.WritableSerialization;
-import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class FlinkBroadcastEdge extends FlinkEdge {
-
-	public FlinkBroadcastEdge(FlinkVertex source, FlinkVertex target, TypeSerializer<?> typeSerializer) {
-		super(source, target, typeSerializer);
-	}
-
-	@Override
-	public Edge createEdge(TezConfiguration tezConf) {
-
-		Map<String,String> serializerMap = new HashMap<String,String>();
-		serializerMap.put("io.flink.typeserializer", EncodingUtils.encodeObjectToString(this.typeSerializer));
-
-		try {
-			UnorderedKVEdgeConfig edgeConfig =
-					(UnorderedKVEdgeConfig
-							.newBuilder(IntWritable.class.getName(), typeSerializer.createInstance().getClass().getName())
-							.setFromConfiguration(tezConf)
-							.setKeySerializationClass(WritableSerialization.class.getName(), null)
-							.setValueSerializationClass(FlinkSerialization.class.getName(), serializerMap)
-							.configureInput()
-							.setAdditionalConfiguration("io.flink.typeserializer", EncodingUtils.encodeObjectToString(this.typeSerializer))
-							)
-							.done()
-							.build();
-
-			EdgeProperty property = edgeConfig.createDefaultBroadcastEdgeProperty();
-			this.cached = Edge.create(source.getVertex(), target.getVertex(), property);
-			return cached;
-
-		} catch (Exception e) {
-			throw new CompilerException(
-					"An error occurred while creating a Tez Forward Edge: " + e.getMessage(), e);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSinkVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSinkVertex.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSinkVertex.java
deleted file mode 100644
index e3ddb9e..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSinkVertex.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.dag;
-
-
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.tez.runtime.DataSinkProcessor;
-import org.apache.flink.tez.runtime.TezTaskConfig;
-import org.apache.flink.tez.util.EncodingUtils;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.Vertex;
-
-import java.io.IOException;
-
-public class FlinkDataSinkVertex extends FlinkVertex {
-
-	public FlinkDataSinkVertex(String taskName, int parallelism, TezTaskConfig taskConfig) {
-		super(taskName, parallelism, taskConfig);
-	}
-
-	@Override
-	public Vertex createVertex(TezConfiguration conf) {
-		try {
-			this.writeInputPositionsToConfig();
-			this.writeSubTasksInOutputToConfig();
-
-			conf.set(TezTaskConfig.TEZ_TASK_CONFIG, EncodingUtils.encodeObjectToString(taskConfig));
-
-			ProcessorDescriptor descriptor = ProcessorDescriptor.create(
-					DataSinkProcessor.class.getName());
-
-			descriptor.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
-
-			cached = Vertex.create(this.getUniqueName(), descriptor, getParallelism());
-
-			return cached;
-		}
-		catch (IOException e) {
-			throw new CompilerException(
-					"An error occurred while creating a Tez Vertex: " + e.getMessage(), e);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSourceVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSourceVertex.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSourceVertex.java
deleted file mode 100644
index 913b854..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSourceVertex.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.dag;
-
-
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.tez.runtime.DataSourceProcessor;
-import org.apache.flink.tez.runtime.TezTaskConfig;
-import org.apache.flink.tez.runtime.input.FlinkInput;
-import org.apache.flink.tez.runtime.input.FlinkInputSplitGenerator;
-import org.apache.flink.tez.util.EncodingUtils;
-import org.apache.hadoop.security.Credentials;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.DataSourceDescriptor;
-import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.InputInitializerDescriptor;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.Vertex;
-
-import java.io.IOException;
-
-public class FlinkDataSourceVertex extends FlinkVertex {
-
-	public FlinkDataSourceVertex(String taskName, int parallelism, TezTaskConfig taskConfig) {
-		super(taskName, parallelism, taskConfig);
-	}
-
-
-	@Override
-	public Vertex createVertex (TezConfiguration conf) {
-		try {
-			this.writeInputPositionsToConfig();
-			this.writeSubTasksInOutputToConfig();
-
-			taskConfig.setDatasourceProcessorName(this.getUniqueName());
-			conf.set(TezTaskConfig.TEZ_TASK_CONFIG, EncodingUtils.encodeObjectToString(taskConfig));
-
-			ProcessorDescriptor descriptor = ProcessorDescriptor.create(
-					DataSourceProcessor.class.getName());
-
-			descriptor.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
-
-			InputDescriptor inputDescriptor = InputDescriptor.create(FlinkInput.class.getName());
-
-			InputInitializerDescriptor inputInitializerDescriptor =
-					InputInitializerDescriptor.create(FlinkInputSplitGenerator.class.getName()).setUserPayload(TezUtils.createUserPayloadFromConf(conf));
-
-			DataSourceDescriptor dataSourceDescriptor = DataSourceDescriptor.create(
-					inputDescriptor,
-					inputInitializerDescriptor,
-					new Credentials()
-			);
-
-			cached = Vertex.create(this.getUniqueName(), descriptor, getParallelism());
-
-			cached.addDataSource("Input " + this.getUniqueName(), dataSourceDescriptor);
-
-			return cached;
-		}
-		catch (IOException e) {
-			throw new CompilerException(
-					"An error occurred while creating a Tez Vertex: " + e.getMessage(), e);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkEdge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkEdge.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkEdge.java
deleted file mode 100644
index 181e675..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkEdge.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.dag;
-
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.TezConfiguration;
-
-public abstract class FlinkEdge {
-
-	protected FlinkVertex source;
-	protected FlinkVertex target;
-	protected TypeSerializer<?> typeSerializer;
-	protected Edge cached;
-
-	protected FlinkEdge(FlinkVertex source, FlinkVertex target, TypeSerializer<?> typeSerializer) {
-		this.source = source;
-		this.target = target;
-		this.typeSerializer = typeSerializer;
-	}
-
-	public abstract Edge createEdge(TezConfiguration tezConf);
-
-	public Edge getEdge () {
-		return cached;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkForwardEdge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkForwardEdge.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkForwardEdge.java
deleted file mode 100644
index 4602e96..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkForwardEdge.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.dag;
-
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.tez.util.EncodingUtils;
-import org.apache.flink.tez.util.FlinkSerialization;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.serializer.WritableSerialization;
-import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class FlinkForwardEdge extends FlinkEdge {
-
-	public FlinkForwardEdge(FlinkVertex source, FlinkVertex target, TypeSerializer<?> typeSerializer) {
-		super(source, target, typeSerializer);
-	}
-
-	@Override
-	public Edge createEdge(TezConfiguration tezConf) {
-
-		Map<String,String> serializerMap = new HashMap<String,String>();
-		serializerMap.put("io.flink.typeserializer", EncodingUtils.encodeObjectToString(this.typeSerializer));
-
-		try {
-			UnorderedKVEdgeConfig edgeConfig =
-					(UnorderedKVEdgeConfig
-							.newBuilder(IntWritable.class.getName(), typeSerializer.createInstance().getClass().getName())
-							.setFromConfiguration(tezConf)
-							.setKeySerializationClass(WritableSerialization.class.getName(), null)
-							.setValueSerializationClass(FlinkSerialization.class.getName(), serializerMap)
-							.configureInput()
-							.setAdditionalConfiguration("io.flink.typeserializer", EncodingUtils.encodeObjectToString(
-									this.typeSerializer
-							)))
-							.done()
-							.build();
-
-			EdgeProperty property = edgeConfig.createDefaultOneToOneEdgeProperty();
-			this.cached = Edge.create(source.getVertex(), target.getVertex(), property);
-			return cached;
-
-		} catch (Exception e) {
-			throw new CompilerException(
-					"An error occurred while creating a Tez Forward Edge: " + e.getMessage(), e);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkPartitionEdge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkPartitionEdge.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkPartitionEdge.java
deleted file mode 100644
index b5f8c2e..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkPartitionEdge.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.dag;
-
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.tez.util.EncodingUtils;
-import org.apache.flink.tez.util.FlinkSerialization;
-import org.apache.flink.tez.runtime.output.SimplePartitioner;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.serializer.WritableSerialization;
-import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class FlinkPartitionEdge extends FlinkEdge {
-
-	public FlinkPartitionEdge(FlinkVertex source, FlinkVertex target, TypeSerializer<?> typeSerializer) {
-		super(source, target, typeSerializer);
-	}
-
-	@Override
-	public Edge createEdge(TezConfiguration tezConf) {
-
-		Map<String,String> serializerMap = new HashMap<String,String>();
-		serializerMap.put("io.flink.typeserializer", EncodingUtils.encodeObjectToString(this.typeSerializer));
-
-		try {
-			UnorderedPartitionedKVEdgeConfig edgeConfig =
-					(UnorderedPartitionedKVEdgeConfig
-						.newBuilder(IntWritable.class.getName(), typeSerializer.createInstance().getClass().getName(), SimplePartitioner.class.getName())
-					.setFromConfiguration(tezConf)
-					.setKeySerializationClass(WritableSerialization.class.getName(), null)
-					.setValueSerializationClass(FlinkSerialization.class.getName(), serializerMap)
-					.configureInput()
-					.setAdditionalConfiguration("io.flink.typeserializer", EncodingUtils.encodeObjectToString(this.typeSerializer)))
-					.done()
-					.build();
-
-
-			EdgeProperty property = edgeConfig.createDefaultEdgeProperty();
-			this.cached = Edge.create(source.getVertex(), target.getVertex(), property);
-			return cached;
-
-		} catch (Exception e) {
-			throw new CompilerException(
-					"An error occurred while creating a Tez Shuffle Edge: " + e.getMessage(), e);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkProcessorVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkProcessorVertex.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkProcessorVertex.java
deleted file mode 100644
index 2fbba36..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkProcessorVertex.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.dag;
-
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.tez.runtime.RegularProcessor;
-import org.apache.flink.tez.runtime.TezTaskConfig;
-import org.apache.flink.tez.util.EncodingUtils;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.Vertex;
-
-import java.io.IOException;
-
-
-public class FlinkProcessorVertex extends FlinkVertex {
-
-	public FlinkProcessorVertex(String taskName, int parallelism, TezTaskConfig taskConfig) {
-		super(taskName, parallelism, taskConfig);
-	}
-
-	@Override
-	public Vertex createVertex(TezConfiguration conf) {
-		try {
-			this.writeInputPositionsToConfig();
-			this.writeSubTasksInOutputToConfig();
-
-			conf.set(TezTaskConfig.TEZ_TASK_CONFIG, EncodingUtils.encodeObjectToString(taskConfig));
-
-			ProcessorDescriptor descriptor = ProcessorDescriptor.create(
-					RegularProcessor.class.getName());
-
-			descriptor.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
-
-			cached = Vertex.create(this.getUniqueName(), descriptor, getParallelism());
-
-			return cached;
-		} catch (IOException e) {
-			throw new CompilerException(
-					"An error occurred while creating a Tez Vertex: " + e.getMessage(), e);
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkUnionVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkUnionVertex.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkUnionVertex.java
deleted file mode 100644
index 0cf9990..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkUnionVertex.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.dag;
-
-
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.tez.runtime.UnionProcessor;
-import org.apache.flink.tez.runtime.TezTaskConfig;
-import org.apache.flink.tez.util.EncodingUtils;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.Vertex;
-
-import java.io.IOException;
-
-public class FlinkUnionVertex extends FlinkVertex {
-
-	public FlinkUnionVertex(String taskName, int parallelism, TezTaskConfig taskConfig) {
-		super(taskName, parallelism, taskConfig);
-	}
-
-	@Override
-	public Vertex createVertex(TezConfiguration conf) {
-		try {
-			this.writeInputPositionsToConfig();
-			this.writeSubTasksInOutputToConfig();
-
-			conf.set(TezTaskConfig.TEZ_TASK_CONFIG, EncodingUtils.encodeObjectToString(taskConfig));
-
-			ProcessorDescriptor descriptor = ProcessorDescriptor.create(
-					UnionProcessor.class.getName());
-
-			descriptor.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
-
-			cached = Vertex.create(this.getUniqueName(), descriptor, getParallelism());
-
-			return cached;
-		}
-		catch (IOException e) {
-			throw new CompilerException(
-					"An error occurred while creating a Tez Vertex: " + e.getMessage(), e);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkVertex.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkVertex.java
deleted file mode 100644
index 883acc6..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkVertex.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.dag;
-
-
-import org.apache.flink.tez.runtime.TezTaskConfig;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.Vertex;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-public abstract class FlinkVertex {
-
-	protected Vertex cached;
-	private String taskName;
-	private int parallelism;
-	protected TezTaskConfig taskConfig;
-
-	// Tez-specific bookkeeping
-	protected String uniqueName; //Unique name in DAG
-	private Map<FlinkVertex,ArrayList<Integer>> inputPositions;
-	private ArrayList<Integer> numberOfSubTasksInOutputs;
-
-	public TezTaskConfig getConfig() {
-		return taskConfig;
-	}
-
-	public FlinkVertex(String taskName, int parallelism, TezTaskConfig taskConfig) {
-		this.cached = null;
-		this.taskName = taskName;
-		this.parallelism = parallelism;
-		this.taskConfig = taskConfig;
-		this.uniqueName = taskName + UUID.randomUUID().toString();
-		this.inputPositions = new HashMap<FlinkVertex, ArrayList<Integer>>();
-		this.numberOfSubTasksInOutputs = new ArrayList<Integer>();
-	}
-
-	public int getParallelism () {
-		return parallelism;
-	}
-
-	public void setParallelism (int parallelism) {
-		this.parallelism = parallelism;
-	}
-
-	public abstract Vertex createVertex (TezConfiguration conf);
-
-	public Vertex getVertex () {
-		return cached;
-	}
-
-	protected String getUniqueName () {
-		return uniqueName;
-	}
-
-	public void addInput (FlinkVertex vertex, int position) {
-		if (inputPositions.containsKey(vertex)) {
-			inputPositions.get(vertex).add(position);
-		}
-		else {
-			ArrayList<Integer> lst = new ArrayList<Integer>();
-			lst.add(position);
-			inputPositions.put(vertex,lst);
-		}
-	}
-
-	public void addNumberOfSubTasksInOutput (int subTasks, int position) {
-		if (numberOfSubTasksInOutputs.isEmpty()) {
-			numberOfSubTasksInOutputs.add(-1);
-		}
-		int currSize = numberOfSubTasksInOutputs.size();
-		for (int i = currSize; i <= position; i++) {
-			numberOfSubTasksInOutputs.add(i, -1);
-		}
-		numberOfSubTasksInOutputs.set(position, subTasks);
-	}
-
-	// Must be called before taskConfig is written to Tez configuration
-	protected void writeInputPositionsToConfig () {
-		HashMap<String,ArrayList<Integer>> toWrite = new HashMap<String, ArrayList<Integer>>();
-		for (FlinkVertex v: inputPositions.keySet()) {
-			String name = v.getUniqueName();
-			List<Integer> positions = inputPositions.get(v);
-			toWrite.put(name, new ArrayList<Integer>(positions));
-		}
-		this.taskConfig.setInputPositions(toWrite);
-	}
-
-	// Must be called before taskConfig is written to Tez configuration
-	protected void writeSubTasksInOutputToConfig () {
-		this.taskConfig.setNumberSubtasksInOutput(this.numberOfSubTasksInOutputs);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/TezDAGGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/TezDAGGenerator.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/TezDAGGenerator.java
deleted file mode 100644
index 52f39be..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/TezDAGGenerator.java
+++ /dev/null
@@ -1,460 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.dag;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.distributions.DataDistribution;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.dag.TempMode;
-import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.NAryUnionPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.plan.SourcePlanNode;
-import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.tez.runtime.TezTaskConfig;
-import org.apache.flink.util.Visitor;
-import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.TezConfiguration;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-
-public class TezDAGGenerator implements Visitor<PlanNode> {
-
-	private static final Log LOG = LogFactory.getLog(TezDAGGenerator.class);
-	
-	private Map<PlanNode, FlinkVertex> vertices; // a map from optimizer nodes to Tez vertices
-	private List<FlinkEdge> edges;
-	private final int defaultMaxFan;
-	private final TezConfiguration tezConf;
-
-	private final float defaultSortSpillingThreshold;
-
-	public TezDAGGenerator (TezConfiguration tezConf, Configuration config) {
-		this.defaultMaxFan = config.getInteger(ConfigConstants.DEFAULT_SPILLING_MAX_FAN_KEY,
-				ConfigConstants.DEFAULT_SPILLING_MAX_FAN);
-		this.defaultSortSpillingThreshold = config.getFloat(ConfigConstants.DEFAULT_SORT_SPILLING_THRESHOLD_KEY,
-				ConfigConstants.DEFAULT_SORT_SPILLING_THRESHOLD);
-		this.tezConf = tezConf;
-	}
-
-	public DAG createDAG (OptimizedPlan program) throws Exception {
-		LOG.info ("Creating Tez DAG");
-		this.vertices = new HashMap<PlanNode, FlinkVertex>();
-		this.edges = new ArrayList<FlinkEdge>();
-		program.accept(this);
-
-		DAG dag = DAG.create(program.getJobName());
-		for (FlinkVertex v : vertices.values()) {
-			dag.addVertex(v.createVertex(new TezConfiguration(tezConf)));
-		}
-		for (FlinkEdge e: edges) {
-			dag.addEdge(e.createEdge(new TezConfiguration(tezConf)));
-		}
-
-		/*
-		 * Temporarily throw an error until TEZ-1190 has been fixed or a workaround has been created
-		 */
-		if (containsSelfJoins()) {
-			throw new CompilerException("Dual-input operators with the same input (self-joins) are not yet supported");
-		}
-
-		this.vertices = null;
-		this.edges = null;
-
-		LOG.info ("Tez DAG created");
-		return dag;
-	}
-
-
-	@Override
-	public boolean preVisit(PlanNode node) {
-		if (this.vertices.containsKey(node)) {
-			// return false to prevent further descend
-			return false;
-		}
-
-		if ((node instanceof BulkIterationPlanNode) || (node instanceof WorksetIterationPlanNode)) {
-			throw new CompilerException("Iterations are not yet supported by the Tez execution environment");
-		}
-
-		if ( (node.getBroadcastInputs() != null) && (!node.getBroadcastInputs().isEmpty())) {
-			throw new CompilerException("Broadcast inputs are not yet supported by the Tez execution environment");
-		}
-
-		FlinkVertex vertex = null;
-
-		try {
-			if (node instanceof SourcePlanNode) {
-				vertex = createDataSourceVertex ((SourcePlanNode) node);
-			}
-			else if (node instanceof SinkPlanNode) {
-				vertex = createDataSinkVertex ((SinkPlanNode) node);
-			}
-			else if ((node instanceof SingleInputPlanNode)) {
-				vertex = createSingleInputVertex((SingleInputPlanNode) node);
-			}
-			else if (node instanceof DualInputPlanNode) {
-				vertex = createDualInputVertex((DualInputPlanNode) node);
-			}
-			else if (node instanceof NAryUnionPlanNode) {
-				vertex = createUnionVertex ((NAryUnionPlanNode) node);
-			}
-			else {
-				throw new CompilerException("Unrecognized node type: " + node.getClass().getName());
-			}
-
-		}
-		catch (Exception e) {
-			throw new CompilerException("Error translating node '" + node + "': " + e.getMessage(), e);
-		}
-
-		if (vertex != null) {
-			this.vertices.put(node, vertex);
-		}
-		return true;
-	}
-
-	@Override
-	public void postVisit (PlanNode node) {
-		try {
-			if (node instanceof SourcePlanNode) {
-				return;
-			}
-			final Iterator<Channel> inConns = node.getInputs().iterator();
-			if (!inConns.hasNext()) {
-				throw new CompilerException("Bug: Found a non-source task with no input.");
-			}
-			int inputIndex = 0;
-
-			FlinkVertex targetVertex = this.vertices.get(node);
-			TezTaskConfig targetVertexConfig = targetVertex.getConfig();
-
-
-			while (inConns.hasNext()) {
-				Channel input = inConns.next();
-				inputIndex += translateChannel(input, inputIndex, targetVertex, targetVertexConfig, false);
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			throw new CompilerException(
-					"An error occurred while translating the optimized plan to a Tez DAG: " + e.getMessage(), e);
-		}
-	}
-
-	private FlinkVertex createSingleInputVertex(SingleInputPlanNode node) throws CompilerException, IOException {
-
-		final String taskName = node.getNodeName();
-		final DriverStrategy ds = node.getDriverStrategy();
-		final int dop = node.getParallelism();
-
-		final TezTaskConfig config= new TezTaskConfig(new Configuration());
-
-		config.setDriver(ds.getDriverClass());
-		config.setDriverStrategy(ds);
-		config.setStubWrapper(node.getProgramOperator().getUserCodeWrapper());
-		config.setStubParameters(node.getProgramOperator().getParameters());
-
-		for(int i=0;i<ds.getNumRequiredComparators();i++) {
-			config.setDriverComparator(node.getComparator(i), i);
-		}
-		assignDriverResources(node, config);
-
-		return new FlinkProcessorVertex(taskName, dop, config);
-	}
-
-	private FlinkVertex createDualInputVertex(DualInputPlanNode node) throws CompilerException, IOException {
-		final String taskName = node.getNodeName();
-		final DriverStrategy ds = node.getDriverStrategy();
-		final int dop = node.getParallelism();
-
-		final TezTaskConfig config= new TezTaskConfig(new Configuration());
-
-		config.setDriver(ds.getDriverClass());
-		config.setDriverStrategy(ds);
-		config.setStubWrapper(node.getProgramOperator().getUserCodeWrapper());
-		config.setStubParameters(node.getProgramOperator().getParameters());
-
-		if (node.getComparator1() != null) {
-			config.setDriverComparator(node.getComparator1(), 0);
-		}
-		if (node.getComparator2() != null) {
-			config.setDriverComparator(node.getComparator2(), 1);
-		}
-		if (node.getPairComparator() != null) {
-			config.setDriverPairComparator(node.getPairComparator());
-		}
-
-		assignDriverResources(node, config);
-
-		LOG.info("Creating processor vertex " + taskName + " with parallelism " + dop);
-
-		return new FlinkProcessorVertex(taskName, dop, config);
-	}
-
-	private FlinkVertex createDataSinkVertex(SinkPlanNode node) throws CompilerException, IOException {
-		final String taskName = node.getNodeName();
-		final int dop = node.getParallelism();
-
-		final TezTaskConfig config = new TezTaskConfig(new Configuration());
-
-		// set user code
-		config.setStubWrapper(node.getProgramOperator().getUserCodeWrapper());
-		config.setStubParameters(node.getProgramOperator().getParameters());
-
-		LOG.info("Creating data sink vertex " + taskName + " with parallelism " + dop);
-		
-		return new FlinkDataSinkVertex(taskName, dop, config);
-	}
-
-	private FlinkVertex createDataSourceVertex(SourcePlanNode node) throws CompilerException, IOException {
-		final String taskName = node.getNodeName();
-		int dop = node.getParallelism();
-
-		final TezTaskConfig config= new TezTaskConfig(new Configuration());
-
-		config.setStubWrapper(node.getProgramOperator().getUserCodeWrapper());
-		config.setStubParameters(node.getProgramOperator().getParameters());
-
-		InputFormat format = node.getDataSourceNode().getOperator().getFormatWrapper().getUserCodeObject();
-
-		config.setInputFormat(format);
-
-		// Create as many data sources as input splits
-		InputSplit[] splits = format.createInputSplits((dop > 0) ? dop : 1);
-		dop = splits.length;
-
-		LOG.info("Creating data source vertex " + taskName + " with parallelism " + dop);
-		
-		return new FlinkDataSourceVertex(taskName, dop, config);
-	}
-
-	private FlinkVertex createUnionVertex(NAryUnionPlanNode node) throws CompilerException, IOException {
-		final String taskName = node.getNodeName();
-		final int dop = node.getParallelism();
-		final TezTaskConfig config= new TezTaskConfig(new Configuration());
-
-		LOG.info("Creating union vertex " + taskName + " with parallelism " + dop);
-		
-		return new FlinkUnionVertex (taskName, dop, config);
-	}
-
-
-	private void assignDriverResources(PlanNode node, TaskConfig config) {
-		final double relativeMem = node.getRelativeMemoryPerSubTask();
-		if (relativeMem > 0) {
-			config.setRelativeMemoryDriver(relativeMem);
-			config.setFilehandlesDriver(this.defaultMaxFan);
-			config.setSpillingThresholdDriver(this.defaultSortSpillingThreshold);
-		}
-	}
-
-	private void assignLocalStrategyResources(Channel c, TaskConfig config, int inputNum) {
-		if (c.getRelativeMemoryLocalStrategy() > 0) {
-			config.setRelativeMemoryInput(inputNum, c.getRelativeMemoryLocalStrategy());
-			config.setFilehandlesInput(inputNum, this.defaultMaxFan);
-			config.setSpillingThresholdInput(inputNum, this.defaultSortSpillingThreshold);
-		}
-	}
-
-	private int translateChannel(Channel input, int inputIndex, FlinkVertex targetVertex,
-								TezTaskConfig targetVertexConfig, boolean isBroadcast) throws Exception
-	{
-		final PlanNode inputPlanNode = input.getSource();
-		final Iterator<Channel> allInChannels;
-
-
-		allInChannels = Collections.singletonList(input).iterator();
-
-
-		// check that the type serializer is consistent
-		TypeSerializerFactory<?> typeSerFact = null;
-
-		while (allInChannels.hasNext()) {
-			final Channel inConn = allInChannels.next();
-
-			if (typeSerFact == null) {
-				typeSerFact = inConn.getSerializer();
-			} else if (!typeSerFact.equals(inConn.getSerializer())) {
-				throw new CompilerException("Conflicting types in union operator.");
-			}
-
-			final PlanNode sourceNode = inConn.getSource();
-			FlinkVertex sourceVertex = this.vertices.get(sourceNode);
-			TezTaskConfig sourceVertexConfig = sourceVertex.getConfig(); //TODO ??? need to create a new TezConfig ???
-
-			connectJobVertices(
-					inConn, inputIndex, sourceVertex, sourceVertexConfig, targetVertex, targetVertexConfig, isBroadcast);
-		}
-
-		// the local strategy is added only once. in non-union case that is the actual edge,
-		// in the union case, it is the edge between union and the target node
-		addLocalInfoFromChannelToConfig(input, targetVertexConfig, inputIndex, isBroadcast);
-		return 1;
-	}
-
-	private void connectJobVertices(Channel channel, int inputNumber,
-							final FlinkVertex sourceVertex, final TezTaskConfig sourceConfig,
-							final FlinkVertex targetVertex, final TezTaskConfig targetConfig, boolean isBroadcast)
-			throws CompilerException {
-
-		// -------------- configure the source task's ship strategy strategies in task config --------------
-		final int outputIndex = sourceConfig.getNumOutputs();
-		sourceConfig.addOutputShipStrategy(channel.getShipStrategy());
-		if (outputIndex == 0) {
-			sourceConfig.setOutputSerializer(channel.getSerializer());
-		}
-		if (channel.getShipStrategyComparator() != null) {
-			sourceConfig.setOutputComparator(channel.getShipStrategyComparator(), outputIndex);
-		}
-
-		if (channel.getShipStrategy() == ShipStrategyType.PARTITION_RANGE) {
-
-			final DataDistribution dataDistribution = channel.getDataDistribution();
-			if(dataDistribution != null) {
-				sourceConfig.setOutputDataDistribution(dataDistribution, outputIndex);
-			} else {
-				throw new RuntimeException("Range partitioning requires data distribution");
-				// TODO: inject code and configuration for automatic histogram generation
-			}
-		}
-
-		// ---------------- configure the receiver -------------------
-		if (isBroadcast) {
-			targetConfig.addBroadcastInputToGroup(inputNumber);
-		} else {
-			targetConfig.addInputToGroup(inputNumber);
-		}
-
-		//----------------- connect source and target with edge ------------------------------
-
-		FlinkEdge edge;
-		ShipStrategyType shipStrategy = channel.getShipStrategy();
-		TypeSerializer<?> serializer = channel.getSerializer().getSerializer();
-		if ((shipStrategy == ShipStrategyType.FORWARD) || (shipStrategy == ShipStrategyType.NONE)) {
-			edge = new FlinkForwardEdge(sourceVertex, targetVertex, serializer);
-			// For forward edges, create as many tasks in upstream operator as in source operator
-			targetVertex.setParallelism(sourceVertex.getParallelism());
-		}
-		else if (shipStrategy == ShipStrategyType.BROADCAST) {
-			edge = new FlinkBroadcastEdge(sourceVertex, targetVertex, serializer);
-		}
-		else if (shipStrategy == ShipStrategyType.PARTITION_HASH) {
-			edge = new FlinkPartitionEdge(sourceVertex, targetVertex, serializer);
-		}
-		else {
-			throw new CompilerException("Ship strategy between nodes " + sourceVertex.getVertex().getName() + " and " + targetVertex.getVertex().getName() + " currently not supported");
-		}
-
-		// Tez-specific bookkeeping
-		// TODO: This probably will not work for vertices with multiple outputs
-		sourceVertex.addNumberOfSubTasksInOutput(targetVertex.getParallelism(), outputIndex);
-		targetVertex.addInput(sourceVertex, inputNumber);
-
-
-		edges.add(edge);
-	}
-
-	private void addLocalInfoFromChannelToConfig(Channel channel, TaskConfig config, int inputNum, boolean isBroadcastChannel) {
-		// serializer
-		if (isBroadcastChannel) {
-			config.setBroadcastInputSerializer(channel.getSerializer(), inputNum);
-
-			if (channel.getLocalStrategy() != LocalStrategy.NONE || (channel.getTempMode() != null && channel.getTempMode() != TempMode.NONE)) {
-				throw new CompilerException("Found local strategy or temp mode on a broadcast variable channel.");
-			} else {
-				return;
-			}
-		} else {
-			config.setInputSerializer(channel.getSerializer(), inputNum);
-		}
-
-		// local strategy
-		if (channel.getLocalStrategy() != LocalStrategy.NONE) {
-			config.setInputLocalStrategy(inputNum, channel.getLocalStrategy());
-			if (channel.getLocalStrategyComparator() != null) {
-				config.setInputComparator(channel.getLocalStrategyComparator(), inputNum);
-			}
-		}
-
-		assignLocalStrategyResources(channel, config, inputNum);
-
-		// materialization / caching
-		if (channel.getTempMode() != null) {
-			final TempMode tm = channel.getTempMode();
-
-			boolean needsMemory = false;
-			if (tm.breaksPipeline()) {
-				config.setInputAsynchronouslyMaterialized(inputNum, true);
-				needsMemory = true;
-			}
-			if (tm.isCached()) {
-				config.setInputCached(inputNum, true);
-				needsMemory = true;
-			}
-
-			if (needsMemory) {
-				// sanity check
-				if (tm == null || tm == TempMode.NONE || channel.getRelativeTempMemory() <= 0) {
-					throw new CompilerException("Bug in compiler: Inconsistent description of input materialization.");
-				}
-				config.setRelativeInputMaterializationMemory(inputNum, channel.getRelativeTempMemory());
-			}
-		}
-	}
-
-	private boolean containsSelfJoins () {
-		for (FlinkVertex v : vertices.values()) {
-			ArrayList<FlinkVertex> predecessors = new ArrayList<FlinkVertex>();
-			for (FlinkEdge e : edges) {
-				if (e.target == v) {
-					if (predecessors.contains(e.source)) {
-						return true;
-					}
-					predecessors.add(e.source);
-				}
-			}
-		}
-		return false;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/ConnectedComponentsStep.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/ConnectedComponentsStep.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/ConnectedComponentsStep.java
deleted file mode 100644
index 707fd47..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/ConnectedComponentsStep.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.examples;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.examples.java.graph.util.ConnectedComponentsData;
-import org.apache.flink.util.Collector;
-
-
-public class ConnectedComponentsStep implements ProgramDescription {
-
-	// *************************************************************************
-	//     PROGRAM
-	// *************************************************************************
-
-	public static void main(String... args) throws Exception {
-
-		if(!parseParameters(args)) {
-			return;
-		}
-
-		// set up execution environment
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		// read vertex and edge data
-		DataSet<Long> vertices = getVertexDataSet(env);
-		DataSet<Tuple2<Long, Long>> edges = getEdgeDataSet(env).flatMap(new UndirectEdge());
-
-		// assign the initial components (equal to the vertex id)
-		DataSet<Tuple2<Long, Long>> verticesWithInitialId = vertices.map(new DuplicateValue<Long>());
-
-		DataSet<Tuple2<Long,Long>> nextComponenets = verticesWithInitialId
-				.join(edges)
-				.where(0).equalTo(0)
-				.with(new NeighborWithComponentIDJoin())
-				.groupBy(0).aggregate(Aggregations.MIN, 1)
-				.join(verticesWithInitialId)
-				.where(0).equalTo(0)
-				.with(new ComponentIdFilter());
-
-
-		// emit result
-		if(fileOutput) {
-			nextComponenets.writeAsCsv(outputPath, "\n", " ");
-		} else {
-			nextComponenets.print();
-		}
-
-		// execute program
-		env.execute("Connected Components Example");
-	}
-
-	// *************************************************************************
-	//     USER FUNCTIONS
-	// *************************************************************************
-
-	/**
-	 * Function that turns a value into a 2-tuple where both fields are that value.
-	 */
-	@ForwardedFields("*->f0")
-	public static final class DuplicateValue<T> implements MapFunction<T, Tuple2<T, T>> {
-
-		@Override
-		public Tuple2<T, T> map(T vertex) {
-			return new Tuple2<T, T>(vertex, vertex);
-		}
-	}
-
-	/**
-	 * Undirected edges by emitting for each input edge the input edges itself and an inverted version.
-	 */
-	public static final class UndirectEdge implements FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
-		Tuple2<Long, Long> invertedEdge = new Tuple2<Long, Long>();
-
-		@Override
-		public void flatMap(Tuple2<Long, Long> edge, Collector<Tuple2<Long, Long>> out) {
-			invertedEdge.f0 = edge.f1;
-			invertedEdge.f1 = edge.f0;
-			out.collect(edge);
-			out.collect(invertedEdge);
-		}
-	}
-
-	/**
-	 * UDF that joins a (Vertex-ID, Component-ID) pair that represents the current component that
-	 * a vertex is associated with, with a (Source-Vertex-ID, Target-VertexID) edge. The function
-	 * produces a (Target-vertex-ID, Component-ID) pair.
-	 */
-	@ForwardedFieldsFirst("f1->f1")
-	@ForwardedFieldsSecond("f1->f0")
-	public static final class NeighborWithComponentIDJoin implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
-
-		@Override
-		public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
-			return new Tuple2<Long, Long>(edge.f1, vertexWithComponent.f1);
-		}
-	}
-
-
-
-	@ForwardedFieldsFirst("*")
-	public static final class ComponentIdFilter implements FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
-
-		@Override
-		public void join(Tuple2<Long, Long> candidate, Tuple2<Long, Long> old, Collector<Tuple2<Long, Long>> out) {
-			if (candidate.f1 < old.f1) {
-				out.collect(candidate);
-			}
-		}
-	}
-
-
-
-	@Override
-	public String getDescription() {
-		return "Parameters: <vertices-path> <edges-path> <result-path> <max-number-of-iterations>";
-	}
-
-	// *************************************************************************
-	//     UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static String verticesPath = null;
-	private static String edgesPath = null;
-	private static String outputPath = null;
-	private static int maxIterations = 10;
-
-	private static boolean parseParameters(String[] programArguments) {
-
-		if(programArguments.length > 0) {
-			// parse input arguments
-			fileOutput = true;
-			if(programArguments.length == 4) {
-				verticesPath = programArguments[0];
-				edgesPath = programArguments[1];
-				outputPath = programArguments[2];
-				maxIterations = Integer.parseInt(programArguments[3]);
-			} else {
-				System.err.println("Usage: ConnectedComponents <vertices path> <edges path> <result path> <max number of iterations>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing Connected Components example with default parameters and built-in default data.");
-			System.out.println("  Provide parameters to read input data from files.");
-			System.out.println("  See the documentation for the correct format of input files.");
-			System.out.println("  Usage: ConnectedComponents <vertices path> <edges path> <result path> <max number of iterations>");
-		}
-		return true;
-	}
-
-	private static DataSet<Long> getVertexDataSet(ExecutionEnvironment env) {
-
-		if(fileOutput) {
-			return env.readCsvFile(verticesPath).types(Long.class)
-					.map(
-							new MapFunction<Tuple1<Long>, Long>() {
-								public Long map(Tuple1<Long> value) { return value.f0; }
-							});
-		} else {
-			return ConnectedComponentsData.getDefaultVertexDataSet(env);
-		}
-	}
-
-	private static DataSet<Tuple2<Long, Long>> getEdgeDataSet(ExecutionEnvironment env) {
-
-		if(fileOutput) {
-			return env.readCsvFile(edgesPath).fieldDelimiter(' ').types(Long.class, Long.class);
-		} else {
-			return ConnectedComponentsData.getDefaultEdgeDataSet(env);
-		}
-	}
-
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/ExampleDriver.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/ExampleDriver.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/ExampleDriver.java
deleted file mode 100644
index c65fb69..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/ExampleDriver.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.examples;
-
-import org.apache.hadoop.util.ProgramDriver;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.dag.api.client.Progress;
-import org.apache.tez.dag.api.client.StatusGetOpts;
-import org.apache.tez.dag.api.client.VertexStatus;
-
-import java.io.IOException;
-import java.text.DecimalFormat;
-import java.util.EnumSet;
-import java.util.Set;
-
-public class ExampleDriver {
-
-	private static final DecimalFormat formatter = new DecimalFormat("###.##%");
-
-	public static void main(String [] args){
-		int exitCode = -1;
-		ProgramDriver pgd = new ProgramDriver();
-		try {
-			pgd.addClass("wc", WordCount.class,
-					"Wordcount");
-			pgd.addClass("tpch3", TPCHQuery3.class,
-					"Modified TPC-H 3 query");
-			pgd.addClass("tc", TransitiveClosureNaiveStep.class,
-					"One step of transitive closure");
-			pgd.addClass("pr", PageRankBasicStep.class,
-					"One step of PageRank");
-			pgd.addClass("cc", ConnectedComponentsStep.class,
-					"One step of connected components");
-			exitCode = pgd.run(args);
-		} catch(Throwable e){
-			e.printStackTrace();
-		}
-		System.exit(exitCode);
-	}
-
-	public static void printDAGStatus(DAGClient dagClient, String[] vertexNames)
-			throws IOException, TezException {
-		printDAGStatus(dagClient, vertexNames, false, false);
-	}
-
-	public static void printDAGStatus(DAGClient dagClient, String[] vertexNames, boolean displayDAGCounters, boolean displayVertexCounters)
-			throws IOException, TezException {
-		Set<StatusGetOpts> opts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
-		DAGStatus dagStatus = dagClient.getDAGStatus(
-				(displayDAGCounters ? opts : null));
-		Progress progress = dagStatus.getDAGProgress();
-		double vProgressFloat = 0.0f;
-		if (progress != null) {
-			System.out.println("");
-			System.out.println("DAG: State: "
-					+ dagStatus.getState()
-					+ " Progress: "
-					+ (progress.getTotalTaskCount() < 0 ? formatter.format(0.0f) :
-					formatter.format((double)(progress.getSucceededTaskCount())
-							/progress.getTotalTaskCount())));
-			for (String vertexName : vertexNames) {
-				VertexStatus vStatus = dagClient.getVertexStatus(vertexName,
-						(displayVertexCounters ? opts : null));
-				if (vStatus == null) {
-					System.out.println("Could not retrieve status for vertex: "
-							+ vertexName);
-					continue;
-				}
-				Progress vProgress = vStatus.getProgress();
-				if (vProgress != null) {
-					vProgressFloat = 0.0f;
-					if (vProgress.getTotalTaskCount() == 0) {
-						vProgressFloat = 1.0f;
-					} else if (vProgress.getTotalTaskCount() > 0) {
-						vProgressFloat = (double)vProgress.getSucceededTaskCount()
-								/vProgress.getTotalTaskCount();
-					}
-					System.out.println("VertexStatus:"
-							+ " VertexName: "
-							+ (vertexName.equals("ivertex1") ? "intermediate-reducer"
-							: vertexName)
-							+ " Progress: " + formatter.format(vProgressFloat));
-				}
-				if (displayVertexCounters) {
-					TezCounters counters = vStatus.getVertexCounters();
-					if (counters != null) {
-						System.out.println("Vertex Counters for " + vertexName + ": "
-								+ counters);
-					}
-				}
-			}
-		}
-		if (displayDAGCounters) {
-			TezCounters counters = dagStatus.getDAGCounters();
-			if (counters != null) {
-				System.out.println("DAG Counters: " + counters);
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/PageRankBasicStep.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/PageRankBasicStep.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/PageRankBasicStep.java
deleted file mode 100644
index 031893d..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/PageRankBasicStep.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.examples;
-
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.examples.java.graph.util.PageRankData;
-import org.apache.flink.util.Collector;
-
-import java.util.ArrayList;
-
-import static org.apache.flink.api.java.aggregation.Aggregations.SUM;
-
-public class PageRankBasicStep {
-
-	private static final double DAMPENING_FACTOR = 0.85;
-	private static final double EPSILON = 0.0001;
-
-	// *************************************************************************
-	//     PROGRAM
-	// *************************************************************************
-
-	public static void main(String[] args) throws Exception {
-
-		if(!parseParameters(args)) {
-			return;
-		}
-
-		// set up execution environment
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		// get input data
-		DataSet<Long> pagesInput = getPagesDataSet(env);
-		DataSet<Tuple2<Long, Long>> linksInput = getLinksDataSet(env);
-
-		// assign initial rank to pages
-		DataSet<Tuple2<Long, Double>> pagesWithRanks = pagesInput.
-				map(new RankAssigner((1.0d / numPages)));
-
-		// build adjacency list from link input
-		DataSet<Tuple2<Long, Long[]>> adjacencyListInput =
-				linksInput.groupBy(0).reduceGroup(new BuildOutgoingEdgeList());
-
-		DataSet<Tuple2<Long, Double>> newRanks = pagesWithRanks
-				.join(adjacencyListInput).where(0).equalTo(0)
-				.flatMap(new JoinVertexWithEdgesMatch())
-				.groupBy(0).aggregate(SUM, 1)
-				.map(new Dampener(DAMPENING_FACTOR, numPages));
-
-
-		// emit result
-		if(fileOutput) {
-			newRanks.writeAsCsv(outputPath, "\n", " ");
-		} else {
-			newRanks.print();
-		}
-
-		// execute program
-		env.execute("Basic Page Rank Example");
-
-	}
-
-	// *************************************************************************
-	//     USER FUNCTIONS
-	// *************************************************************************
-
-	/**
-	 * A map function that assigns an initial rank to all pages.
-	 */
-	public static final class RankAssigner implements MapFunction<Long, Tuple2<Long, Double>> {
-		Tuple2<Long, Double> outPageWithRank;
-
-		public RankAssigner(double rank) {
-			this.outPageWithRank = new Tuple2<Long, Double>(-1l, rank);
-		}
-
-		@Override
-		public Tuple2<Long, Double> map(Long page) {
-			outPageWithRank.f0 = page;
-			return outPageWithRank;
-		}
-	}
-
-	/**
-	 * A reduce function that takes a sequence of edges and builds the adjacency list for the vertex where the edges
-	 * originate. Run as a pre-processing step.
-	 */
-	@ForwardedFields("0")
-	public static final class BuildOutgoingEdgeList implements GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long[]>> {
-
-		private final ArrayList<Long> neighbors = new ArrayList<Long>();
-
-		@Override
-		public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long[]>> out) {
-			neighbors.clear();
-			Long id = 0L;
-
-			for (Tuple2<Long, Long> n : values) {
-				id = n.f0;
-				neighbors.add(n.f1);
-			}
-			out.collect(new Tuple2<Long, Long[]>(id, neighbors.toArray(new Long[neighbors.size()])));
-		}
-	}
-
-	/**
-	 * Join function that distributes a fraction of a vertex's rank to all neighbors.
-	 */
-	public static final class JoinVertexWithEdgesMatch implements FlatMapFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Long[]>>, Tuple2<Long, Double>> {
-
-		@Override
-		public void flatMap(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Long[]>> value, Collector<Tuple2<Long, Double>> out){
-			Long[] neigbors = value.f1.f1;
-			double rank = value.f0.f1;
-			double rankToDistribute = rank / ((double) neigbors.length);
-
-			for (int i = 0; i < neigbors.length; i++) {
-				out.collect(new Tuple2<Long, Double>(neigbors[i], rankToDistribute));
-			}
-		}
-	}
-
-	/**
-	 * The function that applies the page rank dampening formula
-	 */
-	@ForwardedFields("0")
-	public static final class Dampener implements MapFunction<Tuple2<Long,Double>, Tuple2<Long,Double>> {
-
-		private final double dampening;
-		private final double randomJump;
-
-		public Dampener(double dampening, double numVertices) {
-			this.dampening = dampening;
-			this.randomJump = (1 - dampening) / numVertices;
-		}
-
-		@Override
-		public Tuple2<Long, Double> map(Tuple2<Long, Double> value) {
-			value.f1 = (value.f1 * dampening) + randomJump;
-			return value;
-		}
-	}
-
-	/**
-	 * Filter that filters vertices where the rank difference is below a threshold.
-	 */
-	public static final class EpsilonFilter implements FilterFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> {
-
-		@Override
-		public boolean filter(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>> value) {
-			return Math.abs(value.f0.f1 - value.f1.f1) > EPSILON;
-		}
-	}
-
-	// *************************************************************************
-	//     UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static String pagesInputPath = null;
-	private static String linksInputPath = null;
-	private static String outputPath = null;
-	private static long numPages = 0;
-	private static int maxIterations = 10;
-
-	private static boolean parseParameters(String[] args) {
-
-		if(args.length > 0) {
-			if(args.length == 5) {
-				fileOutput = true;
-				pagesInputPath = args[0];
-				linksInputPath = args[1];
-				outputPath = args[2];
-				numPages = Integer.parseInt(args[3]);
-				maxIterations = Integer.parseInt(args[4]);
-			} else {
-				System.err.println("Usage: PageRankBasic <pages path> <links path> <output path> <num pages> <num iterations>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing PageRank Basic example with default parameters and built-in default data.");
-			System.out.println("  Provide parameters to read input data from files.");
-			System.out.println("  See the documentation for the correct format of input files.");
-			System.out.println("  Usage: PageRankBasic <pages path> <links path> <output path> <num pages> <num iterations>");
-
-			numPages = PageRankData.getNumberOfPages();
-		}
-		return true;
-	}
-
-	private static DataSet<Long> getPagesDataSet(ExecutionEnvironment env) {
-		if(fileOutput) {
-			return env
-					.readCsvFile(pagesInputPath)
-					.fieldDelimiter(' ')
-					.lineDelimiter("\n")
-					.types(Long.class)
-					.map(new MapFunction<Tuple1<Long>, Long>() {
-						@Override
-						public Long map(Tuple1<Long> v) { return v.f0; }
-					});
-		} else {
-			return PageRankData.getDefaultPagesDataSet(env);
-		}
-	}
-
-	private static DataSet<Tuple2<Long, Long>> getLinksDataSet(ExecutionEnvironment env) {
-		if(fileOutput) {
-			return env.readCsvFile(linksInputPath)
-					.fieldDelimiter(' ')
-					.lineDelimiter("\n")
-					.types(Long.class, Long.class);
-		} else {
-			return PageRankData.getDefaultEdgeDataSet(env);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/TPCHQuery3.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/TPCHQuery3.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/TPCHQuery3.java
deleted file mode 100644
index d61f80e..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/TPCHQuery3.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.examples;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.tez.client.RemoteTezEnvironment;
-
-import java.text.DateFormat;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-
-public class TPCHQuery3 {
-
-	// *************************************************************************
-	//     PROGRAM
-	// *************************************************************************
-
-	public static void main(String[] args) throws Exception {
-
-		if(!parseParameters(args)) {
-			return;
-		}
-
-		final RemoteTezEnvironment env = RemoteTezEnvironment.create();
-		env.setParallelism(400);
-
-
-		// get input data
-		DataSet<Lineitem> lineitems = getLineitemDataSet(env);
-		DataSet<Order> orders = getOrdersDataSet(env);
-		DataSet<Customer> customers = getCustomerDataSet(env);
-
-		// Filter market segment "AUTOMOBILE"
-		customers = customers.filter(
-				new FilterFunction<Customer>() {
-					@Override
-					public boolean filter(Customer c) {
-						return c.getMktsegment().equals("AUTOMOBILE");
-					}
-				});
-
-		// Filter all Orders with o_orderdate < 12.03.1995
-		orders = orders.filter(
-				new FilterFunction<Order>() {
-					private final DateFormat format = new SimpleDateFormat("yyyy-MM-dd");
-					private final Date date = format.parse("1995-03-12");
-
-					@Override
-					public boolean filter(Order o) throws ParseException {
-						return format.parse(o.getOrderdate()).before(date);
-					}
-				});
-
-		// Filter all Lineitems with l_shipdate > 12.03.1995
-		lineitems = lineitems.filter(
-				new FilterFunction<Lineitem>() {
-					private final DateFormat format = new SimpleDateFormat("yyyy-MM-dd");
-					private final Date date = format.parse("1995-03-12");
-
-					@Override
-					public boolean filter(Lineitem l) throws ParseException {
-						return format.parse(l.getShipdate()).after(date);
-					}
-				});
-
-		// Join customers with orders and package them into a ShippingPriorityItem
-		DataSet<ShippingPriorityItem> customerWithOrders =
-				customers.join(orders).where(0).equalTo(1)
-						.with(
-								new JoinFunction<Customer, Order, ShippingPriorityItem>() {
-									@Override
-									public ShippingPriorityItem join(Customer c, Order o) {
-										return new ShippingPriorityItem(o.getOrderKey(), 0.0, o.getOrderdate(),
-												o.getShippriority());
-									}
-								});
-
-		// Join the last join result with Lineitems
-		DataSet<ShippingPriorityItem> result =
-				customerWithOrders.join(lineitems).where(0).equalTo(0)
-						.with(
-								new JoinFunction<ShippingPriorityItem, Lineitem, ShippingPriorityItem>() {
-									@Override
-									public ShippingPriorityItem join(ShippingPriorityItem i, Lineitem l) {
-										i.setRevenue(l.getExtendedprice() * (1 - l.getDiscount()));
-										return i;
-									}
-								})
-								// Group by l_orderkey, o_orderdate and o_shippriority and compute revenue sum
-						.groupBy(0, 2, 3)
-						.aggregate(Aggregations.SUM, 1);
-
-		// emit result
-		result.writeAsCsv(outputPath, "\n", "|");
-
-		// execute program
-		env.registerMainClass(TPCHQuery3.class);
-		env.execute("TPCH Query 3 Example");
-
-	}
-
-	// *************************************************************************
-	//     DATA TYPES
-	// *************************************************************************
-
-	public static class Lineitem extends Tuple4<Integer, Double, Double, String> {
-
-		public Integer getOrderkey() { return this.f0; }
-		public Double getDiscount() { return this.f2; }
-		public Double getExtendedprice() { return this.f1; }
-		public String getShipdate() { return this.f3; }
-	}
-
-	public static class Customer extends Tuple2<Integer, String> {
-
-		public Integer getCustKey() { return this.f0; }
-		public String getMktsegment() { return this.f1; }
-	}
-
-	public static class Order extends Tuple4<Integer, Integer, String, Integer> {
-
-		public Integer getOrderKey() { return this.f0; }
-		public Integer getCustKey() { return this.f1; }
-		public String getOrderdate() { return this.f2; }
-		public Integer getShippriority() { return this.f3; }
-	}
-
-	public static class ShippingPriorityItem extends Tuple4<Integer, Double, String, Integer> {
-
-		public ShippingPriorityItem() { }
-
-		public ShippingPriorityItem(Integer o_orderkey, Double revenue,
-									String o_orderdate, Integer o_shippriority) {
-			this.f0 = o_orderkey;
-			this.f1 = revenue;
-			this.f2 = o_orderdate;
-			this.f3 = o_shippriority;
-		}
-
-		public Integer getOrderkey() { return this.f0; }
-		public void setOrderkey(Integer orderkey) { this.f0 = orderkey; }
-		public Double getRevenue() { return this.f1; }
-		public void setRevenue(Double revenue) { this.f1 = revenue; }
-
-		public String getOrderdate() { return this.f2; }
-		public Integer getShippriority() { return this.f3; }
-	}
-
-	// *************************************************************************
-	//     UTIL METHODS
-	// *************************************************************************
-
-	private static String lineitemPath;
-	private static String customerPath;
-	private static String ordersPath;
-	private static String outputPath;
-
-	private static boolean parseParameters(String[] programArguments) {
-
-		if(programArguments.length > 0) {
-			if(programArguments.length == 4) {
-				lineitemPath = programArguments[0];
-				customerPath = programArguments[1];
-				ordersPath = programArguments[2];
-				outputPath = programArguments[3];
-			} else {
-				System.err.println("Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path> <orders-csv path> <result path>");
-				return false;
-			}
-		} else {
-			System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
-					"  Due to legal restrictions, we can not ship generated data.\n" +
-					"  You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" +
-					"  Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path> <orders-csv path> <result path>");
-			return false;
-		}
-		return true;
-	}
-
-	private static DataSet<Lineitem> getLineitemDataSet(ExecutionEnvironment env) {
-		return env.readCsvFile(lineitemPath)
-				.fieldDelimiter('|')
-				.includeFields("1000011000100000")
-				.tupleType(Lineitem.class);
-	}
-
-	private static DataSet<Customer> getCustomerDataSet(ExecutionEnvironment env) {
-		return env.readCsvFile(customerPath)
-				.fieldDelimiter('|')
-				.includeFields("10000010")
-				.tupleType(Customer.class);
-	}
-
-	private static DataSet<Order> getOrdersDataSet(ExecutionEnvironment env) {
-		return env.readCsvFile(ordersPath)
-				.fieldDelimiter('|')
-				.includeFields("110010010")
-				.tupleType(Order.class);
-	}
-
-}


Mime
View raw message