flink-commits mailing list archives

From u..@apache.org
Subject [8/9] flink git commit: [FLINK-1296] [runtime] Fix bug when large record handling results in empty spill files
Date Wed, 21 Jan 2015 11:08:58 GMT
[FLINK-1296] [runtime] Fix bug when large record handling results in empty spill files


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5970e212
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5970e212
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5970e212

Branch: refs/heads/master
Commit: 5970e212b1beca29590dd138dc5e8eaf90bac498
Parents: 76eaef0
Author: Stephan Ewen <sewen@apache.org>
Authored: Thu Dec 18 20:08:09 2014 +0100
Committer: Ufuk Celebi <uce@apache.org>
Committed: Wed Jan 21 12:01:36 2015 +0100

----------------------------------------------------------------------
 .../operators/sort/UnilateralSortMerger.java    |  4 +-
 .../sort/ExternalSortLargeRecordsITCase.java    | 66 ++++++++++++++++++++
 2 files changed, 69 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
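
The change itself is a one-line guard in the spilling thread: when large-record handling leaves a sort buffer's spill file empty, the corresponding channel must not be registered for the merge phase. A minimal, self-contained sketch of that guard pattern follows (SpillChannel, finishSpill and the surrounding names are hypothetical stand-ins for illustration, not Flink's internal API):

import java.util.ArrayList;
import java.util.List;

// Sketch of the guard introduced by this commit: a spill file that received no
// data (as can happen with large-record handling, per the commit message) is
// simply dropped instead of being handed to the merge phase.
// SpillChannel and finishSpill are hypothetical names used only for illustration.
class SpillingSketch {

	static final class SpillChannel {
		final long bytesWritten;
		final int blockCount;

		SpillChannel(long bytesWritten, int blockCount) {
			this.bytesWritten = bytesWritten;
			this.blockCount = blockCount;
		}
	}

	private final List<SpillChannel> channelsForMerge = new ArrayList<SpillChannel>();

	void finishSpill(SpillChannel channel) {
		// Before the fix, every closed channel was added unconditionally, so an
		// empty spill file could end up in the merge and trigger the reported bug.
		if (channel.bytesWritten > 0) {
			channelsForMerge.add(channel);
		}
	}

	public static void main(String[] args) {
		SpillingSketch sketch = new SpillingSketch();
		sketch.finishSpill(new SpillChannel(0, 0));     // empty spill file: skipped
		sketch.finishSpill(new SpillChannel(4096, 1));  // non-empty spill file: registered
		System.out.println("channels registered for merge: " + sketch.channelsForMerge.size());
	}
}

In the actual diff below, the same idea is expressed by checking output.getBytesWritten() > 0 before adding the ChannelWithBlockCount to channelIDs.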


http://git-wip-us.apache.org/repos/asf/flink/blob/5970e212/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
index 6e89300..cdd5eb4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
@@ -1320,7 +1320,9 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 				output.close();
 				unregisterOpenChannelToBeRemovedAtShudown(writer);
 				
-				channelIDs.add(new ChannelWithBlockCount(channel, output.getBlockCount()));
+				if (output.getBytesWritten() > 0) {
+					channelIDs.add(new ChannelWithBlockCount(channel, output.getBlockCount()));
+				}
 
 				// pass empty sort-buffer to reading thread
 				element.buffer.reset();

http://git-wip-us.apache.org/repos/asf/flink/blob/5970e212/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
index 33d15ae..ad15282 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
@@ -293,6 +293,72 @@ public class ExternalSortLargeRecordsITCase {
 		}
 	}
 	
+	@Test
+	public void testSortWithMediumRecordsOnly() {
+		try {
+			final int NUM_RECORDS = 70;
+			
+			final TypeInformation<?>[] types = new TypeInformation<?>[] {
+					BasicTypeInfo.LONG_TYPE_INFO,
+					new ValueTypeInfo<SmallOrMediumOrLargeValue>(SmallOrMediumOrLargeValue.class)
+				};
+			
+			final TupleTypeInfo<Tuple2<Long, SmallOrMediumOrLargeValue>> typeInfo = 
+								new TupleTypeInfo<Tuple2<Long,SmallOrMediumOrLargeValue>>(types);
+			
+			final TypeSerializer<Tuple2<Long, SmallOrMediumOrLargeValue>> serializer = typeInfo.createSerializer();
+			final TypeComparator<Tuple2<Long, SmallOrMediumOrLargeValue>> comparator = typeInfo.createComparator(new int[] {0}, new boolean[]{false}, 0);
+			
+			MutableObjectIterator<Tuple2<Long, SmallOrMediumOrLargeValue>> source = 
+					new MutableObjectIterator<Tuple2<Long, SmallOrMediumOrLargeValue>>()
+			{
+				private final Random rnd = new Random();
+				private int num = -1;
+				
+				@Override
+				public Tuple2<Long, SmallOrMediumOrLargeValue> next(Tuple2<Long, SmallOrMediumOrLargeValue> reuse) {
+					if (++num < NUM_RECORDS) {
+						long val = rnd.nextLong();
+						return new Tuple2<Long, SmallOrMediumOrLargeValue>(val, new SmallOrMediumOrLargeValue((int) val, SmallOrMediumOrLargeValue.MEDIUM_SIZE));
+					}
+					else {
+						return null;
+					}
+					
+				}
+			};
+			
+			@SuppressWarnings("unchecked")
+			Sorter<Tuple2<Long, SmallOrMediumOrLargeValue>> sorter = new UnilateralSortMerger<Tuple2<Long, SmallOrMediumOrLargeValue>>(
+					this.memoryManager, this.ioManager, 
+					source, this.parentTask,
+					new RuntimeStatefulSerializerFactory<Tuple2<Long, SmallOrMediumOrLargeValue>>(serializer, (Class<Tuple2<Long, SmallOrMediumOrLargeValue>>) (Class<?>) Tuple2.class),
+					comparator, 1.0, 1, 128, 0.7f);
+			
+			// check order
+			// check order
+			MutableObjectIterator<Tuple2<Long, SmallOrMediumOrLargeValue>> iterator = sorter.getIterator();
+			
+			Tuple2<Long, SmallOrMediumOrLargeValue> val = serializer.createInstance();
+			
+			long prevKey = Long.MAX_VALUE;
+
+			for (int i = 0; i < NUM_RECORDS; i++) {
+				val = iterator.next(val);
+				
+				assertTrue(val.f0 <= prevKey);
+				assertTrue(val.f0.intValue() == val.f1.val());
+			}
+			
+			assertNull(iterator.next(val));
+			
+			sorter.close();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
 	// --------------------------------------------------------------------------------------------
 	
 	public static final class SomeMaybeLongValue implements org.apache.flink.types.Value {

