flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject flink git commit: [FLINK-6394] [runtime] Respect object reuse configuration when executing group combining function
Date Mon, 08 May 2017 12:23:07 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.2 52b6e2fda -> a820f662d


[FLINK-6394] [runtime] Respect object reuse configuration when executing group combining function

This closes #3803.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a820f662
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a820f662
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a820f662

Branch: refs/heads/release-1.2
Commit: a820f662d20d35915f21ab6bf0be4004e79b8c6b
Parents: 52b6e2f
Author: Kurt Young <ykt836@gmail.com>
Authored: Sun Apr 30 16:56:00 2017 +0800
Committer: Kurt Young <kurt@apache.org>
Committed: Mon May 8 20:22:36 2017 +0800

----------------------------------------------------------------------
 .../sort/CombiningUnilateralSortMerger.java     | 48 +++++++++----
 .../util/NonReusingKeyGroupedIterator.java      |  1 +
 .../CombiningUnilateralSortMergerITCase.java    | 71 +++++++++++++++++++-
 3 files changed, 107 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a820f662/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
index a02ced2..5500f37 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryAllocationException;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.util.EmptyMutableObjectIterator;
+import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator;
 import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
@@ -162,7 +163,8 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E>
{
 		List<MemorySegment> sortReadMemory, List<MemorySegment> writeMemory, int maxFileHandles)
 	{
 		return new CombiningSpillingThread(exceptionHandler, queues, parentTask,
-			memoryManager, ioManager, serializerFactory.getSerializer(), comparator, sortReadMemory, writeMemory, maxFileHandles);
+				memoryManager, ioManager, serializerFactory.getSerializer(),
+				comparator, sortReadMemory, writeMemory, maxFileHandles, objectReuseEnabled);
 	}
 
 	// ------------------------------------------------------------------------
@@ -172,16 +174,20 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E>
{
 	protected class CombiningSpillingThread extends SpillingThread {
 		
 		private final TypeComparator<E> comparator2;
-		
+
+		private final boolean objectReuseEnabled;
+
 		public CombiningSpillingThread(ExceptionHandler<IOException> exceptionHandler, CircularQueues<E>
queues,
 				AbstractInvokable parentTask, MemoryManager memManager, IOManager ioManager, 
 				TypeSerializer<E> serializer, TypeComparator<E> comparator, 
-				List<MemorySegment> sortReadMemory, List<MemorySegment> writeMemory, int
maxNumFileHandles)
+				List<MemorySegment> sortReadMemory, List<MemorySegment> writeMemory, int
maxNumFileHandles,
+				boolean objectReuseEnabled)
 		{
 			super(exceptionHandler, queues, parentTask, memManager, ioManager, serializer, comparator,

 				sortReadMemory, writeMemory, maxNumFileHandles);
 			
 			this.comparator2 = comparator.duplicate();
+			this.objectReuseEnabled = objectReuseEnabled;
 		}
 
 		/**
@@ -315,7 +321,8 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E>
{
 
 				// set up the combining helpers
 				final InMemorySorter<E> buffer = element.buffer;
-				final CombineValueIterator<E> iter = new CombineValueIterator<E>(buffer, this.serializer.createInstance());
+				final CombineValueIterator<E> iter = new CombineValueIterator<E>(
+						buffer, this.serializer.createInstance(), this.objectReuseEnabled);
 				final WriterCollector<E> collector = new WriterCollector<E>(output, this.serializer);
 
 				int i = 0;
@@ -454,7 +461,6 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E>
{
 
 			// the list with the target iterators
 			final MergeIterator<E> mergeIterator = getMergingIterator(channelIDs, readBuffers,
channelAccesses, null);
-			final ReusingKeyGroupedIterator<E> groupedIter = new ReusingKeyGroupedIterator<E>(mergeIterator, this.serializer, this.comparator2);
 
 			// create a new channel writer
 			final FileIOChannel.ID mergedChannelID = this.ioManager.createChannel();
@@ -469,8 +475,18 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E>
{
 
 			// combine and write to disk
 			try {
-				while (groupedIter.nextKey()) {
-					combineStub.combine(groupedIter.getValues(), collector);
+				if (objectReuseEnabled) {
+					final ReusingKeyGroupedIterator<E> groupedIter = new ReusingKeyGroupedIterator<>(
+							mergeIterator, this.serializer, this.comparator2);
+					while (groupedIter.nextKey()) {
+						combineStub.combine(groupedIter.getValues(), collector);
+					}
+				} else {
+					final NonReusingKeyGroupedIterator<E> groupedIter = new NonReusingKeyGroupedIterator<>(
+							mergeIterator, this.comparator2);
+					while (groupedIter.nextKey()) {
+						combineStub.combine(groupedIter.getValues(), collector);
+					}
 				}
 			}
 			catch (Exception e) {
@@ -505,7 +521,9 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E>
{
 		
 		private final InMemorySorter<E> buffer; // the buffer from which values are returned
 		
-		private E record;
+		private E recordReuse;
+
+		private final boolean objectReuseEnabled;
 
 		private int last; // the position of the last value to be returned
 
@@ -519,9 +537,10 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E>
{
 		 * @param buffer
 		 *        The buffer to get the values from.
 		 */
-		public CombineValueIterator(InMemorySorter<E> buffer, E instance) {
+		public CombineValueIterator(InMemorySorter<E> buffer, E instance, boolean objectReuseEnabled)
{
 			this.buffer = buffer;
-			this.record = instance;
+			this.recordReuse = instance;
+			this.objectReuseEnabled = objectReuseEnabled;
 		}
 
 		/**
@@ -547,9 +566,14 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E>
{
 		public E next() {
 			if (this.position <= this.last) {
 				try {
-					this.record = this.buffer.getRecord(this.record, this.position);
+					E record;
+					if (objectReuseEnabled) {
+						record = this.buffer.getRecord(this.recordReuse, this.position);
+					} else {
+						record = this.buffer.getRecord(this.position);
+					}
 					this.position++;
-					return this.record;
+					return record;
 				}
 				catch (IOException ioex) {
 					LOG.error("Error retrieving a value from a buffer.", ioex);

http://git-wip-us.apache.org/repos/asf/flink/blob/a820f662/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonReusingKeyGroupedIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonReusingKeyGroupedIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonReusingKeyGroupedIterator.java
index 6f4448c..221cf24 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonReusingKeyGroupedIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonReusingKeyGroupedIterator.java
@@ -164,6 +164,7 @@ public final class NonReusingKeyGroupedIterator<E> implements KeyGroupedIterator
 	 * 
 	 * @return Iterator over all values that belong to the current key.
 	 */
+	@Override
 	public ValuesIterator getValues() {
 		return this.valuesIterator;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a820f662/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
index 0f636ef..2c875f2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.operators.sort;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Hashtable;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
@@ -183,6 +184,42 @@ public class CombiningUnilateralSortMergerITCase {
 	}
 
 	@Test
+	public void testCombineSpillingDisableObjectReuse() throws Exception {
+		int noKeys = 100;
+		int noKeyCnt = 10000;
+
+		TestData.MockTuple2Reader<Tuple2<Integer, Integer>> reader = TestData.getIntIntTupleReader();
+
+		LOG.debug("initializing sortmerger");
+
+		MaterializedCountCombiner comb = new MaterializedCountCombiner();
+
+		// set maxNumFileHandles = 2 to trigger multiple channel merging
+		Sorter<Tuple2<Integer, Integer>> merger = new CombiningUnilateralSortMerger<>(comb,
+				this.memoryManager, this.ioManager, reader, this.parentTask, this.serializerFactory2,
this.comparator2,
+				0.01, 2, 0.005f, true /* use large record handler */, false);
+
+		final Tuple2<Integer, Integer> rec = new Tuple2<>();
+
+		for (int i = 0; i < noKeyCnt; i++) {
+			rec.setField(i, 0);
+			for (int j = 0; j < noKeys; j++) {
+				rec.setField(j, 1);
+				reader.emit(rec);
+			}
+		}
+		reader.close();
+
+		MutableObjectIterator<Tuple2<Integer, Integer>> iterator = merger.getIterator();
+		Iterator<Integer> result = getReducingIterator(iterator, serializerFactory2.getSerializer(),
comparator2.duplicate());
+		while (result.hasNext()) {
+			Assert.assertEquals(4950, result.next().intValue());
+		}
+
+		merger.close();
+	}
+
+	@Test
 	public void testSortAndValidate() throws Exception
 	{
 		final Hashtable<Integer, Integer> countTable = new Hashtable<>(KEY_MAX);
@@ -331,7 +368,39 @@ public class CombiningUnilateralSortMergerITCase {
 			closed = true;
 		}
 	}
-	
+
+	// --------------------------------------------------------------------------------------------
+
+	public static class MaterializedCountCombiner
+			extends RichGroupReduceFunction<Tuple2<Integer, Integer>, Tuple2<Integer,
Integer>>
+			implements GroupCombineFunction<Tuple2<Integer, Integer>, Tuple2<Integer,
Integer>>
+	{
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void combine(Iterable<Tuple2<Integer, Integer>> values, Collector<Tuple2<Integer,
Integer>> out) {
+			ArrayList<Tuple2<Integer, Integer>> valueList = new ArrayList<>();
+			for (Tuple2<Integer, Integer> next : values) {
+				valueList.add(next);
+			}
+
+			int count = 0;
+			Tuple2<Integer, Integer> rec = new Tuple2<>();
+			for (Tuple2<Integer, Integer> tuple : valueList) {
+				rec.setField(tuple.f0, 0);
+				count += tuple.f1;
+			}
+			rec.setField(count, 1);
+			out.collect(rec);
+		}
+
+		@Override
+		public void reduce(Iterable<Tuple2<Integer, Integer>> values,
+				Collector<Tuple2<Integer, Integer>> out) throws Exception
+		{
+		}
+	}
+
 	private static Iterator<Integer> getReducingIterator(MutableObjectIterator<Tuple2<Integer,
Integer>> data, TypeSerializer<Tuple2<Integer, Integer>> serializer, TypeComparator<Tuple2<Integer,
Integer>>  comparator) {
 		
 		final ReusingKeyGroupedIterator<Tuple2<Integer, Integer>>  groupIter = new
ReusingKeyGroupedIterator<> (data, serializer, comparator);


Mime
View raw message