flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hsapu...@apache.org
Subject [1/5] flink git commit: [FLINK-2815] [REFACTOR] Remove Pact from class and file names since it is no longer valid reference
Date Tue, 06 Oct 2015 15:14:21 GMT
Repository: flink
Updated Branches:
  refs/heads/master e494c2795 -> b08669abf


http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/TaskContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/TaskContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/TaskContext.java
new file mode 100644
index 0000000..fd5d238
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/TaskContext.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+
+/**
+ * The task context gives a driver (e.g., {@link MapDriver}, or {@link JoinDriver}) access to
+ * the runtime components and configuration that they can use to fulfil their task.
+ *
+ * @param <S> The UDF type.
+ * @param <OT> The produced data type.
+ *
+ * @see Driver
+ */
+public interface TaskContext<S, OT> {
+	
+	TaskConfig getTaskConfig();
+	
+	TaskManagerRuntimeInfo getTaskManagerInfo();
+
+	ClassLoader getUserCodeClassLoader();
+	
+	MemoryManager getMemoryManager();
+	
+	IOManager getIOManager();
+
+	<X> MutableObjectIterator<X> getInput(int index);
+	
+	<X> TypeSerializerFactory<X> getInputSerializer(int index);
+	
+	<X> TypeComparator<X> getDriverComparator(int index);
+	
+	S getStub();
+
+	ExecutionConfig getExecutionConfig();
+
+	Collector<OT> getOutputCollector();
+	
+	AbstractInvokable getOwningNepheleTask();
+	
+	String formatLogString(String message);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java
index 098686c..4791761 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java
@@ -22,18 +22,18 @@ import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
-public class UnionWithTempOperator<T> implements PactDriver<Function, T> {
+public class UnionWithTempOperator<T> implements Driver<Function, T> {
 	
 	private static final int CACHED_INPUT = 0;
 	private static final int STREAMED_INPUT = 1;
 	
-	private PactTaskContext<Function, T> taskContext;
+	private TaskContext<Function, T> taskContext;
 	
 	private volatile boolean running;
 	
 	
 	@Override
-	public void setup(PactTaskContext<Function, T> context) {
+	public void setup(TaskContext<Function, T> context) {
 		this.taskContext = context;
 		this.running = true;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java
index 4641fce..46ee41b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.operators.RegularPactTask;
+import org.apache.flink.runtime.operators.BatchTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,7 +41,7 @@ public class ChainedAllReduceDriver<IT> extends ChainedDriver<IT, IT> {
 	@Override
 	public void setup(AbstractInvokable parent) {
 		@SuppressWarnings("unchecked")
-		final ReduceFunction<IT> red = RegularPactTask.instantiateUserCode(this.config, userCodeClassLoader, ReduceFunction.class);
+		final ReduceFunction<IT> red = BatchTask.instantiateUserCode(this.config, userCodeClassLoader, ReduceFunction.class);
 		this.reducer = red;
 		FunctionUtils.setFunctionRuntimeContext(red, getUdfRuntimeContext());
 
@@ -56,12 +56,12 @@ public class ChainedAllReduceDriver<IT> extends ChainedDriver<IT, IT> {
 	@Override
 	public void openTask() throws Exception {
 		Configuration stubConfig = this.config.getStubParameters();
-		RegularPactTask.openUserCode(this.reducer, stubConfig);
+		BatchTask.openUserCode(this.reducer, stubConfig);
 	}
 
 	@Override
 	public void closeTask() throws Exception {
-		RegularPactTask.closeUserCode(this.reducer);
+		BatchTask.closeUserCode(this.reducer);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java
index 482103c..8900ed7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.GenericCollectorMap;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.operators.RegularPactTask;
+import org.apache.flink.runtime.operators.BatchTask;
 
 @SuppressWarnings("deprecation")
 public class ChainedCollectorMapDriver<IT, OT> extends ChainedDriver<IT, OT> {
@@ -35,7 +35,7 @@ public class ChainedCollectorMapDriver<IT, OT> extends ChainedDriver<IT, OT> {
 	public void setup(AbstractInvokable parent) {
 		@SuppressWarnings("unchecked")
 		final GenericCollectorMap<IT, OT> mapper =
-			RegularPactTask.instantiateUserCode(this.config, userCodeClassLoader, GenericCollectorMap.class);
+			BatchTask.instantiateUserCode(this.config, userCodeClassLoader, GenericCollectorMap.class);
 		this.mapper = mapper;
 		mapper.setRuntimeContext(getUdfRuntimeContext());
 	}
@@ -43,12 +43,12 @@ public class ChainedCollectorMapDriver<IT, OT> extends ChainedDriver<IT, OT> {
 	@Override
 	public void openTask() throws Exception {
 		Configuration stubConfig = this.config.getStubParameters();
-		RegularPactTask.openUserCode(this.mapper, stubConfig);
+		BatchTask.openUserCode(this.mapper, stubConfig);
 	}
 
 	@Override
 	public void closeTask() throws Exception {
-		RegularPactTask.closeUserCode(this.mapper);
+		BatchTask.closeUserCode(this.mapper);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
index ea6cfe3..6edeb84 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.operators.RegularPactTask;
+import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.util.Collector;
@@ -32,7 +32,7 @@ import org.apache.flink.util.Collector;
 import java.util.Map;
 
 /**
- * The interface to be implemented by drivers that do not run in an own pact task context, but are chained to other
+ * The interface to be implemented by drivers that do not run in an own task context, but are chained to other
  * tasks.
  */
 public abstract class ChainedDriver<IT, OT> implements Collector<IT> {
@@ -63,8 +63,8 @@ public abstract class ChainedDriver<IT, OT> implements Collector<IT> {
 
 		Environment env = parent.getEnvironment();
 
-		if (parent instanceof RegularPactTask) {
-			this.udfContext = ((RegularPactTask<?, ?>) parent).createRuntimeContext(taskName);
+		if (parent instanceof BatchTask) {
+			this.udfContext = ((BatchTask<?, ?>) parent).createRuntimeContext(taskName);
 		} else {
 			this.udfContext = new DistributedRuntimeUDFContext(taskName, env.getNumberOfSubtasks(),
 					env.getIndexInSubtaskGroup(), userCodeClassLoader, parent.getExecutionConfig(),

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java
index bc3b6a1..f51cb68 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.operators.RegularPactTask;
+import org.apache.flink.runtime.operators.BatchTask;
 
 public class ChainedFlatMapDriver<IT, OT> extends ChainedDriver<IT, OT> {
 
@@ -36,7 +36,7 @@ public class ChainedFlatMapDriver<IT, OT> extends ChainedDriver<IT, OT> {
 	public void setup(AbstractInvokable parent) {
 		@SuppressWarnings("unchecked")
 		final FlatMapFunction<IT, OT> mapper =
-			RegularPactTask.instantiateUserCode(this.config, userCodeClassLoader, FlatMapFunction.class);
+			BatchTask.instantiateUserCode(this.config, userCodeClassLoader, FlatMapFunction.class);
 		this.mapper = mapper;
 		FunctionUtils.setFunctionRuntimeContext(mapper, getUdfRuntimeContext());
 	}
@@ -44,12 +44,12 @@ public class ChainedFlatMapDriver<IT, OT> extends ChainedDriver<IT, OT> {
 	@Override
 	public void openTask() throws Exception {
 		Configuration stubConfig = this.config.getStubParameters();
-		RegularPactTask.openUserCode(this.mapper, stubConfig);
+		BatchTask.openUserCode(this.mapper, stubConfig);
 	}
 
 	@Override
 	public void closeTask() throws Exception {
-		RegularPactTask.closeUserCode(this.mapper);
+		BatchTask.closeUserCode(this.mapper);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java
index db192df..9b888f2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.operators.RegularPactTask;
+import org.apache.flink.runtime.operators.BatchTask;
 
 public class ChainedMapDriver<IT, OT> extends ChainedDriver<IT, OT> {
 
@@ -36,7 +36,7 @@ public class ChainedMapDriver<IT, OT> extends ChainedDriver<IT, OT> {
 	public void setup(AbstractInvokable parent) {
 		@SuppressWarnings("unchecked")
 		final MapFunction<IT, OT> mapper =
-			RegularPactTask.instantiateUserCode(this.config, userCodeClassLoader, MapFunction.class);
+			BatchTask.instantiateUserCode(this.config, userCodeClassLoader, MapFunction.class);
 		this.mapper = mapper;
 		FunctionUtils.setFunctionRuntimeContext(mapper, getUdfRuntimeContext());
 	}
@@ -44,12 +44,12 @@ public class ChainedMapDriver<IT, OT> extends ChainedDriver<IT, OT> {
 	@Override
 	public void openTask() throws Exception {
 		Configuration stubConfig = this.config.getStubParameters();
-		RegularPactTask.openUserCode(this.mapper, stubConfig);
+		BatchTask.openUserCode(this.mapper, stubConfig);
 	}
 
 	@Override
 	public void closeTask() throws Exception {
-		RegularPactTask.closeUserCode(this.mapper);
+		BatchTask.closeUserCode(this.mapper);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java
index cf0fc85..4a04fb5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java
@@ -29,7 +29,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.RegularPactTask;
+import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
 import org.apache.flink.runtime.operators.sort.InMemorySorter;
 import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
@@ -87,7 +87,7 @@ public class GroupCombineChainedDriver<IN, OUT> extends ChainedDriver<IN, OUT> {
 
 		@SuppressWarnings("unchecked")
 		final GroupReduceFunction<IN, OUT> combiner =
-			RegularPactTask.instantiateUserCode(this.config, userCodeClassLoader, GroupReduceFunction.class);
+			BatchTask.instantiateUserCode(this.config, userCodeClassLoader, GroupReduceFunction.class);
 		this.reducer = combiner;
 		FunctionUtils.setFunctionRuntimeContext(combiner, getUdfRuntimeContext());
 	}
@@ -96,7 +96,7 @@ public class GroupCombineChainedDriver<IN, OUT> extends ChainedDriver<IN, OUT> {
 	public void openTask() throws Exception {
 		// open the stub first
 		final Configuration stubConfig = this.config.getStubParameters();
-		RegularPactTask.openUserCode(this.reducer, stubConfig);
+		BatchTask.openUserCode(this.reducer, stubConfig);
 
 		// ----------------- Set up the sorter -------------------------
 
@@ -135,7 +135,7 @@ public class GroupCombineChainedDriver<IN, OUT> extends ChainedDriver<IN, OUT> {
 		this.parent.getEnvironment().getMemoryManager().release(this.memory);
 
 		if (this.running) {
-			RegularPactTask.closeUserCode(this.reducer);
+			BatchTask.closeUserCode(this.reducer);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
index da9698c..408abc2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
@@ -32,7 +32,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.RegularPactTask;
+import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
 import org.apache.flink.runtime.operators.sort.InMemorySorter;
 import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
@@ -87,7 +87,7 @@ public class SynchronousChainedCombineDriver<IN, OUT> extends ChainedDriver<IN,
 
 		@SuppressWarnings("unchecked")
 		final GroupCombineFunction<IN, OUT> combiner =
-			RegularPactTask.instantiateUserCode(this.config, userCodeClassLoader, GroupCombineFunction.class);
+			BatchTask.instantiateUserCode(this.config, userCodeClassLoader, GroupCombineFunction.class);
 		this.combiner = combiner;
 		FunctionUtils.setFunctionRuntimeContext(combiner, getUdfRuntimeContext());
 	}
@@ -96,7 +96,7 @@ public class SynchronousChainedCombineDriver<IN, OUT> extends ChainedDriver<IN,
 	public void openTask() throws Exception {
 		// open the stub first
 		final Configuration stubConfig = this.config.getStubParameters();
-		RegularPactTask.openUserCode(this.combiner, stubConfig);
+		BatchTask.openUserCode(this.combiner, stubConfig);
 
 		// ----------------- Set up the sorter -------------------------
 
@@ -134,7 +134,7 @@ public class SynchronousChainedCombineDriver<IN, OUT> extends ChainedDriver<IN,
 		this.parent.getEnvironment().getMemoryManager().release(this.memory);
 
 		if (this.running) {
-			RegularPactTask.closeUserCode(this.combiner);
+			BatchTask.closeUserCode(this.combiner);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
index 6c97097..0254c8c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
@@ -48,7 +48,7 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
 import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
 import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.PactDriver;
+import org.apache.flink.runtime.operators.Driver;
 import org.apache.flink.runtime.operators.chaining.ChainedDriver;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.types.Value;
@@ -307,11 +307,11 @@ public class TaskConfig implements Serializable {
 	//                                      Driver
 	// --------------------------------------------------------------------------------------------
 	
-	public void setDriver(@SuppressWarnings("rawtypes") Class<? extends PactDriver> driver) {
+	public void setDriver(@SuppressWarnings("rawtypes") Class<? extends Driver> driver) {
 		this.config.setString(DRIVER_CLASS, driver.getName());
 	}
 	
-	public <S extends Function, OT> Class<? extends PactDriver<S, OT>> getDriver() {
+	public <S extends Function, OT> Class<? extends Driver<S, OT>> getDriver() {
 		final String className = this.config.getString(DRIVER_CLASS, null);
 		if (className == null) {
 			throw new CorruptConfigurationException("The pact driver class is missing.");
@@ -319,7 +319,7 @@ public class TaskConfig implements Serializable {
 		
 		try {
 			@SuppressWarnings("unchecked")
-			final Class<PactDriver<S, OT>> pdClazz = (Class<PactDriver<S, OT>>) (Class<?>) PactDriver.class;
+			final Class<Driver<S, OT>> pdClazz = (Class<Driver<S, OT>>) (Class<?>) Driver.class;
 			return Class.forName(className).asSubclass(pdClazz);
 		} catch (ClassNotFoundException cnfex) {
 			throw new CorruptConfigurationException("The given driver class cannot be found.");

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
index 3a36fe8..58755f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
@@ -31,7 +31,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.operators.RegularPactTask;
+import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.junit.Test;
 
@@ -47,7 +47,7 @@ public class TaskDeploymentDescriptorTest {
 			final int currentNumberOfSubtasks = 1;
 			final Configuration jobConfiguration = new Configuration();
 			final Configuration taskConfiguration = new Configuration();
-			final Class<? extends AbstractInvokable> invokableClass = RegularPactTask.class;
+			final Class<? extends AbstractInvokable> invokableClass = BatchTask.class;
 			final List<ResultPartitionDeploymentDescriptor> producedResults = new ArrayList<ResultPartitionDeploymentDescriptor>(0);
 			final List<InputGateDeploymentDescriptor> inputGates = new ArrayList<InputGateDeploymentDescriptor>(0);
 			final List<BlobKey> requiredJars = new ArrayList<BlobKey>(0);

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index e3fc852..bea7c22 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -43,7 +43,7 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.operators.RegularPactTask;
+import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.junit.Test;
 
@@ -69,10 +69,10 @@ public class ExecutionGraphDeploymentTest {
 			v3.setParallelism(10);
 			v4.setParallelism(10);
 
-			v1.setInvokableClass(RegularPactTask.class);
-			v2.setInvokableClass(RegularPactTask.class);
-			v3.setInvokableClass(RegularPactTask.class);
-			v4.setInvokableClass(RegularPactTask.class);
+			v1.setInvokableClass(BatchTask.class);
+			v2.setInvokableClass(BatchTask.class);
+			v3.setInvokableClass(BatchTask.class);
+			v4.setInvokableClass(BatchTask.class);
 
 			v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL);
 			v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL);
@@ -111,7 +111,7 @@ public class ExecutionGraphDeploymentTest {
 			assertEquals(jid2, descr.getVertexID());
 			assertEquals(3, descr.getIndexInSubtaskGroup());
 			assertEquals(10, descr.getNumberOfSubtasks());
-			assertEquals(RegularPactTask.class.getName(), descr.getInvokableClassName());
+			assertEquals(BatchTask.class.getName(), descr.getInvokableClassName());
 			assertEquals("v2", descr.getTaskName());
 
 			List<ResultPartitionDeploymentDescriptor> producedPartitions = descr.getProducedPartitions();
@@ -276,8 +276,8 @@ public class ExecutionGraphDeploymentTest {
 		v1.setParallelism(dop1);
 		v2.setParallelism(dop2);
 
-		v1.setInvokableClass(RegularPactTask.class);
-		v2.setInvokableClass(RegularPactTask.class);
+		v1.setInvokableClass(BatchTask.class);
+		v2.setInvokableClass(BatchTask.class);
 
 		// execution graph that executes actions synchronously
 		ExecutionGraph eg = new ExecutionGraph(

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
index 88a71c4..1f19699 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.operators.CollectorMapDriver;
 import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.RegularPactTask;
+import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.runtime.operators.MapTaskTest.MockMapStub;
 import org.apache.flink.runtime.operators.ReduceTaskTest.MockReduceStub;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
@@ -102,8 +102,8 @@ public class ChainTaskTest extends TaskTestBase {
 			
 			// chained map+combine
 			{
-				RegularPactTask<GenericCollectorMap<Record, Record>, Record> testTask = 
-											new RegularPactTask<GenericCollectorMap<Record, Record>, Record>();
+				BatchTask<GenericCollectorMap<Record, Record>, Record> testTask =
+											new BatchTask<GenericCollectorMap<Record, Record>, Record>();
 				registerTask(testTask, CollectorMapDriver.class, MockMapStub.class);
 				
 				try {
@@ -163,8 +163,8 @@ public class ChainTaskTest extends TaskTestBase {
 			
 			// chained map+combine
 			{
-				final RegularPactTask<GenericCollectorMap<Record, Record>, Record> testTask = 
-											new RegularPactTask<GenericCollectorMap<Record, Record>, Record>();
+				final BatchTask<GenericCollectorMap<Record, Record>, Record> testTask =
+											new BatchTask<GenericCollectorMap<Record, Record>, Record>();
 				
 				super.registerTask(testTask, CollectorMapDriver.class, MockMapStub.class);
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
index 0a02f30..9be957a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
@@ -29,14 +29,14 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.PactTaskContext;
+import org.apache.flink.runtime.operators.TaskContext;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
-public class TestTaskContext<S, T> implements PactTaskContext<S, T> {
+public class TestTaskContext<S, T> implements TaskContext<S, T> {
 	
 	private final AbstractInvokable owner = new DummyInvokable();
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
index 5136aea..7043a63 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
@@ -30,9 +30,9 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.PactDriver;
-import org.apache.flink.runtime.operators.PactTaskContext;
-import org.apache.flink.runtime.operators.ResettablePactDriver;
+import org.apache.flink.runtime.operators.Driver;
+import org.apache.flink.runtime.operators.TaskContext;
+import org.apache.flink.runtime.operators.ResettableDriver;
 import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
@@ -51,7 +51,7 @@ import java.util.LinkedList;
 import java.util.List;
 
 @RunWith(Parameterized.class)
-public class BinaryOperatorTestBase<S extends Function, IN, OUT> extends TestLogger implements PactTaskContext<S, OUT> {
+public class BinaryOperatorTestBase<S extends Function, IN, OUT> extends TestLogger implements TaskContext<S, OUT> {
 	
 	protected static final int PAGE_SIZE = 32 * 1024;
 	
@@ -81,7 +81,7 @@ public class BinaryOperatorTestBase<S extends Function, IN, OUT> extends TestLog
 	
 	private S stub;
 	
-	private PactDriver<S, IN> driver;
+	private Driver<S, IN> driver;
 	
 	private volatile boolean running = true;
 	
@@ -176,12 +176,12 @@ public class BinaryOperatorTestBase<S extends Function, IN, OUT> extends TestLog
 	}
 	
 	@SuppressWarnings("rawtypes")
-	public void testDriver(PactDriver driver, Class stubClass) throws Exception {
+	public void testDriver(Driver driver, Class stubClass) throws Exception {
 		testDriverInternal(driver, stubClass);
 	}
 	
 	@SuppressWarnings({"unchecked", "rawtypes"})
-	public void testDriverInternal(PactDriver driver, Class stubClass) throws Exception {
+	public void testDriverInternal(Driver driver, Class stubClass) throws Exception {
 		
 		this.driver = driver;
 		driver.setup(this);
@@ -232,8 +232,8 @@ public class BinaryOperatorTestBase<S extends Function, IN, OUT> extends TestLog
 			}
 			
 			// if resettable driver invoke tear down
-			if (this.driver instanceof ResettablePactDriver) {
-				final ResettablePactDriver<?, ?> resDriver = (ResettablePactDriver<?, ?>) this.driver;
+			if (this.driver instanceof ResettableDriver) {
+				final ResettableDriver<?, ?> resDriver = (ResettableDriver<?, ?>) this.driver;
 				try {
 					resDriver.teardown();
 				} catch (Throwable t) {
@@ -252,7 +252,7 @@ public class BinaryOperatorTestBase<S extends Function, IN, OUT> extends TestLog
 	}
 	
 	@SuppressWarnings({"unchecked", "rawtypes"})
-	public void testResettableDriver(ResettablePactDriver driver, Class stubClass, int iterations) throws Exception {
+	public void testResettableDriver(ResettableDriver driver, Class stubClass, int iterations) throws Exception {
 		driver.setup(this);
 		
 		for (int i = 0; i < iterations; i++) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
index 116fdec..c442940 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
@@ -24,6 +24,7 @@ import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.runtime.operators.Driver;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
@@ -38,9 +39,8 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.PactDriver;
-import org.apache.flink.runtime.operators.PactTaskContext;
-import org.apache.flink.runtime.operators.ResettablePactDriver;
+import org.apache.flink.runtime.operators.TaskContext;
+import org.apache.flink.runtime.operators.ResettableDriver;
 import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.types.Record;
@@ -51,7 +51,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 @RunWith(Parameterized.class)
-public class DriverTestBase<S extends Function> extends TestLogger implements PactTaskContext<S, Record> {
+public class DriverTestBase<S extends Function> extends TestLogger implements TaskContext<S, Record> {
 	
 	protected static final long DEFAULT_PER_SORT_MEM = 16 * 1024 * 1024;
 	
@@ -83,7 +83,7 @@ public class DriverTestBase<S extends Function> extends TestLogger implements Pa
 	
 	private S stub;
 	
-	private PactDriver<S, Record> driver;
+	private Driver<S, Record> driver;
 	
 	private volatile boolean running = true;
 
@@ -168,12 +168,12 @@ public class DriverTestBase<S extends Function> extends TestLogger implements Pa
 	}
 
 	@SuppressWarnings("rawtypes")
-	public void testDriver(PactDriver driver, Class stubClass) throws Exception {
+	public void testDriver(Driver driver, Class stubClass) throws Exception {
 		testDriverInternal(driver, stubClass);
 	}
 
 	@SuppressWarnings({"unchecked","rawtypes"})
-	public void testDriverInternal(PactDriver driver, Class stubClass) throws Exception {
+	public void testDriverInternal(Driver driver, Class stubClass) throws Exception {
 
 		this.driver = driver;
 		driver.setup(this);
@@ -226,8 +226,8 @@ public class DriverTestBase<S extends Function> extends TestLogger implements Pa
 			}
 
 			// if resettable driver invoke tear down
-			if (this.driver instanceof ResettablePactDriver) {
-				final ResettablePactDriver<?, ?> resDriver = (ResettablePactDriver<?, ?>) this.driver;
+			if (this.driver instanceof ResettableDriver) {
+				final ResettableDriver<?, ?> resDriver = (ResettableDriver<?, ?>) this.driver;
 				try {
 					resDriver.teardown();
 				} catch (Throwable t) {
@@ -247,7 +247,7 @@ public class DriverTestBase<S extends Function> extends TestLogger implements Pa
 	}
 
 	@SuppressWarnings({"unchecked","rawtypes"})
-	public void testResettableDriver(ResettablePactDriver driver, Class stubClass, int iterations) throws Exception {
+	public void testResettableDriver(ResettableDriver driver, Class stubClass, int iterations) throws Exception {
 
 		driver.setup(this);
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
index 4662762..777bfc8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
@@ -30,7 +30,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.io.network.partition.consumer.IteratorWrappingTestSingleInputGate;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.PactDriver;
+import org.apache.flink.runtime.operators.Driver;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.types.Record;
@@ -89,7 +89,7 @@ public abstract class TaskTestBase extends TestLogger {
 	}
 
 	public void registerTask(AbstractInvokable task, 
-								@SuppressWarnings("rawtypes") Class<? extends PactDriver> driver,
+								@SuppressWarnings("rawtypes") Class<? extends Driver> driver,
 								Class<? extends RichFunction> stubClass) {
 		
 		final TaskConfig config = new TaskConfig(this.mockEnv.getTaskConfiguration());

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
index e2b2430..886c881 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
@@ -30,9 +30,9 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.PactDriver;
-import org.apache.flink.runtime.operators.PactTaskContext;
-import org.apache.flink.runtime.operators.ResettablePactDriver;
+import org.apache.flink.runtime.operators.Driver;
+import org.apache.flink.runtime.operators.TaskContext;
+import org.apache.flink.runtime.operators.ResettableDriver;
 import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
@@ -51,7 +51,7 @@ import java.util.Collection;
 import java.util.List;
 
 @RunWith(Parameterized.class)
-public class UnaryOperatorTestBase<S extends Function, IN, OUT> extends TestLogger implements PactTaskContext<S, OUT> {
+public class UnaryOperatorTestBase<S extends Function, IN, OUT> extends TestLogger implements TaskContext<S, OUT> {
 	
 	protected static final long DEFAULT_PER_SORT_MEM = 16 * 1024 * 1024;
 	
@@ -85,7 +85,7 @@ public class UnaryOperatorTestBase<S extends Function, IN, OUT> extends TestLogg
 	
 	private S stub;
 	
-	private PactDriver<S, OUT> driver;
+	private Driver<S, OUT> driver;
 	
 	private volatile boolean running;
 
@@ -170,12 +170,12 @@ public class UnaryOperatorTestBase<S extends Function, IN, OUT> extends TestLogg
 	}
 
 	@SuppressWarnings("rawtypes")
-	public void testDriver(PactDriver driver, Class stubClass) throws Exception {
+	public void testDriver(Driver driver, Class stubClass) throws Exception {
 		testDriverInternal(driver, stubClass);
 	}
 
 	@SuppressWarnings({"unchecked","rawtypes"})
-	public void testDriverInternal(PactDriver driver, Class stubClass) throws Exception {
+	public void testDriverInternal(Driver driver, Class stubClass) throws Exception {
 
 		this.driver = driver;
 		driver.setup(this);
@@ -227,8 +227,8 @@ public class UnaryOperatorTestBase<S extends Function, IN, OUT> extends TestLogg
 			}
 
 			// if resettable driver invoke tear-down
-			if (this.driver instanceof ResettablePactDriver) {
-				final ResettablePactDriver<?, ?> resDriver = (ResettablePactDriver<?, ?>) this.driver;
+			if (this.driver instanceof ResettableDriver) {
+				final ResettableDriver<?, ?> resDriver = (ResettableDriver<?, ?>) this.driver;
 				try {
 					resDriver.teardown();
 				} catch (Throwable t) {
@@ -248,7 +248,7 @@ public class UnaryOperatorTestBase<S extends Function, IN, OUT> extends TestLogg
 	}
 
 	@SuppressWarnings({"unchecked","rawtypes"})
-	public void testResettableDriver(ResettablePactDriver driver, Class stubClass, int iterations) throws Exception {
+	public void testResettableDriver(ResettableDriver driver, Class stubClass, int iterations) throws Exception {
 		driver.setup(this);
 		
 		for (int i = 0; i < iterations; i++) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/resources/logback-test.xml b/flink-runtime/src/test/resources/logback-test.xml
index 17f7020..1d64d46 100644
--- a/flink-runtime/src/test/resources/logback-test.xml
+++ b/flink-runtime/src/test/resources/logback-test.xml
@@ -31,7 +31,7 @@
          throw error to test failing scenarios. Logging those would overflow the log. -->
          <!---->
     <logger name="org.apache.flink.runtime.operators.DataSinkTask" level="OFF"/>
-    <logger name="org.apache.flink.runtime.operators.RegularPactTask" level="OFF"/>
+    <logger name="org.apache.flink.runtime.operators.BatchTask" level="OFF"/>
     <logger name="org.apache.flink.runtime.client.JobClient" level="OFF"/>
     <logger name="org.apache.flink.runtime.taskmanager.Task" level="OFF"/>
     <logger name="org.apache.flink.runtime.jobmanager.JobManager" level="OFF"/>

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-staging/flink-ml/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/resources/logback-test.xml b/flink-staging/flink-ml/src/test/resources/logback-test.xml
index 17f7020..1d64d46 100644
--- a/flink-staging/flink-ml/src/test/resources/logback-test.xml
+++ b/flink-staging/flink-ml/src/test/resources/logback-test.xml
@@ -31,7 +31,7 @@
          throw error to test failing scenarios. Logging those would overflow the log. -->
          <!---->
     <logger name="org.apache.flink.runtime.operators.DataSinkTask" level="OFF"/>
-    <logger name="org.apache.flink.runtime.operators.RegularPactTask" level="OFF"/>
+    <logger name="org.apache.flink.runtime.operators.BatchTask" level="OFF"/>
     <logger name="org.apache.flink.runtime.client.JobClient" level="OFF"/>
     <logger name="org.apache.flink.runtime.taskmanager.Task" level="OFF"/>
     <logger name="org.apache.flink.runtime.jobmanager.JobManager" level="OFF"/>

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java
index b117bab..287129d 100644
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.operators.PactDriver;
+import org.apache.flink.runtime.operators.Driver;
 import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
 import org.apache.flink.tez.util.EncodingUtils;
 import org.apache.flink.util.InstantiationUtil;
@@ -93,8 +93,8 @@ public class RegularProcessor<S extends Function, OT> extends AbstractLogicalIOP
 
 		this.inputs = inputs;
 		this.outputs = outputs;
-		final Class<? extends PactDriver<S, OT>> driverClass = this.task.getTaskConfig().getDriver();
-		PactDriver<S,OT> driver = InstantiationUtil.instantiate(driverClass, PactDriver.class);
+		final Class<? extends Driver<S, OT>> driverClass = this.task.getTaskConfig().getDriver();
+		Driver<S,OT> driver = InstantiationUtil.instantiate(driverClass, Driver.class);
 		this.numInputs = driver.getNumberOfInputs();
 		this.numOutputs = outputs.size();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java
index b7cbfb4..89e4642 100644
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java
@@ -36,8 +36,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.PactDriver;
-import org.apache.flink.runtime.operators.PactTaskContext;
+import org.apache.flink.runtime.operators.Driver;
+import org.apache.flink.runtime.operators.TaskContext;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger;
 import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
@@ -64,7 +64,7 @@ import java.util.Arrays;
 import java.util.List;
 
 
-public class TezTask<S extends Function,OT>  implements PactTaskContext<S, OT> {
+public class TezTask<S extends Function,OT>  implements TaskContext<S, OT> {
 
 	protected static final Log LOG = LogFactory.getLog(TezTask.class);
 
@@ -74,7 +74,7 @@ public class TezTask<S extends Function,OT>  implements PactTaskContext<S, OT> {
 	 * The driver that invokes the user code (the stub implementation). The central driver in this task
 	 * (further drivers may be chained behind this driver).
 	 */
-	protected volatile PactDriver<S, OT> driver;
+	protected volatile Driver<S, OT> driver;
 
 	/**
 	 * The instantiated user code of this task's main operator (driver). May be null if the operator has no udf.
@@ -150,8 +150,8 @@ public class TezTask<S extends Function,OT>  implements PactTaskContext<S, OT> {
 
 	public TezTask(TezTaskConfig config, RuntimeUDFContext runtimeUdfContext, long availableMemory) {
 		this.config = config;
-		final Class<? extends PactDriver<S, OT>> driverClass = this.config.getDriver();
-		this.driver = InstantiationUtil.instantiate(driverClass, PactDriver.class);
+		final Class<? extends Driver<S, OT>> driverClass = this.config.getDriver();
+		this.driver = InstantiationUtil.instantiate(driverClass, Driver.class);
 		
 		LOG.info("ClassLoader URLs: " + Arrays.toString(((URLClassLoader) this.userCodeClassLoader).getURLs()));
 		
@@ -244,7 +244,7 @@ public class TezTask<S extends Function,OT>  implements PactTaskContext<S, OT> {
 
 
 	// --------------------------------------------------------------------
-	// PactTaskContext interface
+	// TaskContext interface
 	// --------------------------------------------------------------------
 
 	@Override
@@ -356,7 +356,7 @@ public class TezTask<S extends Function,OT>  implements PactTaskContext<S, OT> {
 
 
 	// --------------------------------------------------------------------
-	// Adapted from RegularPactTask
+	// Adapted from BatchTask
 	// --------------------------------------------------------------------
 
 	private void initInputLocalStrategy(int inputNum) throws Exception {
@@ -402,7 +402,7 @@ public class TezTask<S extends Function,OT>  implements PactTaskContext<S, OT> {
 						localStub = initStub(userCodeFunctionType);
 					} catch (Exception e) {
 						throw new RuntimeException("Initializing the user code and the configuration failed" +
-								e.getMessage() == null ? "." : ": " + e.getMessage(), e);
+								(e.getMessage() == null ? "." : ": " + e.getMessage()), e);
 					}
 
 					if (!(localStub instanceof GroupCombineFunction)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-staging/flink-tez/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/test/resources/logback-test.xml b/flink-staging/flink-tez/src/test/resources/logback-test.xml
index 9c2e75f..48e4374 100644
--- a/flink-staging/flink-tez/src/test/resources/logback-test.xml
+++ b/flink-staging/flink-tez/src/test/resources/logback-test.xml
@@ -27,7 +27,7 @@
         <appender-ref ref="STDOUT"/>
     </root>
 
-    <!--<logger name="org.apache.flink.runtime.operators.RegularPactTask" level="OFF"/>-->
+    <!--<logger name="org.apache.flink.runtime.operators.BatchTask" level="OFF"/>-->
     <!--<logger name="org.apache.flink.runtime.client.JobClient" level="OFF"/>-->
     <!--<logger name="org.apache.flink.runtime.taskmanager.Task" level="OFF"/>-->
     <!--<logger name="org.apache.flink.runtime.jobmanager.JobManager" level="OFF"/>-->

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-tests/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/logback-test.xml b/flink-tests/src/test/resources/logback-test.xml
index 9c2e75f..48e4374 100644
--- a/flink-tests/src/test/resources/logback-test.xml
+++ b/flink-tests/src/test/resources/logback-test.xml
@@ -27,7 +27,7 @@
         <appender-ref ref="STDOUT"/>
     </root>
 
-    <!--<logger name="org.apache.flink.runtime.operators.RegularPactTask" level="OFF"/>-->
+    <!--<logger name="org.apache.flink.runtime.operators.BatchTask" level="OFF"/>-->
     <!--<logger name="org.apache.flink.runtime.client.JobClient" level="OFF"/>-->
     <!--<logger name="org.apache.flink.runtime.taskmanager.Task" level="OFF"/>-->
     <!--<logger name="org.apache.flink.runtime.jobmanager.JobManager" level="OFF"/>-->


Mime
View raw message