flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [16/50] flink git commit: [FLINK-1712] Remove "flink-staging" module
Date Thu, 14 Jan 2016 16:16:13 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/TransitiveClosureNaiveStep.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/TransitiveClosureNaiveStep.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/TransitiveClosureNaiveStep.java
deleted file mode 100644
index b014c3e..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/TransitiveClosureNaiveStep.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.examples;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.examples.java.graph.util.ConnectedComponentsData;
-import org.apache.flink.util.Collector;
-
-
-/*
- * NOTE:
- * This program is currently supposed to throw a Compiler Exception due to TEZ-1190
- */
-
-public class TransitiveClosureNaiveStep implements ProgramDescription {
-
-
-	public static void main (String... args) throws Exception{
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		// set up execution environment
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple2<Long, Long>> edges = getEdgeDataSet(env);
-
-		DataSet<Tuple2<Long,Long>> nextPaths = edges
-				.join(edges)
-				.where(1)
-				.equalTo(0)
-				.with(new JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
-					@Override
-					/**
-					 left: Path (z,x) - x is reachable by z
-					 right: Edge (x,y) - edge x-->y exists
-					 out: Path (z,y) - y is reachable by z
-					 */
-					public Tuple2<Long, Long> join(Tuple2<Long, Long> left, Tuple2<Long, Long> right) throws Exception {
-						return new Tuple2<Long, Long>(
-								new Long(left.f0),
-								new Long(right.f1));
-					}
-				})
-				.union(edges)
-				.groupBy(0, 1)
-				.reduceGroup(new GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
-					@Override
-					public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) throws Exception {
-						out.collect(values.iterator().next());
-					}
-				});
-
-		// emit result
-		if (fileOutput) {
-			nextPaths.writeAsCsv(outputPath, "\n", " ");
-		} else {
-			nextPaths.print();
-		}
-
-		// execute program
-		env.execute("Transitive Closure Example");
-
-	}
-
-	@Override
-	public String getDescription() {
-		return "Parameters: <edges-path> <result-path> <max-number-of-iterations>";
-	}
-
-	// *************************************************************************
-	//     UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static String edgesPath = null;
-	private static String outputPath = null;
-	private static int maxIterations = 10;
-
-	private static boolean parseParameters(String[] programArguments) {
-
-		if (programArguments.length > 0) {
-			// parse input arguments
-			fileOutput = true;
-			if (programArguments.length == 3) {
-				edgesPath = programArguments[0];
-				outputPath = programArguments[1];
-				maxIterations = Integer.parseInt(programArguments[2]);
-			} else {
-				System.err.println("Usage: TransitiveClosure <edges path> <result path> <max number of iterations>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing TransitiveClosure example with default parameters and built-in default data.");
-			System.out.println("  Provide parameters to read input data from files.");
-			System.out.println("  See the documentation for the correct format of input files.");
-			System.out.println("  Usage: TransitiveClosure <edges path> <result path> <max number of iterations>");
-		}
-		return true;
-	}
-
-
-	private static DataSet<Tuple2<Long, Long>> getEdgeDataSet(ExecutionEnvironment env) {
-
-		if(fileOutput) {
-			return env.readCsvFile(edgesPath).fieldDelimiter(' ').types(Long.class, Long.class);
-		} else {
-			return ConnectedComponentsData.getDefaultEdgeDataSet(env);
-		}
-	}
-
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/WordCount.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/WordCount.java
deleted file mode 100644
index e758156..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/WordCount.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.examples;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import org.apache.flink.tez.client.RemoteTezEnvironment;
-import org.apache.flink.util.Collector;
-
-/**
- * Counts the occurrences of each word in a text and runs the program on Tez through a
- * {@link RemoteTezEnvironment}.
- */
-public class WordCount {
-
-	// *************************************************************************
-	//     PROGRAM
-	// *************************************************************************
-
-	/**
-	 * Entry point.
-	 *
-	 * @param args either no arguments (use the built-in default text and print the counts) or
-	 *             exactly {@code <text path> <result path>}
-	 */
-	public static void main(String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		// set up the execution environment
-		final RemoteTezEnvironment env = RemoteTezEnvironment.create();
-		env.setParallelism(8);
-		// Register the main class before defining any sink: DataSet.print() below triggers the
-		// execution itself (Flink >= 0.9), so registering afterwards would be too late.
-		env.registerMainClass(WordCount.class);
-
-		// get input data
-		DataSet<String> text = getTextDataSet(env);
-
-		DataSet<Tuple2<String, Integer>> counts =
-				// split up the lines in pairs (2-tuples) containing: (word,1)
-				text.flatMap(new Tokenizer())
-						// group by the tuple field "0" and sum up tuple field "1"
-						.groupBy(0)
-						.sum(1);
-
-		// emit result
-		if (fileOutput) {
-			counts.writeAsCsv(outputPath, "\n", " ");
-			// execute program
-			env.execute("WordCount Example");
-		} else {
-			// print() executes the program; a following execute() would find no new data sinks
-			counts.print();
-		}
-	}
-
-	// *************************************************************************
-	//     USER FUNCTIONS
-	// *************************************************************************
-
-	/**
-	 * Implements the string tokenizer that splits sentences into words as a user-defined
-	 * FlatMapFunction. The function takes a line (String) and splits it into
-	 * multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>).
-	 */
-	public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
-
-		// Compiled once instead of on every String.split() call; Pattern.split(s) yields the same
-		// result as s.split("\\W+").
-		private static final java.util.regex.Pattern NON_WORD_CHARS = java.util.regex.Pattern.compile("\\W+");
-
-		@Override
-		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
-			// normalize and split the line; Locale.ROOT keeps lower-casing independent of the JVM default locale
-			String[] tokens = NON_WORD_CHARS.split(value.toLowerCase(java.util.Locale.ROOT));
-
-			// emit the pairs, skipping the empty token produced when a line starts with a non-word character
-			for (String token : tokens) {
-				if (!token.isEmpty()) {
-					out.collect(new Tuple2<String, Integer>(token, 1));
-				}
-			}
-		}
-	}
-
-	// *************************************************************************
-	//     UTIL METHODS
-	// *************************************************************************
-
-	// true if the text is read from, and the counts written to, the paths given on the command line
-	private static boolean fileOutput = false;
-	private static String textPath;
-	private static String outputPath;
-
-	/**
-	 * Parses the command line into the static fields above.
-	 *
-	 * @param args the arguments passed to {@link #main(String[])}
-	 * @return false if the arguments are invalid (usage has been printed), true otherwise
-	 */
-	private static boolean parseParameters(String[] args) {
-
-		if (args.length > 0) {
-			// parse input arguments
-			fileOutput = true;
-			if (args.length == 2) {
-				textPath = args[0];
-				outputPath = args[1];
-			} else {
-				System.err.println("Usage: WordCount <text path> <result path>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing WordCount example with built-in default data.");
-			System.out.println("  Provide parameters to read input data from a file.");
-			System.out.println("  Usage: WordCount <text path> <result path>");
-		}
-		return true;
-	}
-
-	/** Returns the input lines: the file at {@code textPath} if given, otherwise the built-in default text. */
-	private static DataSet<String> getTextDataSet(ExecutionEnvironment env) {
-		if (fileOutput) {
-			// read the text file from given input path
-			return env.readTextFile(textPath);
-		} else {
-			// get default test text data
-			return WordCountData.getDefaultTextLineDataSet(env);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSinkProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSinkProcessor.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSinkProcessor.java
deleted file mode 100644
index 8011d21..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSinkProcessor.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.runtime;
-
-
-import com.google.common.base.Preconditions;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
-import org.apache.flink.runtime.operators.util.CloseableInputProvider;
-import org.apache.flink.tez.runtime.input.TezReaderIterator;
-import org.apache.flink.tez.util.DummyInvokable;
-import org.apache.flink.tez.util.EncodingUtils;
-import org.apache.flink.util.MutableObjectIterator;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
-import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.LogicalInput;
-import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.ProcessorContext;
-import org.apache.tez.runtime.library.api.KeyValueReader;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Tez processor that runs a Flink data sink. It reads the records of its single input, optionally
- * sorts them (SORT local strategy), and writes them to the user's {@link OutputFormat}.
- *
- * @param <IT> type of the records consumed by the sink
- */
-public class DataSinkProcessor<IT> extends AbstractLogicalIOProcessor {
-
-	// Tez stuff
-	private TezTaskConfig config;
-	protected Map<String, LogicalInput> inputs;
-	private List<KeyValueReader> readers;
-	private int numInputs;
-	private TezRuntimeEnvironment runtimeEnvironment;
-	AbstractInvokable invokable = new DummyInvokable();
-
-	// Flink stuff
-	private OutputFormat<IT> format;
-	private ClassLoader userCodeClassLoader = this.getClass().getClassLoader();
-	// sorter created for the SORT local strategy; null for NONE
-	private CloseableInputProvider<IT> localStrategy;
-	// input reader
-	private MutableObjectIterator<IT> reader;
-	// input iterator: the raw reader or the sorter's iterator
-	private MutableObjectIterator<IT> input;
-	private TypeSerializerFactory<IT> inputTypeSerializerFactory;
-
-	public DataSinkProcessor(ProcessorContext context) {
-		super(context);
-	}
-
-	/**
-	 * Decodes the task configuration from the user payload, creates the memory/I/O runtime with 70% of
-	 * the memory available to the task, and instantiates and configures the output format.
-	 */
-	@Override
-	public void initialize() throws Exception {
-		UserPayload payload = getContext().getUserPayload();
-		Configuration conf = TezUtils.createConfFromUserPayload(payload);
-
-		this.config = (TezTaskConfig) EncodingUtils.decodeObjectFromString(conf.get(TezTaskConfig.TEZ_TASK_CONFIG), getClass().getClassLoader());
-		config.setTaskName(getContext().getTaskVertexName());
-
-		this.runtimeEnvironment = new TezRuntimeEnvironment((long) (0.7 * this.getContext().getTotalMemoryAvailableToTask()));
-
-		this.inputTypeSerializerFactory = this.config.getInputSerializer(0, this.userCodeClassLoader);
-
-		initOutputFormat();
-	}
-
-	/**
-	 * Starts the single input, wraps its reader as a record iterator and writes all records.
-	 *
-	 * @throws IllegalArgumentException if there is not exactly one input or there are any outputs
-	 */
-	@Override
-	public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
-
-		// a sink consumes exactly one input and produces no Tez outputs
-		Preconditions.checkArgument((outputs == null) || (outputs.size() == 0));
-		Preconditions.checkArgument(inputs != null && inputs.size() == 1);
-
-		this.inputs = inputs;
-		this.numInputs = inputs.size();
-		this.readers = new ArrayList<KeyValueReader>(numInputs);
-		for (LogicalInput input : this.inputs.values()) {
-			input.start();
-			readers.add((KeyValueReader) input.getReader());
-		}
-
-		this.reader = new TezReaderIterator<IT>(readers.get(0));
-
-		this.invoke();
-	}
-
-	@Override
-	public void handleEvents(List<Event> processorEvents) {
-		// processor events are not used by this processor
-	}
-
-	@Override
-	public void close() throws Exception {
-		// initialize() may have failed before the runtime environment was created
-		if (this.runtimeEnvironment != null) {
-			this.runtimeEnvironment.getIOManager().shutdown();
-		}
-	}
-
-	/**
-	 * Applies the configured local strategy to the input and writes every record to the output
-	 * format. The format and the local strategy are closed on every path.
-	 */
-	private void invoke() {
-		try {
-			// initialize local strategies
-			switch (this.config.getInputLocalStrategy(0)) {
-				case NONE:
-					// nothing to do
-					localStrategy = null;
-					input = reader;
-					break;
-				case SORT:
-					// initialize sort local strategy
-					try {
-						// get type comparator
-						TypeComparatorFactory<IT> compFact = this.config.getInputComparator(0, this.userCodeClassLoader);
-						if (compFact == null) {
-							throw new Exception("Missing comparator factory for local strategy on input " + 0);
-						}
-
-						// initialize sorter
-						UnilateralSortMerger<IT> sorter = new UnilateralSortMerger<IT>(
-								this.runtimeEnvironment.getMemoryManager(),
-								this.runtimeEnvironment.getIOManager(),
-								this.reader, this.invokable, this.inputTypeSerializerFactory, compFact.createComparator(),
-								this.config.getRelativeMemoryInput(0), this.config.getFilehandlesInput(0),
-								this.config.getSpillingThresholdInput(0), false);
-
-						this.localStrategy = sorter;
-						this.input = sorter.getIterator();
-					} catch (Exception e) {
-						// The conditional must be parenthesized: otherwise '+' binds first, the '== null'
-						// test is always false, and the message prefix is dropped.
-						throw new RuntimeException("Initializing the input processing failed" +
-								(e.getMessage() == null ? "." : ": " + e.getMessage()), e);
-					}
-					break;
-				default:
-					throw new RuntimeException("Invalid local strategy for DataSinkTask");
-			}
-
-			final TypeSerializer<IT> serializer = this.inputTypeSerializerFactory.getSerializer();
-			final MutableObjectIterator<IT> records = this.input;
-			final OutputFormat<IT> format = this.format;
-
-			IT record = serializer.createInstance();
-			format.open(this.getContext().getTaskIndex(), this.getContext().getVertexParallelism());
-
-			// work!
-			while ((record = records.next(record)) != null) {
-				format.writeRecord(record);
-			}
-
-			// Close explicitly so failures in close() are reported; clearing the field keeps the
-			// finally block from closing the format a second time.
-			this.format.close();
-			this.format = null;
-		}
-		catch (IOException e) {
-			// keep the cause instead of printing it and throwing an empty exception
-			throw new RuntimeException("Writing records to the output format failed: " + e.getMessage(), e);
-		}
-		finally {
-			if (this.format != null) {
-				// close format, if it has not been closed, yet.
-				// This should only be the case if we had a previous error, or were canceled.
-				try {
-					this.format.close();
-				}
-				catch (Throwable t) {
-					// best-effort: the original failure is already propagating
-					//TODO log warning message
-				}
-			}
-			// close local strategy if necessary
-			if (localStrategy != null) {
-				try {
-					this.localStrategy.close();
-				} catch (Throwable t) {
-					// best-effort cleanup
-					//TODO log warning message
-				}
-			}
-		}
-	}
-
-	/**
-	 * Instantiates the user's output format from the task configuration and calls its
-	 * {@code configure()} method with the stub parameters.
-	 *
-	 * @throws RuntimeException if the configured class is not an OutputFormat or configure() fails
-	 */
-	private void initOutputFormat() {
-		try {
-			this.format = this.config.<OutputFormat<IT>>getStubWrapper(this.userCodeClassLoader).getUserCodeObject(OutputFormat.class, this.userCodeClassLoader);
-
-			// check if the class is a subclass, if the check is required
-			if (!OutputFormat.class.isAssignableFrom(this.format.getClass())) {
-				throw new RuntimeException("The class '" + this.format.getClass().getName() + "' is not a subclass of '" +
-						OutputFormat.class.getName() + "' as is required.");
-			}
-		}
-		catch (ClassCastException ccex) {
-			throw new RuntimeException("The stub class is not a proper subclass of " + OutputFormat.class.getName(), ccex);
-		}
-
-		// configure the stub. catch exceptions here extra, to report them as originating from the user code
-		try {
-			this.format.configure(this.config.getStubParameters());
-		}
-		catch (Throwable t) {
-			throw new RuntimeException("The user defined 'configure()' method in the Output Format caused an error: "
-					+ t.getMessage(), t);
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSourceProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSourceProcessor.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSourceProcessor.java
deleted file mode 100644
index dd3f843..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSourceProcessor.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.runtime;
-
-import com.google.common.base.Preconditions;
-import org.apache.flink.api.common.distributions.DataDistribution;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.tez.runtime.input.FlinkInput;
-import org.apache.flink.tez.runtime.output.TezChannelSelector;
-import org.apache.flink.tez.runtime.output.TezOutputCollector;
-import org.apache.flink.tez.runtime.output.TezOutputEmitter;
-import org.apache.flink.tez.util.EncodingUtils;
-import org.apache.flink.util.Collector;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
-import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.LogicalInput;
-import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.ProcessorContext;
-import org.apache.tez.runtime.library.api.KeyValueWriter;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-
-/**
- * Tez processor that runs a Flink data source. It reads the split provided by its single
- * {@link FlinkInput} through the user's {@link InputFormat} and emits every record to its outputs.
- *
- * @param <OT> type of the records produced by the source
- */
-public class DataSourceProcessor<OT> extends AbstractLogicalIOProcessor {
-
-	private TezTaskConfig config;
-	protected Map<String, LogicalOutput> outputs;
-	private List<KeyValueWriter> writers;
-	private int numOutputs;
-	private Collector<OT> collector;
-
-	private InputFormat<OT, InputSplit> format;
-	private TypeSerializerFactory<OT> serializerFactory;
-	private FlinkInput input;
-	private ClassLoader userCodeClassLoader = getClass().getClassLoader();
-
-
-	public DataSourceProcessor(ProcessorContext context) {
-		super(context);
-	}
-
-	/** Decodes the task configuration from the user payload and instantiates the input format. */
-	@Override
-	public void initialize() throws Exception {
-		UserPayload payload = getContext().getUserPayload();
-		Configuration conf = TezUtils.createConfFromUserPayload(payload);
-
-		this.config = (TezTaskConfig) EncodingUtils.decodeObjectFromString(conf.get(TezTaskConfig.TEZ_TASK_CONFIG), getClass().getClassLoader());
-		config.setTaskName(getContext().getTaskVertexName());
-
-		this.serializerFactory = config.getOutputSerializer(this.userCodeClassLoader);
-
-		initInputFormat();
-	}
-
-	@Override
-	public void handleEvents(List<Event> processorEvents) {
-		// processor events are not used by this processor
-	}
-
-	@Override
-	public void close() throws Exception {
-		// nothing to release here; the input format is closed in invoke()
-	}
-
-	/**
-	 * Takes the single {@link FlinkInput}, starts all outputs, and emits the records of the input split.
-	 *
-	 * @throws IllegalArgumentException if there is not exactly one input
-	 * @throws RuntimeException if the input is not a FlinkInput
-	 */
-	@Override
-	public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
-
-		Preconditions.checkArgument(inputs != null && inputs.size() == 1);
-		LogicalInput logicalInput = inputs.values().iterator().next();
-		if (!(logicalInput instanceof FlinkInput)) {
-			throw new RuntimeException("Input to Flink Data Source Processor should be of type FlinkInput");
-		}
-		this.input = (FlinkInput) logicalInput;
-
-		// start all outputs and collect their writers; a null map is treated as "no outputs"
-		this.outputs = outputs;
-		this.numOutputs = (outputs == null) ? 0 : outputs.size();
-		this.writers = new ArrayList<KeyValueWriter>(numOutputs);
-		if (this.outputs != null) {
-			for (LogicalOutput output : this.outputs.values()) {
-				output.start();
-				writers.add((KeyValueWriter) output.getWriter());
-			}
-		}
-		this.invoke();
-	}
-
-	/**
-	 * Reads all records of the split and emits them through a {@link TezOutputCollector}, with one
-	 * channel selector per output built from that output's ship strategy, comparator and data distribution.
-	 *
-	 * @throws Exception any failure while reading or emitting; the input format is closed (best-effort) first
-	 */
-	private void invoke() throws Exception {
-		final TypeSerializer<OT> serializer = this.serializerFactory.getSerializer();
-		final InputFormat<OT, InputSplit> format = this.format;
-		try {
-			InputSplit split = input.getSplit();
-
-			OT record = serializer.createInstance();
-			format.open(split);
-
-			ArrayList<TezChannelSelector<OT>> channelSelectors = new ArrayList<TezChannelSelector<OT>>(numOutputs);
-			ArrayList<Integer> numStreamsInOutputs = this.config.getNumberSubtasksInOutput();
-			for (int i = 0; i < numOutputs; i++) {
-				final ShipStrategyType strategy = config.getOutputShipStrategy(i);
-				final TypeComparatorFactory<OT> compFactory = config.getOutputComparator(i, this.userCodeClassLoader);
-				final DataDistribution dataDist = config.getOutputDataDistribution(i, this.userCodeClassLoader);
-				if (compFactory == null) {
-					channelSelectors.add(new TezOutputEmitter<OT>(strategy));
-				} else {
-					final TypeComparator<OT> comparator = compFactory.createComparator();
-					if (dataDist == null) {
-						channelSelectors.add(new TezOutputEmitter<OT>(strategy, comparator));
-					} else {
-						channelSelectors.add(new TezOutputEmitter<OT>(strategy, comparator, dataDist));
-					}
-				}
-			}
-			collector = new TezOutputCollector<OT>(writers, channelSelectors, serializerFactory.getSerializer(), numStreamsInOutputs);
-
-			while (!format.reachedEnd()) {
-				// build next record and ship it if it is valid
-				if ((record = format.nextRecord(record)) != null) {
-					collector.collect(record);
-				}
-			}
-			format.close();
-
-			collector.close();
-		}
-		catch (Exception ex) {
-			// close the input, but do not report exceptions from close(): ex is the root cause
-			try {
-				format.close();
-			} catch (Throwable t) {
-				// ignored, ex is rethrown below
-			}
-			// propagate the failure so the task fails instead of appearing to succeed with partial output
-			throw ex;
-		}
-	}
-
-	/**
-	 * Instantiates the user's input format from the task configuration and calls its
-	 * {@code configure()} method with the stub parameters.
-	 *
-	 * @throws RuntimeException if the configured class is not an InputFormat or configure() fails
-	 */
-	private void initInputFormat() {
-		try {
-			this.format = config.<InputFormat<OT, InputSplit>>getStubWrapper(this.userCodeClassLoader)
-					.getUserCodeObject(InputFormat.class, this.userCodeClassLoader);
-
-			// check if the class is a subclass, if the check is required
-			if (!InputFormat.class.isAssignableFrom(this.format.getClass())) {
-				throw new RuntimeException("The class '" + this.format.getClass().getName() + "' is not a subclass of '" +
-						InputFormat.class.getName() + "' as is required.");
-			}
-		}
-		catch (ClassCastException ccex) {
-			throw new RuntimeException("The stub class is not a proper subclass of " + InputFormat.class.getName(),
-					ccex);
-		}
-		// configure the stub. catch exceptions here extra, to report them as originating from the user code
-		try {
-			this.format.configure(this.config.getStubParameters());
-		}
-		catch (Throwable t) {
-			throw new RuntimeException("The user defined 'configure()' method caused an error: " + t.getMessage(), t);
-		}
-	}
-
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java
deleted file mode 100644
index 14d9cde..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.runtime;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.TaskInfo;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.operators.Driver;
-import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
-import org.apache.flink.tez.util.EncodingUtils;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
-import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.LogicalInput;
-import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.ProcessorContext;
-import org.apache.tez.runtime.library.api.KeyValueReader;
-import org.apache.tez.runtime.library.api.KeyValueWriter;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Future;
-
-
-/**
- * Tez processor that runs a regular Flink operator (driver) through a {@link TezTask}.
- *
- * @param <S>  type of the user function
- * @param <OT> type of the records produced by the operator
- */
-public class RegularProcessor<S extends Function, OT> extends AbstractLogicalIOProcessor {
-
-	private TezTask<S,OT> task;
-	protected Map<String, LogicalInput> inputs;
-	protected Map<String, LogicalOutput> outputs;
-	private List<KeyValueReader> readers;
-	private List<KeyValueWriter> writers;
-	private int numInputs;
-	private int numOutputs;
-
-
-	public RegularProcessor(ProcessorContext context) {
-		super(context);
-	}
-
-	/**
-	 * Decodes the task configuration from the user payload and creates the {@link TezTask} with a
-	 * runtime context describing this task instance (vertex name, index, parallelism, attempt number).
-	 */
-	@Override
-	public void initialize() throws Exception {
-		UserPayload payload = getContext().getUserPayload();
-		Configuration conf = TezUtils.createConfFromUserPayload(payload);
-
-		TezTaskConfig taskConfig = (TezTaskConfig) EncodingUtils.decodeObjectFromString(conf.get(TezTaskConfig.TEZ_TASK_CONFIG), getClass().getClassLoader());
-		taskConfig.setTaskName(getContext().getTaskVertexName());
-
-		RuntimeUDFContext runtimeUdfContext = new RuntimeUDFContext(
-				new TaskInfo(
-						getContext().getTaskVertexName(),
-						getContext().getTaskIndex(),
-						getContext().getVertexParallelism(),
-						getContext().getTaskAttemptNumber()
-				),
-				getClass().getClassLoader(),
-				new ExecutionConfig(),
-				new HashMap<String, Future<Path>>(),
-				new HashMap<String, Accumulator<?, ?>>());
-
-		this.task = new TezTask<S, OT>(taskConfig, runtimeUdfContext, this.getContext().getTotalMemoryAvailableToTask());
-	}
-
-	@Override
-	public void handleEvents(List<Event> processorEvents) {
-		// processor events are not used by this processor
-	}
-
-	@Override
-	public void close() throws Exception {
-		// initialize() may have failed before the task was created
-		if (task != null) {
-			task.getIOManager().shutdown();
-		}
-	}
-
-	/**
-	 * Starts all inputs and outputs, places each input's reader at the driver input position(s)
-	 * configured for it, and runs the task.
-	 *
-	 * @throws IllegalStateException if an input has no configured driver input position
-	 */
-	@Override
-	public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
-
-		this.inputs = inputs;
-		this.outputs = outputs;
-		final Class<? extends Driver<S, OT>> driverClass = this.task.getTaskConfig().getDriver();
-		Driver<S,OT> driver = InstantiationUtil.instantiate(driverClass, Driver.class);
-		this.numInputs = driver.getNumberOfInputs();
-		// a null outputs map is treated as "no outputs", consistent with the null check below
-		this.numOutputs = (outputs == null) ? 0 : outputs.size();
-
-		// pre-fill with nulls so that readers can be set at their driver input positions
-		this.readers = new ArrayList<KeyValueReader>(numInputs);
-		for (int i = 0; i < numInputs; i++) {
-			this.readers.add(null);
-		}
-		HashMap<String, ArrayList<Integer>> inputPositions = ((TezTaskConfig) this.task.getTaskConfig()).getInputPositions();
-		if (this.inputs != null) {
-			for (Map.Entry<String, LogicalInput> entry : this.inputs.entrySet()) {
-				final String name = entry.getKey();
-				final LogicalInput input = entry.getValue();
-				input.start();
-				ArrayList<Integer> positions = inputPositions.get(name);
-				if (positions == null) {
-					throw new IllegalStateException("No driver input position configured for Tez input '" + name + "'");
-				}
-				// one Tez input may feed several driver input positions
-				for (Integer pos : positions) {
-					readers.set(pos, (KeyValueReader) input.getReader());
-				}
-			}
-		}
-
-		this.writers = new ArrayList<KeyValueWriter>(numOutputs);
-		if (this.outputs != null) {
-			for (LogicalOutput output : this.outputs.values()) {
-				output.start();
-				writers.add((KeyValueWriter) output.getWriter());
-			}
-		}
-
-		// Do the work
-		task.invoke(readers, writers);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezRuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezRuntimeEnvironment.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezRuntimeEnvironment.java
deleted file mode 100644
index b61a9b6..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezRuntimeEnvironment.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.runtime;
-
-import org.apache.flink.core.memory.MemoryType;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.memory.MemoryManager;
-
-/**
- * Owns the memory manager and the I/O manager used by a single Tez task.
- *
- * <p>Both managers are created eagerly in the constructor and exposed through plain getters.
- */
-public class TezRuntimeEnvironment {
-
-	private final MemoryManager memoryManager;
-
-	private final IOManager ioManager;
-
-	/**
-	 * Creates a heap-backed memory manager sized to {@code totalMemory} and an asynchronous
-	 * I/O manager.
-	 *
-	 * <p>NOTE(review): the literals {@code 1} and {@code true} are passed straight through to the
-	 * MemoryManager constructor; presumably the slot count and the pre-allocation flag, so confirm
-	 * against MemoryManager before changing them.
-	 *
-	 * @param totalMemory the memory size given to the memory manager
-	 */
-	public TezRuntimeEnvironment(long totalMemory) {
-		// the memory manager is built first; keep this order so no I/O manager is created
-		// if sizing the memory manager fails
-		final MemoryManager memory =
-				new MemoryManager(totalMemory, 1, MemoryManager.DEFAULT_PAGE_SIZE, MemoryType.HEAP, true);
-		this.memoryManager = memory;
-
-		final IOManager io = new IOManagerAsync();
-		this.ioManager = io;
-	}
-
-	/** @return the I/O manager created for this environment */
-	public IOManager getIOManager() {
-		final IOManager io = this.ioManager;
-		return io;
-	}
-
-	/** @return the memory manager created for this environment */
-	public MemoryManager getMemoryManager() {
-		final MemoryManager memory = this.memoryManager;
-		return memory;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java
deleted file mode 100644
index 89e4642..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java
+++ /dev/null
@@ -1,578 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.runtime;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.distributions.DataDistribution;
-import org.apache.flink.api.common.functions.GroupCombineFunction;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.Driver;
-import org.apache.flink.runtime.operators.TaskContext;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger;
-import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
-import org.apache.flink.runtime.operators.util.CloseableInputProvider;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
-import org.apache.flink.tez.runtime.input.TezReaderIterator;
-import org.apache.flink.tez.runtime.output.TezChannelSelector;
-import org.apache.flink.tez.runtime.output.TezOutputEmitter;
-import org.apache.flink.tez.runtime.output.TezOutputCollector;
-import org.apache.flink.tez.util.DummyInvokable;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.MutableObjectIterator;
-
-import org.apache.tez.runtime.library.api.KeyValueReader;
-import org.apache.tez.runtime.library.api.KeyValueWriter;
-
-import java.io.IOException;
-import java.net.URLClassLoader;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-
-public class TezTask<S extends Function,OT>  implements TaskContext<S, OT> {
-
-	protected static final Log LOG = LogFactory.getLog(TezTask.class);
-
-	DummyInvokable invokable = new DummyInvokable();
-
-	/**
-	 * The driver that invokes the user code (the stub implementation). The central driver in this task
-	 * (further drivers may be chained behind this driver).
-	 */
-	protected volatile Driver<S, OT> driver;
-
-	/**
-	 * The instantiated user code of this task's main operator (driver). May be null if the operator has no udf.
-	 */
-	protected S stub;
-
-	/**
-	 * The udf's runtime context.
-	 */
-	protected RuntimeUDFContext runtimeUdfContext;
-
-	/**
-	 * The collector that forwards the user code's results. May forward to a channel or to chained drivers within
-	 * this task.
-	 */
-	protected Collector<OT> output;
-
-	/**
-	 * The inputs reader, wrapped in an iterator. Prior to the local strategies, etc...
-	 */
-	protected MutableObjectIterator<?>[] inputIterators;
-
-	/**
-	 * The local strategies that are applied on the inputs.
-	 */
-	protected volatile CloseableInputProvider<?>[] localStrategies;
-
-	/**
-	 * The inputs to the operator. Return the readers' data after the application of the local strategy
-	 * and the temp-table barrier.
-	 */
-	protected MutableObjectIterator<?>[] inputs;
-
-	/**
-	 * The serializers for the input data type.
-	 */
-	protected TypeSerializerFactory<?>[] inputSerializers;
-
-	/**
-	 * The comparators for the central driver.
-	 */
-	protected TypeComparator<?>[] inputComparators;
-
-	/**
-	 * The task configuration with the setup parameters.
-	 */
-	protected TezTaskConfig config;
-
-	/**
-	 * The class loader used to instantiate user code and user data types.
-	 */
-	protected ClassLoader userCodeClassLoader = ClassLoader.getSystemClassLoader();
-
-	/**
-	 * For now, create a default ExecutionConfig 
-	 */
-	protected ExecutionConfig executionConfig;
-
-	/*
-	 * Tez-specific variables given by the Processor
-	 */
-	protected TypeSerializer<OT> outSerializer;
-
-	protected List<Integer> numberOfSubTasksInOutputs;
-
-	protected String taskName;
-
-	protected int numberOfSubtasks;
-
-	protected int indexInSubtaskGroup;
-
-	TezRuntimeEnvironment runtimeEnvironment;
-
-	public TezTask(TezTaskConfig config, RuntimeUDFContext runtimeUdfContext, long availableMemory) {
-		this.config = config;
-		final Class<? extends Driver<S, OT>> driverClass = this.config.getDriver();
-		this.driver = InstantiationUtil.instantiate(driverClass, Driver.class);
-		
-		LOG.info("ClassLoader URLs: " + Arrays.toString(((URLClassLoader) this.userCodeClassLoader).getURLs()));
-		
-		this.stub = this.config.<S>getStubWrapper(this.userCodeClassLoader).getUserCodeObject(Function.class, this.userCodeClassLoader); //TODO get superclass properly
-		this.runtimeUdfContext = runtimeUdfContext;
-		this.outSerializer = (TypeSerializer<OT>) this.config.getOutputSerializer(getClass().getClassLoader()).getSerializer();
-		this.numberOfSubTasksInOutputs = this.config.getNumberSubtasksInOutput();
-		this.taskName = this.config.getTaskName();
-		this.numberOfSubtasks = this.runtimeUdfContext.getNumberOfParallelSubtasks();
-		this.indexInSubtaskGroup = this.runtimeUdfContext.getIndexOfThisSubtask();
-		this.runtimeEnvironment = new TezRuntimeEnvironment((long) (0.7 * availableMemory));
-		this.executionConfig = runtimeUdfContext.getExecutionConfig();
-		this.invokable.setExecutionConfig(this.executionConfig);
-	}
-
-
-	//-------------------------------------------------------------
-	// Interface to FlinkProcessor
-	//-------------------------------------------------------------
-
-	public void invoke(List<KeyValueReader> readers, List<KeyValueWriter> writers) throws Exception {
-
-		// whatever happens in this scope, make sure that the local strategies are cleaned up!
-		// note that the initialization of the local strategies is in the try-finally block as well,
-		// so that the thread that creates them catches its own errors that may happen in that process.
-		// this is especially important, since there may be asynchronous closes (such as through canceling).
-		try {
-			// initialize the inputs and outputs
-			initInputsOutputs(readers, writers);
-
-			// pre main-function initialization
-			initialize();
-
-			// the work goes here
-			run();
-		}
-		finally {
-			// clean up in any case!
-			closeLocalStrategies();
-		}
-	}
-
-
-	/*
-	 * Initialize inputs, input serializers, input comparators, and collector
-	 * Assumes that the config and userCodeClassLoader has been set
-	 */
-	private void initInputsOutputs (List<KeyValueReader> readers, List<KeyValueWriter> writers) throws Exception {
-
-		int numInputs = readers.size();
-		Preconditions.checkArgument(numInputs == driver.getNumberOfInputs());
-
-		// Prior to local strategies
-		this.inputIterators = new MutableObjectIterator[numInputs];
-		//local strategies
-		this.localStrategies = new CloseableInputProvider[numInputs];
-		// After local strategies
-		this.inputs = new MutableObjectIterator[numInputs];
-
-		int numComparators = driver.getNumberOfDriverComparators();
-		initInputsSerializersAndComparators(numInputs, numComparators);
-
-		int index = 0;
-		for (KeyValueReader reader : readers) {
-			this.inputIterators[index] = new TezReaderIterator<Object>(reader);
-			initInputLocalStrategy(index);
-			index++;
-		}
-
-		int numOutputs = writers.size();
-		ArrayList<TezChannelSelector<OT>> channelSelectors = new ArrayList<TezChannelSelector<OT>>(numOutputs);
-		//ArrayList<Integer> numStreamsInOutputs = new ArrayList<Integer>(numOutputs);
-		for (int i = 0; i < numOutputs; i++) {
-			final ShipStrategyType strategy = config.getOutputShipStrategy(i);
-			final TypeComparatorFactory<OT> compFactory = config.getOutputComparator(i, this.userCodeClassLoader);
-			final DataDistribution dataDist = config.getOutputDataDistribution(i, this.userCodeClassLoader);
-			if (compFactory == null) {
-				channelSelectors.add(i, new TezOutputEmitter<OT>(strategy));
-			} else if (dataDist == null){
-				final TypeComparator<OT> comparator = compFactory.createComparator();
-				channelSelectors.add(i, new TezOutputEmitter<OT>(strategy, comparator));
-			} else {
-				final TypeComparator<OT> comparator = compFactory.createComparator();
-				channelSelectors.add(i,new TezOutputEmitter<OT>(strategy, comparator, dataDist));
-			}
-		}
-		this.output = new TezOutputCollector<OT>(writers, channelSelectors, outSerializer, numberOfSubTasksInOutputs);
-	}
-
-
-
-	// --------------------------------------------------------------------
-	// TaskContext interface
-	// --------------------------------------------------------------------
-
-	@Override
-	public TaskConfig getTaskConfig() {
-		return (TaskConfig) this.config;
-	}
-
-	@Override
-	public ExecutionConfig getExecutionConfig() {
-		return executionConfig;
-	}
-
-	@Override
-	public ClassLoader getUserCodeClassLoader() {
-		return this.userCodeClassLoader;
-	}
-
-	@Override
-	public MemoryManager getMemoryManager() {
-		return runtimeEnvironment.getMemoryManager();
-	}
-
-	@Override
-	public IOManager getIOManager() {
-		return runtimeEnvironment.getIOManager();
-	}
-
-	@Override
-	public TaskManagerRuntimeInfo getTaskManagerInfo() {
-		return new TaskManagerRuntimeInfo("localhost", new Configuration());
-	}
-
-	@Override
-	public <X> MutableObjectIterator<X> getInput(int index) {
-		if (index < 0 || index > this.driver.getNumberOfInputs()) {
-			throw new IndexOutOfBoundsException();
-		}
-		// check for lazy assignment from input strategies
-		if (this.inputs[index] != null) {
-			@SuppressWarnings("unchecked")
-			MutableObjectIterator<X> in = (MutableObjectIterator<X>) this.inputs[index];
-			return in;
-		} else {
-			final MutableObjectIterator<X> in;
-			try {
-				if (this.localStrategies[index] != null) {
-					@SuppressWarnings("unchecked")
-					MutableObjectIterator<X> iter = (MutableObjectIterator<X>) this.localStrategies[index].getIterator();
-					in = iter;
-				} else {
-					throw new RuntimeException("Bug: null input iterator, null temp barrier, and null local strategy.");
-				}
-				this.inputs[index] = in;
-				return in;
-			} catch (InterruptedException iex) {
-				throw new RuntimeException("Interrupted while waiting for input " + index + " to become available.");
-			} catch (IOException ioex) {
-				throw new RuntimeException("An I/O Exception occurred whily obaining input " + index + ".");
-			}
-		}
-	}
-
-	@Override
-	public <X> TypeSerializerFactory<X> getInputSerializer(int index) {
-		if (index < 0 || index >= this.driver.getNumberOfInputs()) {
-			throw new IndexOutOfBoundsException();
-		}
-
-		@SuppressWarnings("unchecked")
-		final TypeSerializerFactory<X> serializerFactory = (TypeSerializerFactory<X>) this.inputSerializers[index];
-		return serializerFactory;
-	}
-
-	@Override
-	public <X> TypeComparator<X> getDriverComparator(int index) {
-		if (this.inputComparators == null) {
-			throw new IllegalStateException("Comparators have not been created!");
-		}
-		else if (index < 0 || index >= this.driver.getNumberOfDriverComparators()) {
-			throw new IndexOutOfBoundsException();
-		}
-
-		@SuppressWarnings("unchecked")
-		final TypeComparator<X> comparator = (TypeComparator<X>) this.inputComparators[index];
-		return comparator;
-	}
-
-
-
-	@Override
-	public S getStub() {
-		return this.stub;
-	}
-
-	@Override
-	public Collector<OT> getOutputCollector() {
-		return this.output;
-	}
-
-	@Override
-	public AbstractInvokable getOwningNepheleTask() {
-		return this.invokable;
-	}
-
-	@Override
-	public String formatLogString(String message) {
-		return null;
-	}
-
-
-	// --------------------------------------------------------------------
-	// Adapted from BatchTask
-	// --------------------------------------------------------------------
-
-	private void initInputLocalStrategy(int inputNum) throws Exception {
-		// check if there is already a strategy
-		if (this.localStrategies[inputNum] != null) {
-			throw new IllegalStateException();
-		}
-
-		// now set up the local strategy
-		final LocalStrategy localStrategy = this.config.getInputLocalStrategy(inputNum);
-		if (localStrategy != null) {
-			switch (localStrategy) {
-				case NONE:
-					// the input is as it is
-					this.inputs[inputNum] = this.inputIterators[inputNum];
-					break;
-				case SORT:
-					@SuppressWarnings({ "rawtypes", "unchecked" })
-					UnilateralSortMerger<?> sorter = new UnilateralSortMerger(getMemoryManager(), getIOManager(),
-							this.inputIterators[inputNum], this.invokable, this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum),
-							this.config.getRelativeMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum),
-							this.config.getSpillingThresholdInput(inputNum), this.executionConfig.isObjectReuseEnabled());
-					// set the input to null such that it will be lazily fetched from the input strategy
-					this.inputs[inputNum] = null;
-					this.localStrategies[inputNum] = sorter;
-					break;
-				case COMBININGSORT:
-					// sanity check this special case!
-					// this still breaks a bit of the abstraction!
-					// we should have nested configurations for the local strategies to solve that
-					if (inputNum != 0) {
-						throw new IllegalStateException("Performing combining sort outside a (group)reduce task!");
-					}
-
-					// instantiate ourselves a combiner. we should not use the stub, because the sort and the
-					// subsequent (group)reduce would otherwise share it multi-threaded
-					final Class<S> userCodeFunctionType = this.driver.getStubType();
-					if (userCodeFunctionType == null) {
-						throw new IllegalStateException("Performing combining sort outside a reduce task!");
-					}
-					final S localStub;
-					try {
-						localStub = initStub(userCodeFunctionType);
-					} catch (Exception e) {
-						throw new RuntimeException("Initializing the user code and the configuration failed" +
-								(e.getMessage() == null ? "." : ": " + e.getMessage()), e);
-					}
-
-					if (!(localStub instanceof GroupCombineFunction)) {
-						throw new IllegalStateException("Performing combining sort outside a reduce task!");
-					}
-
-					@SuppressWarnings({ "rawtypes", "unchecked" })
-					CombiningUnilateralSortMerger<?> cSorter = new CombiningUnilateralSortMerger(
-							(GroupCombineFunction) localStub, getMemoryManager(), getIOManager(), this.inputIterators[inputNum],
-							this.invokable, this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum),
-							this.config.getRelativeMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum),
-							this.config.getSpillingThresholdInput(inputNum), this.executionConfig.isObjectReuseEnabled());
-					cSorter.setUdfConfiguration(this.config.getStubParameters());
-
-					// set the input to null such that it will be lazily fetched from the input strategy
-					this.inputs[inputNum] = null;
-					this.localStrategies[inputNum] = cSorter;
-					break;
-				default:
-					throw new Exception("Unrecognized local strategy provided: " + localStrategy.name());
-			}
-		} else {
-			// no local strategy in the config
-			this.inputs[inputNum] = this.inputIterators[inputNum];
-		}
-	}
-
-	private <T> TypeComparator<T> getLocalStrategyComparator(int inputNum) throws Exception {
-		TypeComparatorFactory<T> compFact = this.config.getInputComparator(inputNum, this.userCodeClassLoader);
-		if (compFact == null) {
-			throw new Exception("Missing comparator factory for local strategy on input " + inputNum);
-		}
-		return compFact.createComparator();
-	}
-
-	protected S initStub(Class<? super S> stubSuperClass) throws Exception {
-		try {
-			S stub = config.<S>getStubWrapper(this.userCodeClassLoader).getUserCodeObject(stubSuperClass, this.userCodeClassLoader);
-			// check if the class is a subclass, if the check is required
-			if (stubSuperClass != null && !stubSuperClass.isAssignableFrom(stub.getClass())) {
-				throw new RuntimeException("The class '" + stub.getClass().getName() + "' is not a subclass of '" +
-						stubSuperClass.getName() + "' as is required.");
-			}
-			FunctionUtils.setFunctionRuntimeContext(stub, this.runtimeUdfContext);
-			return stub;
-		}
-		catch (ClassCastException ccex) {
-			throw new Exception("The stub class is not a proper subclass of " + stubSuperClass.getName(), ccex);
-		}
-	}
-
-	/**
-	 * Creates all the serializers and comparators.
-	 */
-	protected void initInputsSerializersAndComparators(int numInputs, int numComparators) throws Exception {
-		this.inputSerializers = new TypeSerializerFactory<?>[numInputs];
-		this.inputComparators = numComparators > 0 ? new TypeComparator[numComparators] : null;
-		//this.inputComparators = this.driver.requiresComparatorOnInput() ? new TypeComparator[numInputs] : null;
-		this.inputIterators = new MutableObjectIterator[numInputs];
-
-		for (int i = 0; i < numInputs; i++) {
-			//  ---------------- create the serializer first ---------------------
-			final TypeSerializerFactory<?> serializerFactory = this.config.getInputSerializer(i, this.userCodeClassLoader);
-			this.inputSerializers[i] = serializerFactory;
-			// this.inputIterators[i] = createInputIterator(this.inputReaders[i], this.inputSerializers[i]);
-		}
-		//  ---------------- create the driver's comparators ---------------------
-		for (int i = 0; i < numComparators; i++) {
-			if (this.inputComparators != null) {
-				final TypeComparatorFactory<?> comparatorFactory = this.config.getDriverComparator(i, this.userCodeClassLoader);
-				this.inputComparators[i] = comparatorFactory.createComparator();
-			}
-		}
-	}
-
-	protected void initialize() throws Exception {
-		// create the operator
-		try {
-			this.driver.setup(this);
-		}
-		catch (Throwable t) {
-			throw new Exception("The driver setup for '" + //TODO put taks name here
-					"' , caused an error: " + t.getMessage(), t);
-		}
-
-		//this.runtimeUdfContext = createRuntimeContext();
-
-		// instantiate the UDF
-		try {
-			final Class<? super S> userCodeFunctionType = this.driver.getStubType();
-			// if the class is null, the driver has no user code
-			if (userCodeFunctionType != null) {
-				this.stub = initStub(userCodeFunctionType);
-			}
-		} catch (Exception e) {
-			throw new RuntimeException("Initializing the UDF" +
-					e.getMessage() == null ? "." : ": " + e.getMessage(), e);
-		}
-	}
-
-	/*
-	public RuntimeUDFContext createRuntimeContext() {
-		return new RuntimeUDFContext(this.taskName, this.numberOfSubtasks, this.indexInSubtaskGroup, null);
-	}
-	*/
-
-	protected void closeLocalStrategies() {
-		if (this.localStrategies != null) {
-			for (int i = 0; i < this.localStrategies.length; i++) {
-				if (this.localStrategies[i] != null) {
-					try {
-						this.localStrategies[i].close();
-					} catch (Throwable t) {
-						LOG.error("Error closing local strategy for input " + i, t);
-					}
-				}
-			}
-		}
-	}
-
-	protected void run() throws Exception {
-		// ---------------------------- Now, the actual processing starts ------------------------
-		// check for asynchronous canceling
-
-		boolean stubOpen = false;
-
-		try {
-			// run the data preparation
-			try {
-				this.driver.prepare();
-			}
-			catch (Throwable t) {
-				// if the preparation caused an error, clean up
-				// errors during clean-up are swallowed, because we have already a root exception
-				throw new Exception("The data preparation for task '" + this.taskName +
-						"' , caused an error: " + t.getMessage(), t);
-			}
-
-			// open stub implementation
-			if (this.stub != null) {
-				try {
-					Configuration stubConfig = this.config.getStubParameters();
-					FunctionUtils.openFunction(this.stub, stubConfig);
-					stubOpen = true;
-				}
-				catch (Throwable t) {
-					throw new Exception("The user defined 'open()' method caused an exception: " + t.getMessage(), t);
-				}
-			}
-
-			// run the user code
-			this.driver.run();
-
-			// close. We close here such that a regular close throwing an exception marks a task as failed.
-			if (this.stub != null) {
-				FunctionUtils.closeFunction(this.stub);
-				stubOpen = false;
-			}
-
-			this.output.close();
-
-		}
-		catch (Exception ex) {
-			// close the input, but do not report any exceptions, since we already have another root cause
-			ex.printStackTrace();
-			throw new RuntimeException("Exception in TaskContext: " + ex.getMessage() + " "+  ex.getStackTrace());
-		}
-		finally {
-			this.driver.cleanup();
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTaskConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTaskConfig.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTaskConfig.java
deleted file mode 100644
index 94a8315..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTaskConfig.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.runtime;
-
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
-import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.util.InstantiationUtil;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-
-public class TezTaskConfig extends TaskConfig {
-
-	public static final String TEZ_TASK_CONFIG = "tez.task.flink.processor.taskconfig";
-
-	private static final String NUMBER_SUBTASKS_IN_OUTPUTS = "tez.num_subtasks_in_output";
-
-	private static final String INPUT_SPLIT_PROVIDER = "tez.input_split_provider";
-
-	private static final String INPUT_POSITIONS = "tez.input_positions";
-
-	private static final String INPUT_FORMAT = "tez.input_format";
-
-	private static final String DATASOURCE_PROCESSOR_NAME = "tez.datasource_processor_name";
-
-	public TezTaskConfig(Configuration config) {
-		super(config);
-	}
-
-
-	public void setDatasourceProcessorName(String name) {
-		if (name != null) {
-			this.config.setString(DATASOURCE_PROCESSOR_NAME, name);
-		}
-	}
-
-	public String getDatasourceProcessorName() {
-		return this.config.getString(DATASOURCE_PROCESSOR_NAME, null);
-	}
-
-	public void setNumberSubtasksInOutput(ArrayList<Integer> numberSubtasksInOutputs) {
-		try {
-			InstantiationUtil.writeObjectToConfig(numberSubtasksInOutputs, this.config, NUMBER_SUBTASKS_IN_OUTPUTS);
-		} catch (IOException e) {
-			throw new RuntimeException("Error while writing the input split provider object to the task configuration.");
-		}
-	}
-
-	public ArrayList<Integer> getNumberSubtasksInOutput() {
-		ArrayList<Integer> numberOfSubTasksInOutputs = null;
-		try {
-			numberOfSubTasksInOutputs = (ArrayList) InstantiationUtil.readObjectFromConfig(this.config, NUMBER_SUBTASKS_IN_OUTPUTS, getClass().getClassLoader());
-		}
-		catch (IOException e) {
-			throw new RuntimeException("Error while reading the number of subtasks in outputs object from the task configuration.");
-		} catch (ClassNotFoundException e) {
-			throw new RuntimeException("Error while reading the number of subtasks in outpurs object from the task configuration. " +
-					"class not found.");
-		}
-		if (numberOfSubTasksInOutputs == null) {
-			throw new NullPointerException();
-		}
-		return numberOfSubTasksInOutputs;
-
-	}
-
-
-	public void setInputSplitProvider (InputSplitProvider inputSplitProvider) {
-		try {
-			InstantiationUtil.writeObjectToConfig(inputSplitProvider, this.config, INPUT_SPLIT_PROVIDER);
-		} catch (IOException e) {
-			throw new RuntimeException("Error while writing the input split provider object to the task configuration.");
-		}
-	}
-
-	public InputSplitProvider getInputSplitProvider () {
-		InputSplitProvider inputSplitProvider = null;
-		try {
-			inputSplitProvider = (InputSplitProvider) InstantiationUtil.readObjectFromConfig(this.config, INPUT_SPLIT_PROVIDER, getClass().getClassLoader());
-		}
-		catch (IOException e) {
-			throw new RuntimeException("Error while reading the input split provider object from the task configuration.");
-		} catch (ClassNotFoundException e) {
-			throw new RuntimeException("Error while reading the input split provider object from the task configuration. " +
-					"ChannelSelector class not found.");
-		}
-		if (inputSplitProvider == null) {
-			throw new NullPointerException();
-		}
-		return inputSplitProvider;
-	}
-
-
-	public void setInputPositions(HashMap<String,ArrayList<Integer>> inputPositions) {
-		try {
-			InstantiationUtil.writeObjectToConfig(inputPositions, this.config, INPUT_POSITIONS);
-		} catch (IOException e) {
-			throw new RuntimeException("Error while writing the input positions object to the task configuration.");
-		}
-	}
-
-	public HashMap<String,ArrayList<Integer>> getInputPositions () {
-		HashMap<String,ArrayList<Integer>> inputPositions = null;
-		try {
-			inputPositions = (HashMap) InstantiationUtil.readObjectFromConfig(this.config, INPUT_POSITIONS, getClass().getClassLoader());
-		}
-		catch (IOException e) {
-			throw new RuntimeException("Error while reading the input positions object from the task configuration.");
-		} catch (ClassNotFoundException e) {
-			throw new RuntimeException("Error while reading the input positions object from the task configuration. " +
-					"ChannelSelector class not found.");
-		}
-		if (inputPositions == null) {
-			throw new NullPointerException();
-		}
-		return inputPositions;
-	}
-
-	public void setInputFormat (InputFormat inputFormat) {
-		try {
-			InstantiationUtil.writeObjectToConfig(inputFormat, this.config, INPUT_FORMAT);
-		} catch (IOException e) {
-			throw new RuntimeException("Error while writing the input format object to the task configuration.");
-		}
-	}
-
-	public InputFormat getInputFormat () {
-		InputFormat inputFormat = null;
-		try {
-			inputFormat = (InputFormat) InstantiationUtil.readObjectFromConfig(this.config, INPUT_FORMAT, getClass().getClassLoader());
-		}
-		catch (IOException e) {
-			throw new RuntimeException("Error while reading the input split provider object from the task configuration.");
-		} catch (ClassNotFoundException e) {
-			throw new RuntimeException("Error while reading the input split provider object from the task configuration. " +
-					"ChannelSelector class not found.");
-		}
-		if (inputFormat == null) {
-			throw new NullPointerException();
-		}
-		return inputFormat;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/UnionProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/UnionProcessor.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/UnionProcessor.java
deleted file mode 100644
index 7ceeac8..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/UnionProcessor.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.runtime;
-
-
-import com.google.common.base.Preconditions;
-import org.apache.flink.tez.util.EncodingUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
-import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.LogicalInput;
-import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.ProcessorContext;
-import org.apache.tez.runtime.library.api.KeyValueReader;
-import org.apache.tez.runtime.library.api.KeyValueWriter;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-public class UnionProcessor extends AbstractLogicalIOProcessor {
-
-	private TezTaskConfig config;
-	protected Map<String, LogicalInput> inputs;
-	protected Map<String, LogicalOutput> outputs;
-	private List<KeyValueReader> readers;
-	private List<KeyValueWriter> writers;
-	private int numInputs;
-	private int numOutputs;
-
-	public UnionProcessor(ProcessorContext context) {
-		super(context);
-	}
-
-	@Override
-	public void initialize() throws Exception {
-		UserPayload payload = getContext().getUserPayload();
-		Configuration conf = TezUtils.createConfFromUserPayload(payload);
-
-		this.config = (TezTaskConfig) EncodingUtils.decodeObjectFromString(conf.get(TezTaskConfig.TEZ_TASK_CONFIG), getClass().getClassLoader());
-		config.setTaskName(getContext().getTaskVertexName());
-	}
-
-	@Override
-	public void handleEvents(List<Event> processorEvents) {
-
-	}
-
-	@Override
-	public void close() throws Exception {
-
-	}
-
-	@Override
-	public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
-		this.inputs = inputs;
-		this.outputs = outputs;
-		this.numInputs = inputs.size();
-		this.numOutputs = outputs.size();
-
-		this.readers = new ArrayList<KeyValueReader>(numInputs);
-		if (this.inputs != null) {
-			for (LogicalInput input: this.inputs.values()) {
-				input.start();
-				readers.add((KeyValueReader) input.getReader());
-			}
-		}
-
-		this.writers = new ArrayList<KeyValueWriter>(numOutputs);
-		if (this.outputs != null) {
-			for (LogicalOutput output : this.outputs.values()) {
-				output.start();
-				writers.add((KeyValueWriter) output.getWriter());
-			}
-		}
-
-		Preconditions.checkArgument(writers.size() == 1);
-		KeyValueWriter writer = writers.get(0);
-
-		for (KeyValueReader reader: this.readers) {
-			while (reader.next()) {
-				Object key = reader.getCurrentKey();
-				Object value = reader.getCurrentValue();
-				writer.write(key, value);
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInput.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInput.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInput.java
deleted file mode 100644
index ef59fd0..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInput.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.runtime.input;
-
-import com.google.common.base.Preconditions;
-import com.google.protobuf.ByteString;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.tez.runtime.api.AbstractLogicalInput;
-import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.InputContext;
-import org.apache.tez.runtime.api.Reader;
-import org.apache.tez.runtime.api.events.InputDataInformationEvent;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
-
-public class FlinkInput extends AbstractLogicalInput {
-
-	private static final Log LOG = LogFactory.getLog(FlinkInput.class);
-	
-	private InputSplit split;
-	private boolean splitIsCreated;
-	private final ReentrantLock rrLock = new ReentrantLock();
-	private final Condition rrInited = rrLock.newCondition();
-
-	public FlinkInput(InputContext inputContext, int numPhysicalInputs) {
-		super(inputContext, numPhysicalInputs);
-		getContext().requestInitialMemory(0l, null); // mandatory call
-		split = null;
-	}
-
-	@Override
-	public void handleEvents(List<Event> inputEvents) throws Exception {
-		
-		LOG.info("Received " + inputEvents.size() + " events (should be = 1)");
-		
-		Event event = inputEvents.iterator().next();
-		
-		Preconditions.checkArgument(event instanceof InputDataInformationEvent,
-				getClass().getSimpleName()
-						+ " can only handle a single event of type: "
-						+ InputDataInformationEvent.class.getSimpleName());
-
-		initSplitFromEvent ((InputDataInformationEvent)event);
-	}
-
-	private void initSplitFromEvent (InputDataInformationEvent e) throws Exception {
-		rrLock.lock();
-
-		try {
-			ByteString byteString = ByteString.copyFrom(e.getUserPayload());
-			this.split =  (InputSplit) InstantiationUtil.deserializeObject(byteString.toByteArray(), getClass().getClassLoader());
-			this.splitIsCreated = true;
-			
-			LOG.info ("Initializing input split " + split.getSplitNumber() + ": " + split.toString() + " from event (" + e.getSourceIndex() + "," + e.getTargetIndex() + "): " + e.toString());
-			
-			rrInited.signal();
-		}
-		catch (Exception ex) {
-			throw new IOException(
-					"Interrupted waiting for InputSplit initialization");
-		}
-		finally {
-			rrLock.unlock();
-		}
-	}
-
-	@Override
-	public List<Event> close() throws Exception {
-		return null;
-	}
-
-	@Override
-	public void start() throws Exception {
-	}
-
-	@Override
-	public Reader getReader() throws Exception {
-		throw new RuntimeException("FlinkInput does not contain a Reader. Should use getSplit instead");
-	}
-
-	@Override
-	public List<Event> initialize() throws Exception {
-		return null;
-	}
-
-	public InputSplit getSplit () throws Exception {
-
-		rrLock.lock();
-		try {
-			if (!splitIsCreated) {
-				checkAndAwaitSplitInitialization();
-			}
-		}
-		finally {
-			rrLock.unlock();
-		}
-		if (split == null) {
-			LOG.info("Input split has not been created. This should not happen");
-			throw new RuntimeException("Input split has not been created. This should not happen");
-		}
-		return split;
-	}
-
-	void checkAndAwaitSplitInitialization() throws IOException {
-		assert rrLock.getHoldCount() == 1;
-		rrLock.lock();
-		try {
-			rrInited.await();
-		} catch (Exception e) {
-			throw new IOException(
-					"Interrupted waiting for InputSplit initialization");
-		} finally {
-			rrLock.unlock();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInputSplitGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInputSplitGenerator.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInputSplitGenerator.java
deleted file mode 100644
index db1261c..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInputSplitGenerator.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.runtime.input;
-
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.tez.runtime.TezTaskConfig;
-import org.apache.flink.tez.util.EncodingUtils;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.event.VertexStateUpdate;
-import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.InputInitializer;
-import org.apache.tez.runtime.api.InputInitializerContext;
-import org.apache.tez.runtime.api.events.InputDataInformationEvent;
-import org.apache.tez.runtime.api.events.InputInitializerEvent;
-
-import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
-
-public class FlinkInputSplitGenerator extends InputInitializer {
-
-	private static final Log LOG = LogFactory.getLog(FlinkInputSplitGenerator.class);
-
-	InputFormat format;
-
-	public FlinkInputSplitGenerator(InputInitializerContext initializerContext) {
-		super(initializerContext);
-	}
-
-	@Override
-	public List<Event> initialize() throws Exception {
-
-		Configuration tezConf = TezUtils.createConfFromUserPayload(this.getContext().getUserPayload());
-
-		TezTaskConfig taskConfig = (TezTaskConfig) EncodingUtils.decodeObjectFromString(tezConf.get(TezTaskConfig.TEZ_TASK_CONFIG), getClass().getClassLoader());
-
-		this.format = taskConfig.getInputFormat();
-
-		int numTasks = this.getContext().getNumTasks();
-
-		LOG.info ("Creating splits for " + numTasks + " tasks for input format " + format);
-		
-		InputSplit[] splits = format.createInputSplits((numTasks > 0) ? numTasks : 1 );
-
-		LOG.info ("Created " + splits.length + " input splits" + " tasks for input format " + format);
-		
-		//LOG.info ("Created + " + splits.length + " input splits for input format " + format);
-
-		LOG.info ("Sending input split events");
-		LinkedList<Event> events = new LinkedList<Event>();
-		for (int i = 0; i < splits.length; i++) {
-			byte [] bytes = InstantiationUtil.serializeObject(splits[i]);
-			ByteBuffer buf = ByteBuffer.wrap(bytes);
-			InputDataInformationEvent event = InputDataInformationEvent.createWithSerializedPayload(i % numTasks, buf);
-			event.setTargetIndex(i % numTasks);
-			events.add(event);
-			LOG.info ("Added event of index " + i + ": " + event);
-		}
-		return events;
-	}
-
-	@Override
-	public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws Exception {
-
-	}
-
-	@Override
-	public void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
-		//super.onVertexStateUpdated(stateUpdate);
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/TezReaderIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/TezReaderIterator.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/TezReaderIterator.java
deleted file mode 100644
index 722f0a1..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/TezReaderIterator.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.runtime.input;
-
-
-import org.apache.flink.util.MutableObjectIterator;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.tez.runtime.library.api.KeyValueReader;
-
-import java.io.IOException;
-
-public class TezReaderIterator<T> implements MutableObjectIterator<T>{
-
-	private KeyValueReader kvReader;
-
-	public TezReaderIterator(KeyValueReader kvReader) {
-		this.kvReader = kvReader;
-	}
-
-	@Override
-	public T next(T reuse) throws IOException {
-		if (kvReader.next()) {
-			Object key = kvReader.getCurrentKey();
-			Object value = kvReader.getCurrentValue();
-			if (!(key instanceof IntWritable)) {
-				throw new IllegalStateException("Wrong key type");
-			}
-			reuse = (T) value;
-			return reuse;
-		}
-		else {
-			return null;
-		}
-	}
-
-	@Override
-	public T next() throws IOException {
-		if (kvReader.next()) {
-			Object key = kvReader.getCurrentKey();
-			Object value = kvReader.getCurrentValue();
-			if (!(key instanceof IntWritable)) {
-				throw new IllegalStateException("Wrong key type");
-			}
-			return (T) value;
-		}
-		else {
-			return null;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/SimplePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/SimplePartitioner.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/SimplePartitioner.java
deleted file mode 100644
index 2358f29..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/SimplePartitioner.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.runtime.output;
-
-
-import org.apache.hadoop.io.IntWritable;
-import org.apache.tez.runtime.library.api.Partitioner;
-
-public class SimplePartitioner implements Partitioner {
-
-	@Override
-	public int getPartition(Object key, Object value, int numPartitions) {
-		if (!(key instanceof IntWritable)) {
-			throw new IllegalStateException("Partitioning key should be int");
-		}
-		IntWritable channel = (IntWritable) key;
-		return channel.get();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezChannelSelector.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezChannelSelector.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezChannelSelector.java
deleted file mode 100644
index 7e5cd55..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezChannelSelector.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.runtime.output;
-
-import java.io.Serializable;
-
-public interface TezChannelSelector<T> extends Serializable {
-
-	/**
-	 * Called to determine to which attached {@link org.apache.flink.runtime.io.network.channels.OutputChannel} objects the given record shall be forwarded.
-	 *
-	 * @param record
-	 *        the record to the determine the output channels for
-	 * @param numberOfOutputChannels
-	 *        the total number of output channels which are attached to respective output gate
-	 * @return a (possibly empty) array of integer numbers which indicate the indices of the output channels through
-	 *         which the record shall be forwarded
-	 */
-	int[] selectChannels(T record, int numberOfOutputChannels);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputCollector.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputCollector.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputCollector.java
deleted file mode 100644
index b68e6c8..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputCollector.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.runtime.output;
-
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.util.Collector;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.tez.runtime.library.api.KeyValueWriter;
-
-import java.io.IOException;
-import java.util.List;
-
-public class TezOutputCollector<T> implements Collector<T> {
-
-	private List<KeyValueWriter> writers;
-
-	private List<TezChannelSelector<T>> outputEmitters;
-
-	private List<Integer> numberOfStreamsInOutputs;
-
-	private int numOutputs;
-
-	private TypeSerializer<T> serializer;
-
-	public TezOutputCollector(List<KeyValueWriter> writers, List<TezChannelSelector<T>> outputEmitters, TypeSerializer<T> serializer, List<Integer> numberOfStreamsInOutputs) {
-		this.writers = writers;
-		this.outputEmitters = outputEmitters;
-		this.numberOfStreamsInOutputs = numberOfStreamsInOutputs;
-		this.serializer = serializer;
-		this.numOutputs = writers.size();
-	}
-
-	@Override
-	public void collect(T record) {
-		for (int i = 0; i < numOutputs; i++) {
-			KeyValueWriter writer = writers.get(i);
-			TezChannelSelector<T> outputEmitter = outputEmitters.get(i);
-			int numberOfStreamsInOutput = numberOfStreamsInOutputs.get(i);
-			try {
-				for (int channel : outputEmitter.selectChannels(record, numberOfStreamsInOutput)) {
-					IntWritable key = new IntWritable(channel);
-					writer.write(key, record);
-				}
-			}
-			catch (IOException e) {
-				throw new RuntimeException("Emitting the record caused an I/O exception: " + e.getMessage(), e);
-			}
-		}
-	}
-
-	@Override
-	public void close() {
-
-	}
-}


Mime
View raw message