flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [06/12] flink git commit: [FLINK-3235] Remove Flink on Tez code
Date Fri, 15 Jan 2016 10:53:36 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java
deleted file mode 100644
index 89e4642..0000000
--- a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java
+++ /dev/null
@@ -1,578 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.runtime;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.distributions.DataDistribution;
-import org.apache.flink.api.common.functions.GroupCombineFunction;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.Driver;
-import org.apache.flink.runtime.operators.TaskContext;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger;
-import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
-import org.apache.flink.runtime.operators.util.CloseableInputProvider;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
-import org.apache.flink.tez.runtime.input.TezReaderIterator;
-import org.apache.flink.tez.runtime.output.TezChannelSelector;
-import org.apache.flink.tez.runtime.output.TezOutputEmitter;
-import org.apache.flink.tez.runtime.output.TezOutputCollector;
-import org.apache.flink.tez.util.DummyInvokable;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.MutableObjectIterator;
-
-import org.apache.tez.runtime.library.api.KeyValueReader;
-import org.apache.tez.runtime.library.api.KeyValueWriter;
-
-import java.io.IOException;
-import java.net.URLClassLoader;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-
-public class TezTask<S extends Function,OT>  implements TaskContext<S, OT> {
-
-	protected static final Log LOG = LogFactory.getLog(TezTask.class);
-
-	DummyInvokable invokable = new DummyInvokable();
-
-	/**
-	 * The driver that invokes the user code (the stub implementation). The central driver in this task
-	 * (further drivers may be chained behind this driver).
-	 */
-	protected volatile Driver<S, OT> driver;
-
-	/**
-	 * The instantiated user code of this task's main operator (driver). May be null if the operator has no udf.
-	 */
-	protected S stub;
-
-	/**
-	 * The udf's runtime context.
-	 */
-	protected RuntimeUDFContext runtimeUdfContext;
-
-	/**
-	 * The collector that forwards the user code's results. May forward to a channel or to chained drivers within
-	 * this task.
-	 */
-	protected Collector<OT> output;
-
-	/**
-	 * The inputs reader, wrapped in an iterator. Prior to the local strategies, etc...
-	 */
-	protected MutableObjectIterator<?>[] inputIterators;
-
-	/**
-	 * The local strategies that are applied on the inputs.
-	 */
-	protected volatile CloseableInputProvider<?>[] localStrategies;
-
-	/**
-	 * The inputs to the operator. Return the readers' data after the application of the local strategy
-	 * and the temp-table barrier.
-	 */
-	protected MutableObjectIterator<?>[] inputs;
-
-	/**
-	 * The serializers for the input data type.
-	 */
-	protected TypeSerializerFactory<?>[] inputSerializers;
-
-	/**
-	 * The comparators for the central driver.
-	 */
-	protected TypeComparator<?>[] inputComparators;
-
-	/**
-	 * The task configuration with the setup parameters.
-	 */
-	protected TezTaskConfig config;
-
-	/**
-	 * The class loader used to instantiate user code and user data types.
-	 */
-	protected ClassLoader userCodeClassLoader = ClassLoader.getSystemClassLoader();
-
-	/**
-	 * For now, create a default ExecutionConfig 
-	 */
-	protected ExecutionConfig executionConfig;
-
-	/*
-	 * Tez-specific variables given by the Processor
-	 */
-	protected TypeSerializer<OT> outSerializer;
-
-	protected List<Integer> numberOfSubTasksInOutputs;
-
-	protected String taskName;
-
-	protected int numberOfSubtasks;
-
-	protected int indexInSubtaskGroup;
-
-	TezRuntimeEnvironment runtimeEnvironment;
-
-	public TezTask(TezTaskConfig config, RuntimeUDFContext runtimeUdfContext, long availableMemory) {
-		this.config = config;
-		final Class<? extends Driver<S, OT>> driverClass = this.config.getDriver();
-		this.driver = InstantiationUtil.instantiate(driverClass, Driver.class);
-		
-		LOG.info("ClassLoader URLs: " + Arrays.toString(((URLClassLoader) this.userCodeClassLoader).getURLs()));
-		
-		this.stub = this.config.<S>getStubWrapper(this.userCodeClassLoader).getUserCodeObject(Function.class, this.userCodeClassLoader); //TODO get superclass properly
-		this.runtimeUdfContext = runtimeUdfContext;
-		this.outSerializer = (TypeSerializer<OT>) this.config.getOutputSerializer(getClass().getClassLoader()).getSerializer();
-		this.numberOfSubTasksInOutputs = this.config.getNumberSubtasksInOutput();
-		this.taskName = this.config.getTaskName();
-		this.numberOfSubtasks = this.runtimeUdfContext.getNumberOfParallelSubtasks();
-		this.indexInSubtaskGroup = this.runtimeUdfContext.getIndexOfThisSubtask();
-		this.runtimeEnvironment = new TezRuntimeEnvironment((long) (0.7 * availableMemory));
-		this.executionConfig = runtimeUdfContext.getExecutionConfig();
-		this.invokable.setExecutionConfig(this.executionConfig);
-	}
-
-
-	//-------------------------------------------------------------
-	// Interface to FlinkProcessor
-	//-------------------------------------------------------------
-
-	public void invoke(List<KeyValueReader> readers, List<KeyValueWriter> writers) throws Exception {
-
-		// whatever happens in this scope, make sure that the local strategies are cleaned up!
-		// note that the initialization of the local strategies is in the try-finally block as well,
-		// so that the thread that creates them catches its own errors that may happen in that process.
-		// this is especially important, since there may be asynchronous closes (such as through canceling).
-		try {
-			// initialize the inputs and outputs
-			initInputsOutputs(readers, writers);
-
-			// pre main-function initialization
-			initialize();
-
-			// the work goes here
-			run();
-		}
-		finally {
-			// clean up in any case!
-			closeLocalStrategies();
-		}
-	}
-
-
-	/*
-	 * Initialize inputs, input serializers, input comparators, and collector
-	 * Assumes that the config and userCodeClassLoader has been set
-	 */
-	private void initInputsOutputs (List<KeyValueReader> readers, List<KeyValueWriter> writers) throws Exception {
-
-		int numInputs = readers.size();
-		Preconditions.checkArgument(numInputs == driver.getNumberOfInputs());
-
-		// Prior to local strategies
-		this.inputIterators = new MutableObjectIterator[numInputs];
-		//local strategies
-		this.localStrategies = new CloseableInputProvider[numInputs];
-		// After local strategies
-		this.inputs = new MutableObjectIterator[numInputs];
-
-		int numComparators = driver.getNumberOfDriverComparators();
-		initInputsSerializersAndComparators(numInputs, numComparators);
-
-		int index = 0;
-		for (KeyValueReader reader : readers) {
-			this.inputIterators[index] = new TezReaderIterator<Object>(reader);
-			initInputLocalStrategy(index);
-			index++;
-		}
-
-		int numOutputs = writers.size();
-		ArrayList<TezChannelSelector<OT>> channelSelectors = new ArrayList<TezChannelSelector<OT>>(numOutputs);
-		//ArrayList<Integer> numStreamsInOutputs = new ArrayList<Integer>(numOutputs);
-		for (int i = 0; i < numOutputs; i++) {
-			final ShipStrategyType strategy = config.getOutputShipStrategy(i);
-			final TypeComparatorFactory<OT> compFactory = config.getOutputComparator(i, this.userCodeClassLoader);
-			final DataDistribution dataDist = config.getOutputDataDistribution(i, this.userCodeClassLoader);
-			if (compFactory == null) {
-				channelSelectors.add(i, new TezOutputEmitter<OT>(strategy));
-			} else if (dataDist == null){
-				final TypeComparator<OT> comparator = compFactory.createComparator();
-				channelSelectors.add(i, new TezOutputEmitter<OT>(strategy, comparator));
-			} else {
-				final TypeComparator<OT> comparator = compFactory.createComparator();
-				channelSelectors.add(i,new TezOutputEmitter<OT>(strategy, comparator, dataDist));
-			}
-		}
-		this.output = new TezOutputCollector<OT>(writers, channelSelectors, outSerializer, numberOfSubTasksInOutputs);
-	}
-
-
-
-	// --------------------------------------------------------------------
-	// TaskContext interface
-	// --------------------------------------------------------------------
-
-	@Override
-	public TaskConfig getTaskConfig() {
-		return (TaskConfig) this.config;
-	}
-
-	@Override
-	public ExecutionConfig getExecutionConfig() {
-		return executionConfig;
-	}
-
-	@Override
-	public ClassLoader getUserCodeClassLoader() {
-		return this.userCodeClassLoader;
-	}
-
-	@Override
-	public MemoryManager getMemoryManager() {
-		return runtimeEnvironment.getMemoryManager();
-	}
-
-	@Override
-	public IOManager getIOManager() {
-		return runtimeEnvironment.getIOManager();
-	}
-
-	@Override
-	public TaskManagerRuntimeInfo getTaskManagerInfo() {
-		return new TaskManagerRuntimeInfo("localhost", new Configuration());
-	}
-
-	@Override
-	public <X> MutableObjectIterator<X> getInput(int index) {
-		if (index < 0 || index > this.driver.getNumberOfInputs()) {
-			throw new IndexOutOfBoundsException();
-		}
-		// check for lazy assignment from input strategies
-		if (this.inputs[index] != null) {
-			@SuppressWarnings("unchecked")
-			MutableObjectIterator<X> in = (MutableObjectIterator<X>) this.inputs[index];
-			return in;
-		} else {
-			final MutableObjectIterator<X> in;
-			try {
-				if (this.localStrategies[index] != null) {
-					@SuppressWarnings("unchecked")
-					MutableObjectIterator<X> iter = (MutableObjectIterator<X>) this.localStrategies[index].getIterator();
-					in = iter;
-				} else {
-					throw new RuntimeException("Bug: null input iterator, null temp barrier, and null local strategy.");
-				}
-				this.inputs[index] = in;
-				return in;
-			} catch (InterruptedException iex) {
-				throw new RuntimeException("Interrupted while waiting for input " + index + " to become available.");
-			} catch (IOException ioex) {
-				throw new RuntimeException("An I/O Exception occurred whily obaining input " + index + ".");
-			}
-		}
-	}
-
-	@Override
-	public <X> TypeSerializerFactory<X> getInputSerializer(int index) {
-		if (index < 0 || index >= this.driver.getNumberOfInputs()) {
-			throw new IndexOutOfBoundsException();
-		}
-
-		@SuppressWarnings("unchecked")
-		final TypeSerializerFactory<X> serializerFactory = (TypeSerializerFactory<X>) this.inputSerializers[index];
-		return serializerFactory;
-	}
-
-	@Override
-	public <X> TypeComparator<X> getDriverComparator(int index) {
-		if (this.inputComparators == null) {
-			throw new IllegalStateException("Comparators have not been created!");
-		}
-		else if (index < 0 || index >= this.driver.getNumberOfDriverComparators()) {
-			throw new IndexOutOfBoundsException();
-		}
-
-		@SuppressWarnings("unchecked")
-		final TypeComparator<X> comparator = (TypeComparator<X>) this.inputComparators[index];
-		return comparator;
-	}
-
-
-
-	@Override
-	public S getStub() {
-		return this.stub;
-	}
-
-	@Override
-	public Collector<OT> getOutputCollector() {
-		return this.output;
-	}
-
-	@Override
-	public AbstractInvokable getOwningNepheleTask() {
-		return this.invokable;
-	}
-
-	@Override
-	public String formatLogString(String message) {
-		return null;
-	}
-
-
-	// --------------------------------------------------------------------
-	// Adapted from BatchTask
-	// --------------------------------------------------------------------
-
-	private void initInputLocalStrategy(int inputNum) throws Exception {
-		// check if there is already a strategy
-		if (this.localStrategies[inputNum] != null) {
-			throw new IllegalStateException();
-		}
-
-		// now set up the local strategy
-		final LocalStrategy localStrategy = this.config.getInputLocalStrategy(inputNum);
-		if (localStrategy != null) {
-			switch (localStrategy) {
-				case NONE:
-					// the input is as it is
-					this.inputs[inputNum] = this.inputIterators[inputNum];
-					break;
-				case SORT:
-					@SuppressWarnings({ "rawtypes", "unchecked" })
-					UnilateralSortMerger<?> sorter = new UnilateralSortMerger(getMemoryManager(), getIOManager(),
-							this.inputIterators[inputNum], this.invokable, this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum),
-							this.config.getRelativeMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum),
-							this.config.getSpillingThresholdInput(inputNum), this.executionConfig.isObjectReuseEnabled());
-					// set the input to null such that it will be lazily fetched from the input strategy
-					this.inputs[inputNum] = null;
-					this.localStrategies[inputNum] = sorter;
-					break;
-				case COMBININGSORT:
-					// sanity check this special case!
-					// this still breaks a bit of the abstraction!
-					// we should have nested configurations for the local strategies to solve that
-					if (inputNum != 0) {
-						throw new IllegalStateException("Performing combining sort outside a (group)reduce task!");
-					}
-
-					// instantiate ourselves a combiner. we should not use the stub, because the sort and the
-					// subsequent (group)reduce would otherwise share it multi-threaded
-					final Class<S> userCodeFunctionType = this.driver.getStubType();
-					if (userCodeFunctionType == null) {
-						throw new IllegalStateException("Performing combining sort outside a reduce task!");
-					}
-					final S localStub;
-					try {
-						localStub = initStub(userCodeFunctionType);
-					} catch (Exception e) {
-						throw new RuntimeException("Initializing the user code and the configuration failed" +
-								(e.getMessage() == null ? "." : ": " + e.getMessage()), e);
-					}
-
-					if (!(localStub instanceof GroupCombineFunction)) {
-						throw new IllegalStateException("Performing combining sort outside a reduce task!");
-					}
-
-					@SuppressWarnings({ "rawtypes", "unchecked" })
-					CombiningUnilateralSortMerger<?> cSorter = new CombiningUnilateralSortMerger(
-							(GroupCombineFunction) localStub, getMemoryManager(), getIOManager(), this.inputIterators[inputNum],
-							this.invokable, this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum),
-							this.config.getRelativeMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum),
-							this.config.getSpillingThresholdInput(inputNum), this.executionConfig.isObjectReuseEnabled());
-					cSorter.setUdfConfiguration(this.config.getStubParameters());
-
-					// set the input to null such that it will be lazily fetched from the input strategy
-					this.inputs[inputNum] = null;
-					this.localStrategies[inputNum] = cSorter;
-					break;
-				default:
-					throw new Exception("Unrecognized local strategy provided: " + localStrategy.name());
-			}
-		} else {
-			// no local strategy in the config
-			this.inputs[inputNum] = this.inputIterators[inputNum];
-		}
-	}
-
-	private <T> TypeComparator<T> getLocalStrategyComparator(int inputNum) throws Exception {
-		TypeComparatorFactory<T> compFact = this.config.getInputComparator(inputNum, this.userCodeClassLoader);
-		if (compFact == null) {
-			throw new Exception("Missing comparator factory for local strategy on input " + inputNum);
-		}
-		return compFact.createComparator();
-	}
-
-	protected S initStub(Class<? super S> stubSuperClass) throws Exception {
-		try {
-			S stub = config.<S>getStubWrapper(this.userCodeClassLoader).getUserCodeObject(stubSuperClass, this.userCodeClassLoader);
-			// check if the class is a subclass, if the check is required
-			if (stubSuperClass != null && !stubSuperClass.isAssignableFrom(stub.getClass())) {
-				throw new RuntimeException("The class '" + stub.getClass().getName() + "' is not a subclass of '" +
-						stubSuperClass.getName() + "' as is required.");
-			}
-			FunctionUtils.setFunctionRuntimeContext(stub, this.runtimeUdfContext);
-			return stub;
-		}
-		catch (ClassCastException ccex) {
-			throw new Exception("The stub class is not a proper subclass of " + stubSuperClass.getName(), ccex);
-		}
-	}
-
-	/**
-	 * Creates all the serializers and comparators.
-	 */
-	protected void initInputsSerializersAndComparators(int numInputs, int numComparators) throws Exception {
-		this.inputSerializers = new TypeSerializerFactory<?>[numInputs];
-		this.inputComparators = numComparators > 0 ? new TypeComparator[numComparators] : null;
-		//this.inputComparators = this.driver.requiresComparatorOnInput() ? new TypeComparator[numInputs] : null;
-		this.inputIterators = new MutableObjectIterator[numInputs];
-
-		for (int i = 0; i < numInputs; i++) {
-			//  ---------------- create the serializer first ---------------------
-			final TypeSerializerFactory<?> serializerFactory = this.config.getInputSerializer(i, this.userCodeClassLoader);
-			this.inputSerializers[i] = serializerFactory;
-			// this.inputIterators[i] = createInputIterator(this.inputReaders[i], this.inputSerializers[i]);
-		}
-		//  ---------------- create the driver's comparators ---------------------
-		for (int i = 0; i < numComparators; i++) {
-			if (this.inputComparators != null) {
-				final TypeComparatorFactory<?> comparatorFactory = this.config.getDriverComparator(i, this.userCodeClassLoader);
-				this.inputComparators[i] = comparatorFactory.createComparator();
-			}
-		}
-	}
-
-	protected void initialize() throws Exception {
-		// create the operator
-		try {
-			this.driver.setup(this);
-		}
-		catch (Throwable t) {
-			throw new Exception("The driver setup for '" + //TODO put taks name here
-					"' , caused an error: " + t.getMessage(), t);
-		}
-
-		//this.runtimeUdfContext = createRuntimeContext();
-
-		// instantiate the UDF
-		try {
-			final Class<? super S> userCodeFunctionType = this.driver.getStubType();
-			// if the class is null, the driver has no user code
-			if (userCodeFunctionType != null) {
-				this.stub = initStub(userCodeFunctionType);
-			}
-		} catch (Exception e) {
-			throw new RuntimeException("Initializing the UDF" +
-					e.getMessage() == null ? "." : ": " + e.getMessage(), e);
-		}
-	}
-
-	/*
-	public RuntimeUDFContext createRuntimeContext() {
-		return new RuntimeUDFContext(this.taskName, this.numberOfSubtasks, this.indexInSubtaskGroup, null);
-	}
-	*/
-
-	protected void closeLocalStrategies() {
-		if (this.localStrategies != null) {
-			for (int i = 0; i < this.localStrategies.length; i++) {
-				if (this.localStrategies[i] != null) {
-					try {
-						this.localStrategies[i].close();
-					} catch (Throwable t) {
-						LOG.error("Error closing local strategy for input " + i, t);
-					}
-				}
-			}
-		}
-	}
-
-	protected void run() throws Exception {
-		// ---------------------------- Now, the actual processing starts ------------------------
-		// check for asynchronous canceling
-
-		boolean stubOpen = false;
-
-		try {
-			// run the data preparation
-			try {
-				this.driver.prepare();
-			}
-			catch (Throwable t) {
-				// if the preparation caused an error, clean up
-				// errors during clean-up are swallowed, because we have already a root exception
-				throw new Exception("The data preparation for task '" + this.taskName +
-						"' , caused an error: " + t.getMessage(), t);
-			}
-
-			// open stub implementation
-			if (this.stub != null) {
-				try {
-					Configuration stubConfig = this.config.getStubParameters();
-					FunctionUtils.openFunction(this.stub, stubConfig);
-					stubOpen = true;
-				}
-				catch (Throwable t) {
-					throw new Exception("The user defined 'open()' method caused an exception: " + t.getMessage(), t);
-				}
-			}
-
-			// run the user code
-			this.driver.run();
-
-			// close. We close here such that a regular close throwing an exception marks a task as failed.
-			if (this.stub != null) {
-				FunctionUtils.closeFunction(this.stub);
-				stubOpen = false;
-			}
-
-			this.output.close();
-
-		}
-		catch (Exception ex) {
-			// close the input, but do not report any exceptions, since we already have another root cause
-			ex.printStackTrace();
-			throw new RuntimeException("Exception in TaskContext: " + ex.getMessage() + " "+  ex.getStackTrace());
-		}
-		finally {
-			this.driver.cleanup();
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTaskConfig.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTaskConfig.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTaskConfig.java
deleted file mode 100644
index 94a8315..0000000
--- a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTaskConfig.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.runtime;
-
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
-import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.util.InstantiationUtil;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-
-public class TezTaskConfig extends TaskConfig {
-
-	public static final String TEZ_TASK_CONFIG = "tez.task.flink.processor.taskconfig";
-
-	private static final String NUMBER_SUBTASKS_IN_OUTPUTS = "tez.num_subtasks_in_output";
-
-	private static final String INPUT_SPLIT_PROVIDER = "tez.input_split_provider";
-
-	private static final String INPUT_POSITIONS = "tez.input_positions";
-
-	private static final String INPUT_FORMAT = "tez.input_format";
-
-	private static final String DATASOURCE_PROCESSOR_NAME = "tez.datasource_processor_name";
-
-	public TezTaskConfig(Configuration config) {
-		super(config);
-	}
-
-
-	public void setDatasourceProcessorName(String name) {
-		if (name != null) {
-			this.config.setString(DATASOURCE_PROCESSOR_NAME, name);
-		}
-	}
-
-	public String getDatasourceProcessorName() {
-		return this.config.getString(DATASOURCE_PROCESSOR_NAME, null);
-	}
-
-	public void setNumberSubtasksInOutput(ArrayList<Integer> numberSubtasksInOutputs) {
-		try {
-			InstantiationUtil.writeObjectToConfig(numberSubtasksInOutputs, this.config, NUMBER_SUBTASKS_IN_OUTPUTS);
-		} catch (IOException e) {
-			throw new RuntimeException("Error while writing the input split provider object to the task configuration.");
-		}
-	}
-
-	public ArrayList<Integer> getNumberSubtasksInOutput() {
-		ArrayList<Integer> numberOfSubTasksInOutputs = null;
-		try {
-			numberOfSubTasksInOutputs = (ArrayList) InstantiationUtil.readObjectFromConfig(this.config, NUMBER_SUBTASKS_IN_OUTPUTS, getClass().getClassLoader());
-		}
-		catch (IOException e) {
-			throw new RuntimeException("Error while reading the number of subtasks in outputs object from the task configuration.");
-		} catch (ClassNotFoundException e) {
-			throw new RuntimeException("Error while reading the number of subtasks in outpurs object from the task configuration. " +
-					"class not found.");
-		}
-		if (numberOfSubTasksInOutputs == null) {
-			throw new NullPointerException();
-		}
-		return numberOfSubTasksInOutputs;
-
-	}
-
-
-	public void setInputSplitProvider (InputSplitProvider inputSplitProvider) {
-		try {
-			InstantiationUtil.writeObjectToConfig(inputSplitProvider, this.config, INPUT_SPLIT_PROVIDER);
-		} catch (IOException e) {
-			throw new RuntimeException("Error while writing the input split provider object to the task configuration.");
-		}
-	}
-
-	public InputSplitProvider getInputSplitProvider () {
-		InputSplitProvider inputSplitProvider = null;
-		try {
-			inputSplitProvider = (InputSplitProvider) InstantiationUtil.readObjectFromConfig(this.config, INPUT_SPLIT_PROVIDER, getClass().getClassLoader());
-		}
-		catch (IOException e) {
-			throw new RuntimeException("Error while reading the input split provider object from the task configuration.");
-		} catch (ClassNotFoundException e) {
-			throw new RuntimeException("Error while reading the input split provider object from the task configuration. " +
-					"ChannelSelector class not found.");
-		}
-		if (inputSplitProvider == null) {
-			throw new NullPointerException();
-		}
-		return inputSplitProvider;
-	}
-
-
-	public void setInputPositions(HashMap<String,ArrayList<Integer>> inputPositions) {
-		try {
-			InstantiationUtil.writeObjectToConfig(inputPositions, this.config, INPUT_POSITIONS);
-		} catch (IOException e) {
-			throw new RuntimeException("Error while writing the input positions object to the task configuration.");
-		}
-	}
-
-	public HashMap<String,ArrayList<Integer>> getInputPositions () {
-		HashMap<String,ArrayList<Integer>> inputPositions = null;
-		try {
-			inputPositions = (HashMap) InstantiationUtil.readObjectFromConfig(this.config, INPUT_POSITIONS, getClass().getClassLoader());
-		}
-		catch (IOException e) {
-			throw new RuntimeException("Error while reading the input positions object from the task configuration.");
-		} catch (ClassNotFoundException e) {
-			throw new RuntimeException("Error while reading the input positions object from the task configuration. " +
-					"ChannelSelector class not found.");
-		}
-		if (inputPositions == null) {
-			throw new NullPointerException();
-		}
-		return inputPositions;
-	}
-
-	public void setInputFormat (InputFormat inputFormat) {
-		try {
-			InstantiationUtil.writeObjectToConfig(inputFormat, this.config, INPUT_FORMAT);
-		} catch (IOException e) {
-			throw new RuntimeException("Error while writing the input format object to the task configuration.");
-		}
-	}
-
-	public InputFormat getInputFormat () {
-		InputFormat inputFormat = null;
-		try {
-			inputFormat = (InputFormat) InstantiationUtil.readObjectFromConfig(this.config, INPUT_FORMAT, getClass().getClassLoader());
-		}
-		catch (IOException e) {
-			throw new RuntimeException("Error while reading the input split provider object from the task configuration.");
-		} catch (ClassNotFoundException e) {
-			throw new RuntimeException("Error while reading the input split provider object from the task configuration. " +
-					"ChannelSelector class not found.");
-		}
-		if (inputFormat == null) {
-			throw new NullPointerException();
-		}
-		return inputFormat;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/UnionProcessor.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/UnionProcessor.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/UnionProcessor.java
deleted file mode 100644
index 7ceeac8..0000000
--- a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/UnionProcessor.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.runtime;
-
-
-import com.google.common.base.Preconditions;
-import org.apache.flink.tez.util.EncodingUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
-import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.LogicalInput;
-import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.ProcessorContext;
-import org.apache.tez.runtime.library.api.KeyValueReader;
-import org.apache.tez.runtime.library.api.KeyValueWriter;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-public class UnionProcessor extends AbstractLogicalIOProcessor {
-
-	private TezTaskConfig config;
-	protected Map<String, LogicalInput> inputs;
-	protected Map<String, LogicalOutput> outputs;
-	private List<KeyValueReader> readers;
-	private List<KeyValueWriter> writers;
-	private int numInputs;
-	private int numOutputs;
-
-	public UnionProcessor(ProcessorContext context) {
-		super(context);
-	}
-
-	@Override
-	public void initialize() throws Exception {
-		UserPayload payload = getContext().getUserPayload();
-		Configuration conf = TezUtils.createConfFromUserPayload(payload);
-
-		this.config = (TezTaskConfig) EncodingUtils.decodeObjectFromString(conf.get(TezTaskConfig.TEZ_TASK_CONFIG), getClass().getClassLoader());
-		config.setTaskName(getContext().getTaskVertexName());
-	}
-
-	@Override
-	public void handleEvents(List<Event> processorEvents) {
-
-	}
-
-	@Override
-	public void close() throws Exception {
-
-	}
-
-	@Override
-	public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
-		this.inputs = inputs;
-		this.outputs = outputs;
-		this.numInputs = inputs.size();
-		this.numOutputs = outputs.size();
-
-		this.readers = new ArrayList<KeyValueReader>(numInputs);
-		if (this.inputs != null) {
-			for (LogicalInput input: this.inputs.values()) {
-				input.start();
-				readers.add((KeyValueReader) input.getReader());
-			}
-		}
-
-		this.writers = new ArrayList<KeyValueWriter>(numOutputs);
-		if (this.outputs != null) {
-			for (LogicalOutput output : this.outputs.values()) {
-				output.start();
-				writers.add((KeyValueWriter) output.getWriter());
-			}
-		}
-
-		Preconditions.checkArgument(writers.size() == 1);
-		KeyValueWriter writer = writers.get(0);
-
-		for (KeyValueReader reader: this.readers) {
-			while (reader.next()) {
-				Object key = reader.getCurrentKey();
-				Object value = reader.getCurrentValue();
-				writer.write(key, value);
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInput.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInput.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInput.java
deleted file mode 100644
index ef59fd0..0000000
--- a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInput.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.runtime.input;
-
-import com.google.common.base.Preconditions;
-import com.google.protobuf.ByteString;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.tez.runtime.api.AbstractLogicalInput;
-import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.InputContext;
-import org.apache.tez.runtime.api.Reader;
-import org.apache.tez.runtime.api.events.InputDataInformationEvent;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
-
-public class FlinkInput extends AbstractLogicalInput {
-
-	private static final Log LOG = LogFactory.getLog(FlinkInput.class);
-	
-	private InputSplit split;
-	private boolean splitIsCreated;
-	private final ReentrantLock rrLock = new ReentrantLock();
-	private final Condition rrInited = rrLock.newCondition();
-
-	public FlinkInput(InputContext inputContext, int numPhysicalInputs) {
-		super(inputContext, numPhysicalInputs);
-		getContext().requestInitialMemory(0l, null); // mandatory call
-		split = null;
-	}
-
-	@Override
-	public void handleEvents(List<Event> inputEvents) throws Exception {
-		
-		LOG.info("Received " + inputEvents.size() + " events (should be = 1)");
-		
-		Event event = inputEvents.iterator().next();
-		
-		Preconditions.checkArgument(event instanceof InputDataInformationEvent,
-				getClass().getSimpleName()
-						+ " can only handle a single event of type: "
-						+ InputDataInformationEvent.class.getSimpleName());
-
-		initSplitFromEvent ((InputDataInformationEvent)event);
-	}
-
-	private void initSplitFromEvent (InputDataInformationEvent e) throws Exception {
-		rrLock.lock();
-
-		try {
-			ByteString byteString = ByteString.copyFrom(e.getUserPayload());
-			this.split =  (InputSplit) InstantiationUtil.deserializeObject(byteString.toByteArray(), getClass().getClassLoader());
-			this.splitIsCreated = true;
-			
-			LOG.info ("Initializing input split " + split.getSplitNumber() + ": " + split.toString() + " from event (" + e.getSourceIndex() + "," + e.getTargetIndex() + "): " + e.toString());
-			
-			rrInited.signal();
-		}
-		catch (Exception ex) {
-			throw new IOException(
-					"Interrupted waiting for InputSplit initialization");
-		}
-		finally {
-			rrLock.unlock();
-		}
-	}
-
-	@Override
-	public List<Event> close() throws Exception {
-		return null;
-	}
-
-	@Override
-	public void start() throws Exception {
-	}
-
-	@Override
-	public Reader getReader() throws Exception {
-		throw new RuntimeException("FlinkInput does not contain a Reader. Should use getSplit instead");
-	}
-
-	@Override
-	public List<Event> initialize() throws Exception {
-		return null;
-	}
-
-	public InputSplit getSplit () throws Exception {
-
-		rrLock.lock();
-		try {
-			if (!splitIsCreated) {
-				checkAndAwaitSplitInitialization();
-			}
-		}
-		finally {
-			rrLock.unlock();
-		}
-		if (split == null) {
-			LOG.info("Input split has not been created. This should not happen");
-			throw new RuntimeException("Input split has not been created. This should not happen");
-		}
-		return split;
-	}
-
-	void checkAndAwaitSplitInitialization() throws IOException {
-		assert rrLock.getHoldCount() == 1;
-		rrLock.lock();
-		try {
-			rrInited.await();
-		} catch (Exception e) {
-			throw new IOException(
-					"Interrupted waiting for InputSplit initialization");
-		} finally {
-			rrLock.unlock();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInputSplitGenerator.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInputSplitGenerator.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInputSplitGenerator.java
deleted file mode 100644
index db1261c..0000000
--- a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInputSplitGenerator.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.runtime.input;
-
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.tez.runtime.TezTaskConfig;
-import org.apache.flink.tez.util.EncodingUtils;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.event.VertexStateUpdate;
-import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.InputInitializer;
-import org.apache.tez.runtime.api.InputInitializerContext;
-import org.apache.tez.runtime.api.events.InputDataInformationEvent;
-import org.apache.tez.runtime.api.events.InputInitializerEvent;
-
-import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
-
-public class FlinkInputSplitGenerator extends InputInitializer {
-
-	private static final Log LOG = LogFactory.getLog(FlinkInputSplitGenerator.class);
-
-	InputFormat format;
-
-	public FlinkInputSplitGenerator(InputInitializerContext initializerContext) {
-		super(initializerContext);
-	}
-
-	@Override
-	public List<Event> initialize() throws Exception {
-
-		Configuration tezConf = TezUtils.createConfFromUserPayload(this.getContext().getUserPayload());
-
-		TezTaskConfig taskConfig = (TezTaskConfig) EncodingUtils.decodeObjectFromString(tezConf.get(TezTaskConfig.TEZ_TASK_CONFIG), getClass().getClassLoader());
-
-		this.format = taskConfig.getInputFormat();
-
-		int numTasks = this.getContext().getNumTasks();
-
-		LOG.info ("Creating splits for " + numTasks + " tasks for input format " + format);
-		
-		InputSplit[] splits = format.createInputSplits((numTasks > 0) ? numTasks : 1 );
-
-		LOG.info ("Created " + splits.length + " input splits" + " tasks for input format " + format);
-		
-		//LOG.info ("Created + " + splits.length + " input splits for input format " + format);
-
-		LOG.info ("Sending input split events");
-		LinkedList<Event> events = new LinkedList<Event>();
-		for (int i = 0; i < splits.length; i++) {
-			byte [] bytes = InstantiationUtil.serializeObject(splits[i]);
-			ByteBuffer buf = ByteBuffer.wrap(bytes);
-			InputDataInformationEvent event = InputDataInformationEvent.createWithSerializedPayload(i % numTasks, buf);
-			event.setTargetIndex(i % numTasks);
-			events.add(event);
-			LOG.info ("Added event of index " + i + ": " + event);
-		}
-		return events;
-	}
-
-	@Override
-	public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws Exception {
-
-	}
-
-	@Override
-	public void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
-		//super.onVertexStateUpdated(stateUpdate);
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/TezReaderIterator.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/TezReaderIterator.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/TezReaderIterator.java
deleted file mode 100644
index 722f0a1..0000000
--- a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/TezReaderIterator.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.runtime.input;
-
-
-import org.apache.flink.util.MutableObjectIterator;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.tez.runtime.library.api.KeyValueReader;
-
-import java.io.IOException;
-
-public class TezReaderIterator<T> implements MutableObjectIterator<T>{
-
-	private KeyValueReader kvReader;
-
-	public TezReaderIterator(KeyValueReader kvReader) {
-		this.kvReader = kvReader;
-	}
-
-	@Override
-	public T next(T reuse) throws IOException {
-		if (kvReader.next()) {
-			Object key = kvReader.getCurrentKey();
-			Object value = kvReader.getCurrentValue();
-			if (!(key instanceof IntWritable)) {
-				throw new IllegalStateException("Wrong key type");
-			}
-			reuse = (T) value;
-			return reuse;
-		}
-		else {
-			return null;
-		}
-	}
-
-	@Override
-	public T next() throws IOException {
-		if (kvReader.next()) {
-			Object key = kvReader.getCurrentKey();
-			Object value = kvReader.getCurrentValue();
-			if (!(key instanceof IntWritable)) {
-				throw new IllegalStateException("Wrong key type");
-			}
-			return (T) value;
-		}
-		else {
-			return null;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/SimplePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/SimplePartitioner.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/SimplePartitioner.java
deleted file mode 100644
index 2358f29..0000000
--- a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/SimplePartitioner.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.runtime.output;
-
-
-import org.apache.hadoop.io.IntWritable;
-import org.apache.tez.runtime.library.api.Partitioner;
-
-public class SimplePartitioner implements Partitioner {
-
-	@Override
-	public int getPartition(Object key, Object value, int numPartitions) {
-		if (!(key instanceof IntWritable)) {
-			throw new IllegalStateException("Partitioning key should be int");
-		}
-		IntWritable channel = (IntWritable) key;
-		return channel.get();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezChannelSelector.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezChannelSelector.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezChannelSelector.java
deleted file mode 100644
index 7e5cd55..0000000
--- a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezChannelSelector.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.runtime.output;
-
-import java.io.Serializable;
-
-public interface TezChannelSelector<T> extends Serializable {
-
-	/**
-	 * Called to determine to which attached {@link org.apache.flink.runtime.io.network.channels.OutputChannel} objects the given record shall be forwarded.
-	 *
-	 * @param record
-	 *        the record to the determine the output channels for
-	 * @param numberOfOutputChannels
-	 *        the total number of output channels which are attached to respective output gate
-	 * @return a (possibly empty) array of integer numbers which indicate the indices of the output channels through
-	 *         which the record shall be forwarded
-	 */
-	int[] selectChannels(T record, int numberOfOutputChannels);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputCollector.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputCollector.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputCollector.java
deleted file mode 100644
index b68e6c8..0000000
--- a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputCollector.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.runtime.output;
-
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.util.Collector;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.tez.runtime.library.api.KeyValueWriter;
-
-import java.io.IOException;
-import java.util.List;
-
-public class TezOutputCollector<T> implements Collector<T> {
-
-	private List<KeyValueWriter> writers;
-
-	private List<TezChannelSelector<T>> outputEmitters;
-
-	private List<Integer> numberOfStreamsInOutputs;
-
-	private int numOutputs;
-
-	private TypeSerializer<T> serializer;
-
-	public TezOutputCollector(List<KeyValueWriter> writers, List<TezChannelSelector<T>> outputEmitters, TypeSerializer<T> serializer, List<Integer> numberOfStreamsInOutputs) {
-		this.writers = writers;
-		this.outputEmitters = outputEmitters;
-		this.numberOfStreamsInOutputs = numberOfStreamsInOutputs;
-		this.serializer = serializer;
-		this.numOutputs = writers.size();
-	}
-
-	@Override
-	public void collect(T record) {
-		for (int i = 0; i < numOutputs; i++) {
-			KeyValueWriter writer = writers.get(i);
-			TezChannelSelector<T> outputEmitter = outputEmitters.get(i);
-			int numberOfStreamsInOutput = numberOfStreamsInOutputs.get(i);
-			try {
-				for (int channel : outputEmitter.selectChannels(record, numberOfStreamsInOutput)) {
-					IntWritable key = new IntWritable(channel);
-					writer.write(key, record);
-				}
-			}
-			catch (IOException e) {
-				throw new RuntimeException("Emitting the record caused an I/O exception: " + e.getMessage(), e);
-			}
-		}
-	}
-
-	@Override
-	public void close() {
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputEmitter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputEmitter.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputEmitter.java
deleted file mode 100644
index 6dcee0b..0000000
--- a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputEmitter.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.runtime.output;
-
-import org.apache.flink.api.common.distributions.DataDistribution;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-
-public class TezOutputEmitter<T> implements TezChannelSelector<T> {
-
-	private final ShipStrategyType strategy;		// the shipping strategy used by this output emitter
-
-	private int[] channels;						// the reused array defining target channels
-
-	private int nextChannelToSendTo = 0;		// counter to go over channels round robin
-
-	private final TypeComparator<T> comparator;	// the comparator for hashing / sorting
-
-	// ------------------------------------------------------------------------
-	// Constructors
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates a new channel selector that distributes data round robin.
-	 */
-	public TezOutputEmitter() {
-		this(ShipStrategyType.NONE);
-	}
-
-	/**
-	 * Creates a new channel selector that uses the given strategy (broadcasting, partitioning, ...).
-	 *
-	 * @param strategy The distribution strategy to be used.
-	 */
-	public TezOutputEmitter(ShipStrategyType strategy) {
-		this(strategy, null);
-	}
-
-	/**
-	 * Creates a new channel selector that uses the given strategy (broadcasting, partitioning, ...)
-	 * and uses the supplied comparator to hash / compare records for partitioning them deterministically.
-	 *
-	 * @param strategy The distribution strategy to be used.
-	 * @param comparator The comparator used to hash / compare the records.
-	 */
-	public TezOutputEmitter(ShipStrategyType strategy, TypeComparator<T> comparator) {
-		this(strategy, comparator, null);
-	}
-
-	/**
-	 * Creates a new channel selector that uses the given strategy (broadcasting, partitioning, ...)
-	 * and uses the supplied comparator to hash / compare records for partitioning them deterministically.
-	 *
-	 * @param strategy The distribution strategy to be used.
-	 * @param comparator The comparator used to hash / compare the records.
-	 * @param distr The distribution pattern used in the case of a range partitioning.
-	 */
-	public TezOutputEmitter(ShipStrategyType strategy, TypeComparator<T> comparator, DataDistribution distr) {
-		if (strategy == null) {
-			throw new NullPointerException();
-		}
-
-		this.strategy = strategy;
-		this.comparator = comparator;
-
-		switch (strategy) {
-			case FORWARD:
-			case PARTITION_HASH:
-			case PARTITION_RANGE:
-			case PARTITION_RANDOM:
-			case PARTITION_FORCED_REBALANCE:
-			case BROADCAST:
-				break;
-			default:
-				throw new IllegalArgumentException("Invalid shipping strategy for OutputEmitter: " + strategy.name());
-		}
-
-		if ((strategy == ShipStrategyType.PARTITION_RANGE) && distr == null) {
-			throw new NullPointerException("Data distribution must not be null when the ship strategy is range partitioning.");
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	// Channel Selection
-	// ------------------------------------------------------------------------
-
-	@Override
-	public final int[] selectChannels(T record, int numberOfChannels) {
-		switch (strategy) {
-			case FORWARD:
-			case PARTITION_RANDOM:
-			case PARTITION_FORCED_REBALANCE:
-				return robin(numberOfChannels);
-			case PARTITION_HASH:
-				return hashPartitionDefault(record, numberOfChannels);
-			case PARTITION_RANGE:
-				return rangePartition(record, numberOfChannels);
-			case BROADCAST:
-				return broadcast(numberOfChannels);
-			default:
-				throw new UnsupportedOperationException("Unsupported distribution strategy: " + strategy.name());
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	private final int[] robin(int numberOfChannels) {
-		if (this.channels == null || this.channels.length != 1) {
-			this.channels = new int[1];
-		}
-
-		int nextChannel = nextChannelToSendTo + 1;
-		nextChannel = nextChannel < numberOfChannels ? nextChannel : 0;
-
-		this.nextChannelToSendTo = nextChannel;
-		this.channels[0] = nextChannel;
-		return this.channels;
-	}
-
-	private final int[] broadcast(int numberOfChannels) {
-		if (channels == null || channels.length != numberOfChannels) {
-			channels = new int[numberOfChannels];
-			for (int i = 0; i < numberOfChannels; i++) {
-				channels[i] = i;
-			}
-		}
-
-		return channels;
-	}
-
-	private final int[] hashPartitionDefault(T record, int numberOfChannels) {
-		if (channels == null || channels.length != 1) {
-			channels = new int[1];
-		}
-
-		int hash = this.comparator.hash(record);
-
-		hash = murmurHash(hash);
-
-		if (hash >= 0) {
-			this.channels[0] = hash % numberOfChannels;
-		}
-		else if (hash != Integer.MIN_VALUE) {
-			this.channels[0] = -hash % numberOfChannels;
-		}
-		else {
-			this.channels[0] = 0;
-		}
-
-		return this.channels;
-	}
-
-	private final int murmurHash(int k) {
-		k *= 0xcc9e2d51;
-		k = Integer.rotateLeft(k, 15);
-		k *= 0x1b873593;
-
-		k = Integer.rotateLeft(k, 13);
-		k *= 0xe6546b64;
-
-		k ^= 4;
-		k ^= k >>> 16;
-		k *= 0x85ebca6b;
-		k ^= k >>> 13;
-		k *= 0xc2b2ae35;
-		k ^= k >>> 16;
-
-		return k;
-	}
-
-	private final int[] rangePartition(T record, int numberOfChannels) {
-		throw new UnsupportedOperationException();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/DummyInvokable.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/DummyInvokable.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/DummyInvokable.java
deleted file mode 100644
index 39d247c..0000000
--- a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/DummyInvokable.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.util;
-
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-
-public class DummyInvokable extends AbstractInvokable {
-
-	private ExecutionConfig executionConfig;
-
-	public DummyInvokable() {
-	}
-
-	public DummyInvokable(ExecutionConfig executionConfig) {
-		this.executionConfig = executionConfig;
-	}
-
-	public void setExecutionConfig(ExecutionConfig executionConfig) {
-		this.executionConfig = executionConfig;
-	}
-
-	@Override
-	public void registerInputOutput() {}
-
-
-	@Override
-	public void invoke() throws Exception {}
-
-	@Override
-	public ExecutionConfig getExecutionConfig() {
-		return executionConfig;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/EncodingUtils.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/EncodingUtils.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/EncodingUtils.java
deleted file mode 100644
index 202cb24..0000000
--- a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/EncodingUtils.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.util;
-
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.commons.codec.binary.Base64;
-
-import java.io.IOException;
-
-public class EncodingUtils {
-
-	public static Object decodeObjectFromString(String encoded, ClassLoader cl) {
-
-		try {
-			if (encoded == null) {
-				return null;
-			}
-			byte[] bytes = Base64.decodeBase64(encoded);
-
-			return InstantiationUtil.deserializeObject(bytes, cl);
-		}
-		catch (IOException e) {
-			e.printStackTrace();
-			System.exit(-1);
-			throw new RuntimeException();
-		}
-		catch (ClassNotFoundException e) {
-			e.printStackTrace();
-			System.exit(-1);
-			throw new RuntimeException();
-		}
-	}
-
-	public static String encodeObjectToString(Object o) {
-
-		try {
-			byte[] bytes = InstantiationUtil.serializeObject(o);
-
-			String encoded = Base64.encodeBase64String(bytes);
-			return encoded;
-		}
-		catch (IOException e) {
-			e.printStackTrace();
-			System.exit(-1);
-			throw new RuntimeException();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/FlinkSerialization.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/FlinkSerialization.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/FlinkSerialization.java
deleted file mode 100644
index 07c5f97..0000000
--- a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/util/FlinkSerialization.java
+++ /dev/null
@@ -1,310 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.util;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.Serialization;
-import org.apache.hadoop.io.serializer.Serializer;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-public class FlinkSerialization<T> extends Configured implements Serialization<T>{
-
-	@Override
-	public boolean accept(Class<?> c) {
-		TypeSerializer<T> typeSerializer = (TypeSerializer) EncodingUtils.decodeObjectFromString(this.getConf().get("io.flink.typeserializer"), getClass().getClassLoader());
-		T instance = typeSerializer.createInstance();
-		return instance.getClass().isAssignableFrom(c);
-	}
-
-	@Override
-	public Serializer<T> getSerializer(Class<T> c) {
-		TypeSerializer<T> typeSerializer = (TypeSerializer) EncodingUtils.decodeObjectFromString(this.getConf().get("io.flink.typeserializer"), getClass().getClassLoader());
-		return new FlinkSerializer<T>(typeSerializer);
-	}
-
-	@Override
-	public Deserializer<T> getDeserializer(Class<T> c) {
-		TypeSerializer<T> typeSerializer = (TypeSerializer) EncodingUtils.decodeObjectFromString(this.getConf().get("io.flink.typeserializer"), getClass().getClassLoader());
-		return new FlinkDeserializer<T>(typeSerializer);
-	}
-
-	public static class FlinkSerializer<T> implements Serializer<T> {
-
-		private OutputStream dataOut;
-		private DataOutputViewOutputStreamWrapper dataOutputView;
-		private TypeSerializer<T> typeSerializer;
-
-		public FlinkSerializer(TypeSerializer<T> typeSerializer) {
-			this.typeSerializer = typeSerializer;
-		}
-
-		@Override
-		public void open(OutputStream out) throws IOException {
-			this.dataOut = out;
-			this.dataOutputView = new DataOutputViewOutputStreamWrapper(out);
-		}
-
-		@Override
-		public void serialize(T t) throws IOException {
-			typeSerializer.serialize(t, dataOutputView);
-		}
-
-		@Override
-		public void close() throws IOException {
-			this.dataOut.close();
-		}
-	}
-
-	public static class FlinkDeserializer<T> implements Deserializer<T> {
-
-		private InputStream dataIn;
-		private TypeSerializer<T> typeSerializer;
-		private DataInputViewInputStreamWrapper dataInputView;
-
-		public FlinkDeserializer(TypeSerializer<T> typeSerializer) {
-			this.typeSerializer = typeSerializer;
-		}
-
-		@Override
-		public void open(InputStream in) throws IOException {
-			this.dataIn = in;
-			this.dataInputView = new DataInputViewInputStreamWrapper(in);
-		}
-
-		@Override
-		public T deserialize(T t) throws IOException {
-			T reuse = t;
-			if (reuse == null) {
-				reuse = typeSerializer.createInstance();
-			}
-			return typeSerializer.deserialize(reuse, dataInputView);
-		}
-
-		@Override
-		public void close() throws IOException {
-			this.dataIn.close();
-		}
-	}
-
-	private static final class DataOutputViewOutputStreamWrapper implements DataOutputView {
-
-		private final DataOutputStream dos;
-
-		public DataOutputViewOutputStreamWrapper(OutputStream output) {
-			this.dos = new DataOutputStream(output);
-		}
-
-		@Override
-		public void write(int b) throws IOException {
-			dos.write(b);
-		}
-
-		@Override
-		public void write(byte[] b) throws IOException {
-			dos.write(b);
-		}
-
-		@Override
-		public void write(byte[] b, int off, int len) throws IOException {
-			dos.write(b, off, len);
-		}
-
-		@Override
-		public void writeBoolean(boolean v) throws IOException {
-			dos.writeBoolean(v);
-		}
-
-		@Override
-		public void writeByte(int v) throws IOException {
-			dos.writeByte(v);
-		}
-
-		@Override
-		public void writeShort(int v) throws IOException {
-			dos.writeShort(v);
-		}
-
-		@Override
-		public void writeChar(int v) throws IOException {
-			dos.writeChar(v);
-		}
-
-		@Override
-		public void writeInt(int v) throws IOException {
-			dos.writeInt(v);
-		}
-
-		@Override
-		public void writeLong(long v) throws IOException {
-			dos.writeLong(v);
-		}
-
-		@Override
-		public void writeFloat(float v) throws IOException {
-			dos.writeFloat(v);
-		}
-
-		@Override
-		public void writeDouble(double v) throws IOException {
-			dos.writeDouble(v);
-		}
-
-		@Override
-		public void writeBytes(String s) throws IOException {
-			dos.writeBytes(s);
-		}
-
-		@Override
-		public void writeChars(String s) throws IOException {
-			dos.writeChars(s);
-		}
-
-		@Override
-		public void writeUTF(String s) throws IOException {
-			dos.writeUTF(s);
-		}
-
-		@Override
-		public void skipBytesToWrite(int num) throws IOException {
-			for (int i = 0; i < num; i++) {
-				dos.write(0);
-			}
-		}
-
-		@Override
-		public void write(DataInputView inview, int num) throws IOException {
-			for (int i = 0; i < num; i++) {
-				dos.write(inview.readByte());
-			}
-		}
-	}
-
-	private static final class DataInputViewInputStreamWrapper implements DataInputView {
-
-		private final DataInputStream dis;
-
-
-		public DataInputViewInputStreamWrapper(InputStream input) {
-			this.dis = new DataInputStream(input);
-		}
-
-		@Override
-		public void readFully(byte[] b) throws IOException {
-			dis.readFully(b);
-		}
-
-		@Override
-		public void readFully(byte[] b, int off, int len) throws IOException {
-			dis.readFully(b, off, len);
-		}
-
-		@Override
-		public int skipBytes(int n) throws IOException {
-			return dis.skipBytes(n);
-		}
-
-		@Override
-		public boolean readBoolean() throws IOException {
-			return dis.readBoolean();
-		}
-
-		@Override
-		public byte readByte() throws IOException {
-			return dis.readByte();
-		}
-
-		@Override
-		public int readUnsignedByte() throws IOException {
-			return dis.readUnsignedByte();
-		}
-
-		@Override
-		public short readShort() throws IOException {
-			return dis.readShort();
-		}
-
-		@Override
-		public int readUnsignedShort() throws IOException {
-			return dis.readUnsignedShort();
-		}
-
-		@Override
-		public char readChar() throws IOException {
-			return dis.readChar();
-		}
-
-		@Override
-		public int readInt() throws IOException {
-			return dis.readInt();
-		}
-
-		@Override
-		public long readLong() throws IOException {
-			return dis.readLong();
-		}
-
-		@Override
-		public float readFloat() throws IOException {
-			return dis.readFloat();
-		}
-
-		@Override
-		public double readDouble() throws IOException {
-			return dis.readDouble();
-		}
-
-		@Override
-		public String readLine() throws IOException {
-			return dis.readLine();
-		}
-
-		@Override
-		public String readUTF() throws IOException {
-			return dis.readUTF();
-		}
-
-		@Override
-		public void skipBytesToRead(int numBytes) throws IOException {
-			while (numBytes > 0) {
-				numBytes -= dis.skipBytes(numBytes);
-			}
-		}
-
-		@Override
-		public int read(byte[] b, int off, int len) throws IOException {
-			dis.readFully(b, off, len);
-			return len;
-		}
-
-		@Override
-		public int read(byte[] b) throws IOException {
-			return read(b, 0, b.length);
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tez/src/main/resources/log4j.properties b/flink-contrib/flink-tez/src/main/resources/log4j.properties
deleted file mode 100644
index 0845c81..0000000
--- a/flink-contrib/flink-tez/src/main/resources/log4j.properties
+++ /dev/null
@@ -1,30 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-# Set root logger level to OFF to not flood build logs
-# set manually to INFO for debugging purposes
-log4j.rootLogger=INFO, testlogger
-
-# A1 is set to be a ConsoleAppender.
-log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target = System.err
-log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
-log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-
-# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/ConnectedComponentsStepITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/ConnectedComponentsStepITCase.java b/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/ConnectedComponentsStepITCase.java
deleted file mode 100644
index 9124faa..0000000
--- a/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/ConnectedComponentsStepITCase.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.test;
-
-import org.apache.flink.test.testdata.ConnectedComponentsData;
-import org.apache.flink.tez.examples.ConnectedComponentsStep;
-import org.junit.Assert;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.util.regex.Pattern;
-
-/*
- * Note: This does not test whether the program computes one step of the
- * Weakly Connected Components program correctly. It only tests whether
- * the program assigns a wrong component to a vertex.
- */
-
-public class ConnectedComponentsStepITCase extends TezProgramTestBase {
-
-    private static final long SEED = 0xBADC0FFEEBEEFL;
-
-    private static final int NUM_VERTICES = 1000;
-
-    private static final int NUM_EDGES = 10000;
-
-
-    private String verticesPath;
-    private String edgesPath;
-    private String resultPath;
-
-
-    @Override
-    protected void preSubmit() throws Exception {
-        verticesPath = createTempFile("vertices.txt", ConnectedComponentsData.getEnumeratingVertices(NUM_VERTICES));
-        edgesPath = createTempFile("edges.txt", ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED));
-        resultPath = getTempFilePath("results");
-    }
-
-    @Override
-    protected void testProgram() throws Exception {
-        ConnectedComponentsStep.main(verticesPath, edgesPath, resultPath, "100");
-    }
-
-    @Override
-    protected void postSubmit() throws Exception {
-        for (BufferedReader reader : getResultReader(resultPath)) {
-            checkOddEvenResult(reader);
-        }
-    }
-
-    private static void checkOddEvenResult(BufferedReader result) throws IOException {
-        Pattern split = Pattern.compile(" ");
-        String line;
-        while ((line = result.readLine()) != null) {
-            String[] res = split.split(line);
-            Assert.assertEquals("Malformed result: Wrong number of tokens in line.", 2, res.length);
-            try {
-                int vertex = Integer.parseInt(res[0]);
-                int component = Integer.parseInt(res[1]);
-                Assert.assertTrue(((vertex % 2) == (component % 2)));
-            } catch (NumberFormatException e) {
-                Assert.fail("Malformed result.");
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/PageRankBasicStepITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/PageRankBasicStepITCase.java b/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/PageRankBasicStepITCase.java
deleted file mode 100644
index 9a203fe..0000000
--- a/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/PageRankBasicStepITCase.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.test;
-
-import org.apache.flink.test.testdata.PageRankData;
-import org.apache.flink.tez.examples.PageRankBasicStep;
-
-public class PageRankBasicStepITCase extends TezProgramTestBase {
-
-    private String verticesPath;
-    private String edgesPath;
-    private String resultPath;
-    private String expectedResult;
-
-    public static final String RANKS_AFTER_1_ITERATION = "1 0.2\n" +
-            "2 0.25666666666666665\n" +
-            "3 0.1716666666666667\n" +
-            "4 0.1716666666666667\n" +
-            "5 0.2";
-
-    @Override
-    protected void preSubmit() throws Exception {
-        resultPath = getTempDirPath("result");
-        verticesPath = createTempFile("vertices.txt", PageRankData.VERTICES);
-        edgesPath = createTempFile("edges.txt", PageRankData.EDGES);
-    }
-
-    @Override
-    protected void testProgram() throws Exception {
-        PageRankBasicStep.main(new String[]{verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES+"", "-1"});
-        expectedResult = RANKS_AFTER_1_ITERATION;
-    }
-
-    @Override
-    protected void postSubmit() throws Exception {
-        compareKeyValuePairsWithDelta(expectedResult, resultPath, " ", 0.001);
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java b/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java
deleted file mode 100644
index eda9d1a..0000000
--- a/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/TezProgramTestBase.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.test;
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.AbstractTestBase;
-import org.apache.flink.tez.client.LocalTezEnvironment;
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
-
-public abstract class TezProgramTestBase extends AbstractTestBase {
-
-    private static final int DEFAULT_DEGREE_OF_PARALLELISM = 4;
-
-    private JobExecutionResult latestExecutionResult;
-
-    private int degreeOfParallelism = DEFAULT_DEGREE_OF_PARALLELISM;
-
-
-    public TezProgramTestBase() {
-        this(new Configuration());
-    }
-
-    public TezProgramTestBase(Configuration config) {
-        super (config);
-    }
-
-
-    public void setParallelism(int degreeOfParallelism) {
-        this.degreeOfParallelism = degreeOfParallelism;
-    }
-
-    public JobExecutionResult getLatestExecutionResult() {
-        return this.latestExecutionResult;
-    }
-
-
-    protected abstract void testProgram() throws Exception;
-
-    protected void preSubmit() throws Exception {}
-
-    protected void postSubmit() throws Exception {}
-
-    // --------------------------------------------------------------------------------------------
-    //  Test entry point
-    // --------------------------------------------------------------------------------------------
-
-    // Ignored due to deadlocks in Tez 0.6.1 (https://s3.amazonaws.com/archive.travis-ci.org/jobs/67848151/log.txt)
-    // TODO Reactivate with future Tez versions
-    @Ignore
-    @Test
-    public void testJob() throws Exception {
-        // pre-submit
-        try {
-            preSubmit();
-        }
-        catch (Exception e) {
-            System.err.println(e.getMessage());
-            e.printStackTrace();
-            Assert.fail("Pre-submit work caused an error: " + e.getMessage());
-        }
-
-        // prepare the test environment
-        LocalTezEnvironment env = LocalTezEnvironment.create();
-        env.setParallelism(degreeOfParallelism);
-        env.setAsContext();
-
-        // call the test program
-        try {
-            testProgram();
-        }
-        catch (Exception e) {
-            System.err.println(e.getMessage());
-            e.printStackTrace();
-            Assert.fail("Error while calling the test program: " + e.getMessage());
-        }
-
-        // post-submit
-        try {
-            postSubmit();
-        }
-        catch (Exception e) {
-            System.err.println(e.getMessage());
-            e.printStackTrace();
-            Assert.fail("Post-submit work caused an error: " + e.getMessage());
-        }
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/WebLogAnalysisITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/WebLogAnalysisITCase.java b/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/WebLogAnalysisITCase.java
deleted file mode 100644
index 35aa54a..0000000
--- a/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/WebLogAnalysisITCase.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.test;
-
-
-import org.apache.flink.examples.java.relational.WebLogAnalysis;
-import org.apache.flink.test.testdata.WebLogAnalysisData;
-
-public class WebLogAnalysisITCase extends TezProgramTestBase {
-
-    private String docsPath;
-    private String ranksPath;
-    private String visitsPath;
-    private String resultPath;
-
-    @Override
-    protected void preSubmit() throws Exception {
-        docsPath = createTempFile("docs", WebLogAnalysisData.DOCS);
-        ranksPath = createTempFile("ranks", WebLogAnalysisData.RANKS);
-        visitsPath = createTempFile("visits", WebLogAnalysisData.VISITS);
-        resultPath = getTempDirPath("result");
-    }
-
-    @Override
-    protected void postSubmit() throws Exception {
-        compareResultsByLinesInMemory(WebLogAnalysisData.EXCEPTED_RESULT, resultPath);
-    }
-    @Override
-    protected void testProgram() throws Exception {
-        WebLogAnalysis.main(new String[]{docsPath, ranksPath, visitsPath, resultPath});
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/WordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/WordCountITCase.java b/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/WordCountITCase.java
deleted file mode 100644
index d73aa8b..0000000
--- a/flink-contrib/flink-tez/src/test/java/org/apache/flink/tez/test/WordCountITCase.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.test;
-
-import org.apache.flink.examples.java.wordcount.WordCount;
-import org.apache.flink.test.testdata.WordCountData;
-
-public class WordCountITCase extends TezProgramTestBase {
-
-    protected String textPath;
-    protected String resultPath;
-
-    public WordCountITCase(){
-    }
-
-    @Override
-    protected void preSubmit() throws Exception {
-        textPath = createTempFile("text.txt", WordCountData.TEXT);
-        resultPath = getTempDirPath("result");
-    }
-
-    @Override
-    protected void postSubmit() throws Exception {
-        compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath);
-    }
-
-    @Override
-    protected void testProgram() throws Exception {
-        WordCount.main(new String[]{textPath, resultPath});
-    }
-}


Mime
View raw message