flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject flink git commit: [FLINK-3666] Remove all remaining Nephele references
Date Fri, 15 Jul 2016 09:52:37 GMT
Repository: flink
Updated Branches:
  refs/heads/master 234646844 -> 273f54ba4


[FLINK-3666] Remove all remaining Nephele references


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/273f54ba
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/273f54ba
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/273f54ba

Branch: refs/heads/master
Commit: 273f54ba4357ccc4e4ade35b8967f0fa607a1ea8
Parents: 2346468
Author: zentol <chesnay@apache.org>
Authored: Wed Jul 13 16:15:28 2016 +0200
Committer: zentol <chesnay@apache.org>
Committed: Fri Jul 15 11:46:34 2016 +0200

----------------------------------------------------------------------
 docs/setup/local_setup.md                             | 13 ++++++-------
 .../runtime/iterative/task/IterationHeadTask.java     |  2 +-
 .../operators/AbstractCachedBuildSideJoinDriver.java  |  8 ++++----
 .../org/apache/flink/runtime/operators/BatchTask.java | 14 +++++++-------
 .../apache/flink/runtime/operators/CrossDriver.java   | 12 ++++++------
 .../flink/runtime/operators/FullOuterJoinDriver.java  | 12 ++++++------
 .../runtime/operators/GroupReduceCombineDriver.java   |  2 +-
 .../apache/flink/runtime/operators/JoinDriver.java    | 12 ++++++------
 .../flink/runtime/operators/LeftOuterJoinDriver.java  | 12 ++++++------
 .../flink/runtime/operators/ReduceCombineDriver.java  |  2 +-
 .../flink/runtime/operators/RightOuterJoinDriver.java | 12 ++++++------
 .../apache/flink/runtime/operators/TaskContext.java   |  2 +-
 .../runtime/operators/ReduceTaskExternalITCase.java   |  4 ++--
 .../flink/runtime/operators/ReduceTaskTest.java       |  2 +-
 .../runtime/operators/drivers/TestTaskContext.java    |  2 +-
 .../operators/testutils/BinaryOperatorTestBase.java   |  2 +-
 .../runtime/operators/testutils/DriverTestBase.java   |  2 +-
 .../operators/testutils/UnaryOperatorTestBase.java    |  2 +-
 18 files changed, 58 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/273f54ba/docs/setup/local_setup.md
----------------------------------------------------------------------
diff --git a/docs/setup/local_setup.md b/docs/setup/local_setup.md
index 43a9b79..983fa91 100644
--- a/docs/setup/local_setup.md
+++ b/docs/setup/local_setup.md
@@ -70,18 +70,17 @@ The out of the box configuration will use your default Java installation.
You ca
 $ tar xzf flink-*.tgz
 $ cd flink
 $ bin/start-local.sh
-Starting job manager
+Starting jobmanager.
 ~~~
 
 You can check that the system is running by checking the log files in the `logs` directory:
 
 ~~~bash
 $ tail log/flink-*-jobmanager-*.log
-INFO ... - Initializing memory manager with 409 megabytes of memory
-INFO ... - Trying to load org.apache.flinknephele.jobmanager.scheduler.local.LocalScheduler
as scheduler
-INFO ... - Setting up web info server, using web-root directory ...
-INFO ... - Web info server will display information about nephele job-manager on localhost,
port 8081.
-INFO ... - Starting web info server for JobManager on port 8081
+INFO ... - Starting JobManager
+INFO ... - Starting JobManager web frontend
+INFO ... - Web frontend listening at 127.0.0.1:8081
+INFO ... - Registered TaskManager at 127.0.0.1 (akka://flink/user/taskmanager)
 ~~~
 
 The JobManager will also start a web frontend on port 8081, which you can check with your
browser at `http://localhost:8081`.
@@ -117,7 +116,7 @@ With *Cygwin* you need to start the Cygwin Terminal, navigate to your
Flink dire
 ~~~bash
 $ cd flink
 $ bin/start-local.sh
-Starting Nephele job manager
+Starting jobmanager.
 ~~~
 
 {% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/273f54ba/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
index 66778c9..4bc4532 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
@@ -171,7 +171,7 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends
AbstractIte
 		boolean success = false;
 		try {
 			int numPages = getMemoryManager().computeNumberOfPages(hashjoinMemorySize);
-			memSegments = getMemoryManager().allocatePages(getOwningNepheleTask(), numPages);
+			memSegments = getMemoryManager().allocatePages(getContainingTask(), numPages);
 			hashTable = new CompactingHashTable<BT>(solutionTypeSerializer, solutionTypeComparator,
memSegments);
 			success = true;
 			return hashTable;

http://git-wip-us.apache.org/repos/asf/flink/blob/273f54ba/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
index 406d430..8c66cc7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
@@ -97,7 +97,7 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT>
extends Jo
 						pairComparatorFactory.createComparator21(comparator1, comparator2),
 						this.taskContext.getMemoryManager(),
 						this.taskContext.getIOManager(),
-						this.taskContext.getOwningNepheleTask(),
+						this.taskContext.getContainingTask(),
 						availableMemory,
 						false,
 						false,
@@ -113,7 +113,7 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT>
extends Jo
 						pairComparatorFactory.createComparator12(comparator1, comparator2),
 						this.taskContext.getMemoryManager(),
 						this.taskContext.getIOManager(),
-						this.taskContext.getOwningNepheleTask(),
+						this.taskContext.getContainingTask(),
 						availableMemory,
 						false,
 						false,
@@ -132,7 +132,7 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT>
extends Jo
 						pairComparatorFactory.createComparator21(comparator1, comparator2),
 						this.taskContext.getMemoryManager(),
 						this.taskContext.getIOManager(),
-						this.taskContext.getOwningNepheleTask(),
+						this.taskContext.getContainingTask(),
 						availableMemory,
 						false,
 						false,
@@ -148,7 +148,7 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT>
extends Jo
 						pairComparatorFactory.createComparator12(comparator1, comparator2),
 						this.taskContext.getMemoryManager(),
 						this.taskContext.getIOManager(),
-						this.taskContext.getOwningNepheleTask(),
+						this.taskContext.getContainingTask(),
 						availableMemory,
 						false,
 						false,

http://git-wip-us.apache.org/repos/asf/flink/blob/273f54ba/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
index 68995d8..29f1c20 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
@@ -1061,7 +1061,7 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable
impleme
 	}
 
 	@Override
-	public AbstractInvokable getOwningNepheleTask() {
+	public AbstractInvokable getContainingTask() {
 		return this;
 	}
 
@@ -1154,7 +1154,7 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable
impleme
 	 *
 	 * @param message The main message for the log.
 	 * @param taskName The name of the task.
-	 * @param parent The nephele task that contains the code producing the message.
+	 * @param parent The task that contains the code producing the message.
 	 *
 	 * @return The string for logging.
 	 */
@@ -1260,7 +1260,7 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable
impleme
 	 * The output collector applies the configured shipping strategy.
 	 */
 	@SuppressWarnings("unchecked")
-	public static <T> Collector<T> initOutputs(AbstractInvokable nepheleTask, ClassLoader
cl, TaskConfig config,
+	public static <T> Collector<T> initOutputs(AbstractInvokable containingTask,
ClassLoader cl, TaskConfig config,
 										List<ChainedDriver<?, ?>> chainedTasksTarget,
 										List<RecordWriter<?>> eventualOutputs,
 										ExecutionConfig executionConfig,
@@ -1299,21 +1299,21 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable
impleme
 
 				if (i == numChained - 1) {
 					// last in chain, instantiate the output collector for this task
-					previous = getOutputCollector(nepheleTask, chainedStubConf, cl, eventualOutputs, 0,
chainedStubConf.getNumOutputs(), reporter);
+					previous = getOutputCollector(containingTask, chainedStubConf, cl, eventualOutputs,
0, chainedStubConf.getNumOutputs(), reporter);
 				}
 
-				ct.setup(chainedStubConf, taskName, previous, nepheleTask, cl, executionConfig, accumulatorMap);
+				ct.setup(chainedStubConf, taskName, previous, containingTask, cl, executionConfig, accumulatorMap);
 				chainedTasksTarget.add(0, ct);
 
 				previous = ct;
 			}
-			// the collector of the first in the chain is the collector for the nephele task
+			// the collector of the first in the chain is the collector for the task
 			return (Collector<T>) previous;
 		}
 		// else
 
 		// instantiate the output collector the default way from this configuration
-		return getOutputCollector(nepheleTask , config, cl, eventualOutputs, 0, numOutputs, reporter);
+		return getOutputCollector(containingTask , config, cl, eventualOutputs, 0, numOutputs,
reporter);
 	}
 	
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/273f54ba/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java
index 3e1d01f..fee0874 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java
@@ -209,12 +209,12 @@ public class CrossDriver<T1, T2, OT> implements Driver<CrossFunction<T1,
T2, OT>
 		
 		final BlockResettableMutableObjectIterator<T1> blockVals = 
 				new BlockResettableMutableObjectIterator<T1>(this.memManager, in1, serializer1,
this.memPagesForBlockSide,
-							this.taskContext.getOwningNepheleTask());
+							this.taskContext.getContainingTask());
 		this.blockIter = blockVals;
 		
 		final SpillingResettableMutableObjectIterator<T2> spillVals = new SpillingResettableMutableObjectIterator<T2>(
 				in2, serializer2, this.memManager, this.taskContext.getIOManager(), this.memPagesForSpillingSide,
-				this.taskContext.getOwningNepheleTask());
+				this.taskContext.getContainingTask());
 		this.spillIter = spillVals;
 		
 
@@ -277,12 +277,12 @@ public class CrossDriver<T1, T2, OT> implements Driver<CrossFunction<T1,
T2, OT>
 		
 		final SpillingResettableMutableObjectIterator<T1> spillVals = new SpillingResettableMutableObjectIterator<T1>(
 				in1, serializer1, this.memManager, this.taskContext.getIOManager(), this.memPagesForSpillingSide,
-				this.taskContext.getOwningNepheleTask());
+				this.taskContext.getContainingTask());
 		this.spillIter = spillVals;
 		
 		final BlockResettableMutableObjectIterator<T2> blockVals = 
 				new BlockResettableMutableObjectIterator<T2>(this.memManager, in2, serializer2,
this.memPagesForBlockSide,
-						this.taskContext.getOwningNepheleTask());
+						this.taskContext.getContainingTask());
 		this.blockIter = blockVals;
 		
 		final CrossFunction<T1, T2, OT> crosser = this.taskContext.getStub();
@@ -343,7 +343,7 @@ public class CrossDriver<T1, T2, OT> implements Driver<CrossFunction<T1,
T2, OT>
 		
 		final SpillingResettableMutableObjectIterator<T2> spillVals = new SpillingResettableMutableObjectIterator<T2>(
 				in2, serializer2, this.memManager, this.taskContext.getIOManager(), this.memPagesForSpillingSide,
-				this.taskContext.getOwningNepheleTask());
+				this.taskContext.getContainingTask());
 		this.spillIter = spillVals;
 		
 		final CrossFunction<T1, T2, OT> crosser = this.taskContext.getStub();
@@ -396,7 +396,7 @@ public class CrossDriver<T1, T2, OT> implements Driver<CrossFunction<T1,
T2, OT>
 		
 		final SpillingResettableMutableObjectIterator<T1> spillVals = new SpillingResettableMutableObjectIterator<T1>(
 				in1, serializer1, this.memManager, this.taskContext.getIOManager(), this.memPagesForSpillingSide,
-				this.taskContext.getOwningNepheleTask());
+				this.taskContext.getContainingTask());
 		this.spillIter = spillVals;
 
 		final CrossFunction<T1, T2, OT> crosser = this.taskContext.getStub();

http://git-wip-us.apache.org/repos/asf/flink/blob/273f54ba/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FullOuterJoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FullOuterJoinDriver.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FullOuterJoinDriver.java
index a41a6ec..98a72ce 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FullOuterJoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FullOuterJoinDriver.java
@@ -68,7 +68,7 @@ public class FullOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<I
 						memoryManager,
 						ioManager,
 						numPages,
-						super.taskContext.getOwningNepheleTask()
+						super.taskContext.getContainingTask()
 				);
 		case FULL_OUTER_HYBRIDHASH_BUILD_FIRST:
 			return new ReusingBuildFirstHashJoinIterator<>(in1, in2,
@@ -76,7 +76,7 @@ public class FullOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<I
 					serializer2, comparator2,
 					pairComparatorFactory.createComparator21(comparator1, comparator2),
 					memoryManager, ioManager,
-					this.taskContext.getOwningNepheleTask(),
+					this.taskContext.getContainingTask(),
 					driverMemFraction,
 					true,
 					true,
@@ -87,7 +87,7 @@ public class FullOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<I
 					serializer2, comparator2,
 					pairComparatorFactory.createComparator12(comparator1, comparator2),
 					memoryManager, ioManager,
-					this.taskContext.getOwningNepheleTask(),
+					this.taskContext.getContainingTask(),
 					driverMemFraction,
 					true,
 					true,
@@ -126,7 +126,7 @@ public class FullOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<I
 						memoryManager,
 						ioManager,
 						numPages,
-						super.taskContext.getOwningNepheleTask()
+						super.taskContext.getContainingTask()
 				);
 			case FULL_OUTER_HYBRIDHASH_BUILD_FIRST:
 				return new NonReusingBuildFirstHashJoinIterator<>(in1, in2,
@@ -134,7 +134,7 @@ public class FullOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<I
 					serializer2, comparator2,
 					pairComparatorFactory.createComparator21(comparator1, comparator2),
 					memoryManager, ioManager,
-					this.taskContext.getOwningNepheleTask(),
+					this.taskContext.getContainingTask(),
 					driverMemFraction,
 					true,
 					true,
@@ -145,7 +145,7 @@ public class FullOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<I
 					serializer2, comparator2,
 					pairComparatorFactory.createComparator12(comparator1, comparator2),
 					memoryManager, ioManager,
-					this.taskContext.getOwningNepheleTask(),
+					this.taskContext.getContainingTask(),
 					driverMemFraction,
 					true,
 					true,

http://git-wip-us.apache.org/repos/asf/flink/blob/273f54ba/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
index 320e006..8edcee2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
@@ -129,7 +129,7 @@ public class GroupReduceCombineDriver<IN, OUT> implements Driver<GroupCombineFun
 
 		MemoryManager memManager = this.taskContext.getMemoryManager();
 		final int numMemoryPages = memManager.computeNumberOfPages(this.taskContext.getTaskConfig().getRelativeMemoryDriver());
-		this.memory = memManager.allocatePages(this.taskContext.getOwningNepheleTask(), numMemoryPages);
+		this.memory = memManager.allocatePages(this.taskContext.getContainingTask(), numMemoryPages);
 
 		// instantiate a fix-length in-place sorter, if possible, otherwise the out-of-place sorter
 		if (sortingComparator.supportsSerializationWithKeyNormalization() &&

http://git-wip-us.apache.org/repos/asf/flink/blob/273f54ba/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
index efb59a7..8543723 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
@@ -135,7 +135,7 @@ public class JoinDriver<IT1, IT2, OT> implements Driver<FlatJoinFunction<IT1,
IT
 							serializer1, comparator1,
 							serializer2, comparator2,
 							pairComparatorFactory.createComparator12(comparator1, comparator2),
-							memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());
+							memoryManager, ioManager, numPages, this.taskContext.getContainingTask());
 					break;
 				case HYBRIDHASH_BUILD_FIRST:
 					this.joinIterator = new ReusingBuildFirstHashJoinIterator<>(in1, in2,
@@ -143,7 +143,7 @@ public class JoinDriver<IT1, IT2, OT> implements Driver<FlatJoinFunction<IT1,
IT
 							serializer2, comparator2,
 							pairComparatorFactory.createComparator21(comparator1, comparator2),
 							memoryManager, ioManager,
-							this.taskContext.getOwningNepheleTask(),
+							this.taskContext.getContainingTask(),
 							fractionAvailableMemory,
 							false,
 							false,
@@ -155,7 +155,7 @@ public class JoinDriver<IT1, IT2, OT> implements Driver<FlatJoinFunction<IT1,
IT
 							serializer2, comparator2,
 							pairComparatorFactory.createComparator12(comparator1, comparator2),
 							memoryManager, ioManager,
-							this.taskContext.getOwningNepheleTask(),
+							this.taskContext.getContainingTask(),
 							fractionAvailableMemory,
 							false,
 							false,
@@ -171,7 +171,7 @@ public class JoinDriver<IT1, IT2, OT> implements Driver<FlatJoinFunction<IT1,
IT
 							serializer1, comparator1,
 							serializer2, comparator2,
 							pairComparatorFactory.createComparator12(comparator1, comparator2),
-							memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());
+							memoryManager, ioManager, numPages, this.taskContext.getContainingTask());
 
 					break;
 				case HYBRIDHASH_BUILD_FIRST:
@@ -180,7 +180,7 @@ public class JoinDriver<IT1, IT2, OT> implements Driver<FlatJoinFunction<IT1,
IT
 							serializer2, comparator2,
 							pairComparatorFactory.createComparator21(comparator1, comparator2),
 							memoryManager, ioManager,
-							this.taskContext.getOwningNepheleTask(),
+							this.taskContext.getContainingTask(),
 							fractionAvailableMemory,
 							false,
 							false,
@@ -192,7 +192,7 @@ public class JoinDriver<IT1, IT2, OT> implements Driver<FlatJoinFunction<IT1,
IT
 							serializer2, comparator2,
 							pairComparatorFactory.createComparator12(comparator1, comparator2),
 							memoryManager, ioManager,
-							this.taskContext.getOwningNepheleTask(),
+							this.taskContext.getContainingTask(),
 							fractionAvailableMemory,
 							false,
 							false,

http://git-wip-us.apache.org/repos/asf/flink/blob/273f54ba/flink-runtime/src/main/java/org/apache/flink/runtime/operators/LeftOuterJoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/LeftOuterJoinDriver.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/LeftOuterJoinDriver.java
index 41bb54d..fbd7f35 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/LeftOuterJoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/LeftOuterJoinDriver.java
@@ -68,7 +68,7 @@ public class LeftOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<I
 						memoryManager,
 						ioManager,
 						numPages,
-						super.taskContext.getOwningNepheleTask()
+						super.taskContext.getContainingTask()
 				);
 			case LEFT_HYBRIDHASH_BUILD_FIRST:
 				return new ReusingBuildFirstHashJoinIterator<>(in1, in2,
@@ -76,7 +76,7 @@ public class LeftOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<I
 						serializer2, comparator2,
 						pairComparatorFactory.createComparator21(comparator1, comparator2),
 						memoryManager, ioManager,
-						this.taskContext.getOwningNepheleTask(),
+						this.taskContext.getContainingTask(),
 						driverMemFraction,
 						false,
 						true,
@@ -87,7 +87,7 @@ public class LeftOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<I
 						serializer2, comparator2,
 						pairComparatorFactory.createComparator12(comparator1, comparator2),
 						memoryManager, ioManager,
-						this.taskContext.getOwningNepheleTask(),
+						this.taskContext.getContainingTask(),
 						driverMemFraction,
 						true,
 						false,
@@ -126,7 +126,7 @@ public class LeftOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<I
 						memoryManager,
 						ioManager,
 						numPages,
-						super.taskContext.getOwningNepheleTask()
+						super.taskContext.getContainingTask()
 				);
 			case LEFT_HYBRIDHASH_BUILD_FIRST:
 				return new NonReusingBuildFirstHashJoinIterator<>(in1, in2,
@@ -134,7 +134,7 @@ public class LeftOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<I
 						serializer2, comparator2,
 						pairComparatorFactory.createComparator21(comparator1, comparator2),
 						memoryManager, ioManager,
-						this.taskContext.getOwningNepheleTask(),
+						this.taskContext.getContainingTask(),
 						driverMemFraction,
 						false,
 						true,
@@ -145,7 +145,7 @@ public class LeftOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<I
 						serializer2, comparator2,
 						pairComparatorFactory.createComparator12(comparator1, comparator2),
 						memoryManager, ioManager,
-						this.taskContext.getOwningNepheleTask(),
+						this.taskContext.getContainingTask(),
 						driverMemFraction,
 						true,
 						false,

http://git-wip-us.apache.org/repos/asf/flink/blob/273f54ba/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
index 6c4ded1..aea7ae8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
@@ -125,7 +125,7 @@ public class ReduceCombineDriver<T> implements Driver<ReduceFunction<T>,
T> {
 		MemoryManager memManager = taskContext.getMemoryManager();
 		final int numMemoryPages = memManager.computeNumberOfPages(
 			taskContext.getTaskConfig().getRelativeMemoryDriver());
-		memory = memManager.allocatePages(taskContext.getOwningNepheleTask(), numMemoryPages);
+		memory = memManager.allocatePages(taskContext.getContainingTask(), numMemoryPages);
 
 		ExecutionConfig executionConfig = taskContext.getExecutionConfig();
 		objectReuseEnabled = executionConfig.isObjectReuseEnabled();

http://git-wip-us.apache.org/repos/asf/flink/blob/273f54ba/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RightOuterJoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RightOuterJoinDriver.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RightOuterJoinDriver.java
index 96f65b4..d684aba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RightOuterJoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RightOuterJoinDriver.java
@@ -68,7 +68,7 @@ public class RightOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<
 						memoryManager,
 						ioManager,
 						numPages,
-						super.taskContext.getOwningNepheleTask()
+						super.taskContext.getContainingTask()
 				);
 			case RIGHT_HYBRIDHASH_BUILD_FIRST:
 				return new ReusingBuildFirstHashJoinIterator<>(in1, in2,
@@ -76,7 +76,7 @@ public class RightOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<
 						serializer2, comparator2,
 						pairComparatorFactory.createComparator21(comparator1, comparator2),
 						memoryManager, ioManager,
-						this.taskContext.getOwningNepheleTask(),
+						this.taskContext.getContainingTask(),
 						driverMemFraction,
 						true,
 						false,
@@ -87,7 +87,7 @@ public class RightOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<
 						serializer2, comparator2,
 						pairComparatorFactory.createComparator12(comparator1, comparator2),
 						memoryManager, ioManager,
-						this.taskContext.getOwningNepheleTask(),
+						this.taskContext.getContainingTask(),
 						driverMemFraction,
 						false,
 						true,
@@ -126,7 +126,7 @@ public class RightOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<
 						memoryManager,
 						ioManager,
 						numPages,
-						super.taskContext.getOwningNepheleTask()
+						super.taskContext.getContainingTask()
 				);
 			case RIGHT_HYBRIDHASH_BUILD_FIRST:
 				return new NonReusingBuildFirstHashJoinIterator<>(in1, in2,
@@ -134,7 +134,7 @@ public class RightOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<
 						serializer2, comparator2,
 						pairComparatorFactory.createComparator21(comparator1, comparator2),
 						memoryManager, ioManager,
-						this.taskContext.getOwningNepheleTask(),
+						this.taskContext.getContainingTask(),
 						driverMemFraction,
 						true,
 						false,
@@ -145,7 +145,7 @@ public class RightOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<
 						serializer2, comparator2,
 						pairComparatorFactory.createComparator12(comparator1, comparator2),
 						memoryManager, ioManager,
-						this.taskContext.getOwningNepheleTask(),
+						this.taskContext.getContainingTask(),
 						driverMemFraction,
 						false,
 						true,

http://git-wip-us.apache.org/repos/asf/flink/blob/273f54ba/flink-runtime/src/main/java/org/apache/flink/runtime/operators/TaskContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/TaskContext.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/TaskContext.java
index df22528..bc3e4c1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/TaskContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/TaskContext.java
@@ -65,7 +65,7 @@ public interface TaskContext<S, OT> {
 
 	Collector<OT> getOutputCollector();
 	
-	AbstractInvokable getOwningNepheleTask();
+	AbstractInvokable getContainingTask();
 	
 	String formatLogString(String message);
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/273f54ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
index 2184302..babe69e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
@@ -134,7 +134,7 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc
 		try {
 			sorter = new CombiningUnilateralSortMerger<>(new MockCombiningReduceStub(),
 				getMemoryManager(), getIOManager(), new UniformRecordGenerator(keyCnt, valCnt, false),

-				getOwningNepheleTask(), RecordSerializerFactory.get(), this.comparator.duplicate(),
+				getContainingTask(), RecordSerializerFactory.get(), this.comparator.duplicate(),
 					this.perSortFractionMem,
 					2, 0.8f, true /* use large record handler */, true);
 			addInput(sorter.getIterator());
@@ -180,7 +180,7 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc
 		try {
 			sorter = new CombiningUnilateralSortMerger<>(new MockCombiningReduceStub(),
 				getMemoryManager(), getIOManager(), new UniformRecordGenerator(keyCnt, valCnt, false),

-				getOwningNepheleTask(), RecordSerializerFactory.get(), this.comparator.duplicate(),
+				getContainingTask(), RecordSerializerFactory.get(), this.comparator.duplicate(),
 					this.perSortFractionMem,
 					2, 0.8f, true /* use large record handler */, false);
 			addInput(sorter.getIterator());

http://git-wip-us.apache.org/repos/asf/flink/blob/273f54ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
index 7c92dba..718b446 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
@@ -129,7 +129,7 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
 		try {
 			sorter = new CombiningUnilateralSortMerger<>(new MockCombiningReduceStub(),
 				getMemoryManager(), getIOManager(), new UniformRecordGenerator(keyCnt, valCnt, false),

-				getOwningNepheleTask(), RecordSerializerFactory.get(), this.comparator.duplicate(), this.perSortFractionMem,
+				getContainingTask(), RecordSerializerFactory.get(), this.comparator.duplicate(), this.perSortFractionMem,
 					4, 0.8f, true /* use large record handler */, true);
 			addInput(sorter.getIterator());
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/273f54ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
index 89cde95..62110a7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
@@ -216,7 +216,7 @@ public class TestTaskContext<S, T> implements TaskContext<S,
T> {
 	}
 
 	@Override
-	public AbstractInvokable getOwningNepheleTask() {
+	public AbstractInvokable getContainingTask() {
 		return this.owner;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/273f54ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
index 7531b99..75f960e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
@@ -361,7 +361,7 @@ public class BinaryOperatorTestBase<S extends Function, IN, OUT>
extends TestLog
 	}
 	
 	@Override
-	public AbstractInvokable getOwningNepheleTask() {
+	public AbstractInvokable getContainingTask() {
 		return this.owner;
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/273f54ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
index e9a0ba5..088435a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
@@ -358,7 +358,7 @@ public class DriverTestBase<S extends Function> extends TestLogger
implements Ta
 	}
 
 	@Override
-	public AbstractInvokable getOwningNepheleTask() {
+	public AbstractInvokable getContainingTask() {
 		return this.owner;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/273f54ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
index ff12e76..a94e694 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
@@ -353,7 +353,7 @@ public class UnaryOperatorTestBase<S extends Function, IN, OUT>
extends TestLogg
 	}
 
 	@Override
-	public AbstractInvokable getOwningNepheleTask() {
+	public AbstractInvokable getContainingTask() {
 		return this.owner;
 	}
 


Mime
View raw message