flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [1/9] flink git commit: [FLINK-1296] [runtime] Limit memory consumption on merge by spilling larger records into a single special merge stream
Date Wed, 21 Jan 2015 11:08:51 GMT
Repository: flink
Updated Branches:
  refs/heads/master 7610588bc -> 7df6a3d72


[FLINK-1296] [runtime] Limit memory consumption on merge by spilling larger records into a single special merge stream


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/28a62d84
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/28a62d84
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/28a62d84

Branch: refs/heads/master
Commit: 28a62d844e7a069a05383bb1e9e71f9daea41199
Parents: be8e1f1
Author: Stephan Ewen <sewen@apache.org>
Authored: Thu Dec 18 11:42:57 2014 +0100
Committer: Ufuk Celebi <uce@apache.org>
Committed: Wed Jan 21 12:01:35 2015 +0100

----------------------------------------------------------------------
 .../operators/hash/CompactingHashTable.java     |   4 +-
 .../operators/sort/FixedLengthRecordSorter.java |  22 +-
 .../runtime/operators/sort/InMemorySorter.java  |  14 +-
 .../operators/sort/NormalizedKeySorter.java     |  98 ++--
 .../runtime/operators/sort/SortBuffer.java      | 510 -------------------
 .../operators/sort/UnilateralSortMerger.java    | 177 +------
 .../apache/flink/runtime/util/IntArrayList.java |  28 +-
 .../operators/sort/ExternalSortITCase.java      | 240 +--------
 .../sort/ExternalSortLargeRecordsITCase.java    | 455 +++++++++++++++++
 .../sort/MassiveStringSortingITCase.java        |   4 -
 10 files changed, 570 insertions(+), 982 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/28a62d84/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
index 301aa82..7107972 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
@@ -1032,7 +1032,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 						int oldBucketCount = 0;
 						int newBucketCount = 0;
 						while(!hashList.isEmpty()) {
-							hash = hashList.removeInt(hashList.size()-1);
+							hash = hashList.removeLast();
 							pointer = pointerList.removeLong(pointerList.size()-1);
 							posHashCode = hash % this.numBuckets;
 							if(posHashCode == bucket && oldBucketCount < NUM_ENTRIES_PER_BUCKET) {
@@ -1061,7 +1061,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 				int bucketInSegmentPos = 0;
 				MemorySegment bucket = null;
 				while(!overflowHashes.isEmpty()) {
-					hash = overflowHashes.removeInt(overflowHashes.size()-1);
+					hash = overflowHashes.removeLast();
 					pointer = overflowPointers.removeLong(overflowPointers.size()-1);
 					posHashCode = hash % this.numBuckets; 
 					bucketArrayPos = posHashCode >>> this.bucketsPerSegmentBits;

http://git-wip-us.apache.org/repos/asf/flink/blob/28a62d84/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
index a3766e7..cd982c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
@@ -186,25 +186,20 @@ public final class FixedLengthRecordSorter<T> implements InMemorySorter<T> {
 		return this.freeMemory;
 	}
 	
-	/**
-	 * Gets the total capacity of this sorter, in bytes.
-	 * 
-	 * @return The sorter's total capacity.
-	 */
 	@Override
 	public long getCapacity() {
 		return ((long) this.totalNumBuffers) * this.segmentSize;
 	}
 	
-	/**
-	 * Gets the number of bytes currently occupied in this sorter.
-	 * 
-	 * @return The number of bytes occupied.
-	 */
 	@Override
 	public long getOccupancy() {
 		return this.sortBufferBytes;
 	}
+	
+	@Override
+	public long getNumRecordBytes() {
+		return this.sortBufferBytes;
+	}
 
 	// -------------------------------------------------------------------------
 	// Retrieving and Writing
@@ -430,6 +425,13 @@ public final class FixedLengthRecordSorter<T> implements InMemorySorter<T> {
 		}
 	}
 	
+	@Override
+	public void writeToOutput(ChannelWriterOutputView output, LargeRecordHandler<T> largeRecordsOutput)
+			throws IOException
+	{
+		writeToOutput(output);
+	}
+	
 	/**
 	 * Writes a subset of the records in this buffer in their logical order to the given output.
 	 * 

http://git-wip-us.apache.org/repos/asf/flink/blob/28a62d84/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorter.java
index 7dc4f97..633ec70 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorter.java
@@ -25,7 +25,6 @@ import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
 import org.apache.flink.util.MutableObjectIterator;
 
-
 /**
  *
  */
@@ -58,13 +57,20 @@ public interface InMemorySorter<T> extends IndexedSortable {
 	long getCapacity();
 	
 	/**
-	 * Gets the number of bytes currently occupied in this sorter.
+	 * Gets the number of bytes currently occupied in this sorter, records and sort index.
 	 * 
 	 * @return The number of bytes occupied.
 	 */
 	long getOccupancy();
 	
 	/**
+	 * Gets the number of bytes occupied by records only.
+	 * 
+	 * @return The number of bytes occupied by records.
+	 */
+	long getNumRecordBytes();
+	
+	/**
 	 * Gets the record at the given logical position.
 	 * 
 	 * @param reuse The reuse object to deserialize the record into.
@@ -96,7 +102,9 @@ public interface InMemorySorter<T> extends IndexedSortable {
 	 * @param output The output view to write the records to.
 	 * @throws IOException Thrown, if an I/O exception occurred writing to the output view.
 	 */
-	public void writeToOutput(final ChannelWriterOutputView output) throws IOException;
+	public void writeToOutput(ChannelWriterOutputView output) throws IOException;
+	
+	public void writeToOutput(ChannelWriterOutputView output, LargeRecordHandler<T> largeRecordsOutput) throws IOException;
 	
 	/**
 	 * Writes a subset of the records in this buffer in their logical order to the given output.

http://git-wip-us.apache.org/repos/asf/flink/blob/28a62d84/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
index c382708..97b9236 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
@@ -45,6 +45,8 @@ public final class NormalizedKeySorter<T> implements InMemorySorter<T> {
 	private static final int MAX_NORMALIZED_KEY_LEN_PER_ELEMENT = 8;
 	
 	private static final int MIN_REQUIRED_BUFFERS = 3;
+	
+	private static final int LARGE_RECORD_THRESHOLD = 10 * 1024 * 1024;
 
 	// ------------------------------------------------------------------------
 	//                               Members
@@ -227,25 +229,20 @@ public final class NormalizedKeySorter<T> implements InMemorySorter<T> {
 		return this.freeMemory;
 	}
 	
-	/**
-	 * Gets the total capacity of this sorter, in bytes.
-	 * 
-	 * @return The sorter's total capacity.
-	 */
 	@Override
 	public long getCapacity() {
 		return ((long) this.totalNumBuffers) * this.segmentSize;
 	}
 	
-	/**
-	 * Gets the number of bytes currently occupied in this sorter.
-	 * 
-	 * @return The number of bytes occupied.
-	 */
 	@Override
 	public long getOccupancy() {
 		return this.currentDataBufferOffset + this.sortIndexBytes;
 	}
+	
+	@Override
+	public long getNumRecordBytes() {
+		return this.currentDataBufferOffset;
+	}
 
 	// -------------------------------------------------------------------------
 	// Retrieving and Writing
@@ -285,22 +282,27 @@ public final class NormalizedKeySorter<T> implements InMemorySorter<T> {
 			}
 		}
 		
-		// add the pointer and the normalized key
-		this.currentSortIndexSegment.putLong(this.currentSortIndexOffset, this.currentDataBufferOffset);
-		if(this.numKeyBytes != 0) {
-			this.comparator.putNormalizedKey(record, this.currentSortIndexSegment, this.currentSortIndexOffset + OFFSET_LEN, this.numKeyBytes);
-		}
-		
 		// serialize the record into the data buffers
 		try {
 			this.serializer.serialize(record, this.recordCollector);
-			this.currentSortIndexOffset += this.indexEntrySize;
-			this.currentDataBufferOffset = this.recordCollector.getCurrentOffset();
-			this.numRecords++;
-			return true;
-		} catch (EOFException eofex) {
+		}
+		catch (EOFException e) {
 			return false;
 		}
+		
+		final long newOffset = this.recordCollector.getCurrentOffset();
+		final boolean shortRecord = newOffset - this.currentDataBufferOffset < LARGE_RECORD_THRESHOLD;
+		
+		// add the pointer and the normalized key
+		this.currentSortIndexSegment.putLong(this.currentSortIndexOffset, shortRecord ? this.currentDataBufferOffset : -this.currentDataBufferOffset);
+		if (this.numKeyBytes != 0) {
+			this.comparator.putNormalizedKey(record, this.currentSortIndexSegment, this.currentSortIndexOffset + OFFSET_LEN, this.numKeyBytes);
+		}
+		
+		this.currentSortIndexOffset += this.indexEntrySize;
+		this.currentDataBufferOffset = newOffset;
+		this.numRecords++;
+		return true;
 	}
 	
 	// ------------------------------------------------------------------------
@@ -315,7 +317,7 @@ public final class NormalizedKeySorter<T> implements InMemorySorter<T> {
 		final int bufferNum = logicalPosition / this.indexEntriesPerSegment;
 		final int segmentOffset = logicalPosition % this.indexEntriesPerSegment;
 		
-		return this.sortIndex.get(bufferNum).getLong(segmentOffset * this.indexEntrySize);
+		return Math.abs(this.sortIndex.get(bufferNum).getLong(segmentOffset * this.indexEntrySize));
 	}
 	
 	private final T getRecordFromBuffer(T reuse, long pointer) throws IOException {
@@ -368,8 +370,8 @@ public final class NormalizedKeySorter<T> implements InMemorySorter<T> {
 			return this.useNormKeyUninverted ? val : -val;
 		}
 		
-		final long pointerI = segI.getLong(segmentOffsetI);
-		final long pointerJ = segJ.getLong(segmentOffsetJ);
+		final long pointerI = Math.abs(segI.getLong(segmentOffsetI));
+		final long pointerJ = Math.abs(segJ.getLong(segmentOffsetJ));
 		
 		return compareRecords(pointerI, pointerJ);
 	}
@@ -422,7 +424,7 @@ public final class NormalizedKeySorter<T> implements InMemorySorter<T> {
 						this.currentIndexSegment = sortIndex.get(++this.currentSegment);
 					}
 					
-					long pointer = this.currentIndexSegment.getLong(this.currentOffset);
+					long pointer = Math.abs(this.currentIndexSegment.getLong(this.currentOffset));
 					this.currentOffset += indexEntrySize;
 					
 					try {
@@ -475,30 +477,34 @@ public final class NormalizedKeySorter<T> implements InMemorySorter<T> {
 	 * @throws IOException Thrown, if an I/O exception occurred writing to the output view.
 	 */
 	@Override
-	public void writeToOutput(final ChannelWriterOutputView output) throws IOException {
-		int recordsLeft = this.numRecords;
+	public void writeToOutput(ChannelWriterOutputView output) throws IOException {
+		writeToOutput(output, null);
+	}
+	
+	@Override
+	public void writeToOutput(ChannelWriterOutputView output, LargeRecordHandler<T> largeRecordsOutput)
+			throws IOException
+	{
+		final int numRecords = this.numRecords;
 		int currentMemSeg = 0;
-		while (recordsLeft > 0)
-		{
+		int currentRecord = 0;
+		
+		while (currentRecord < numRecords) {
 			final MemorySegment currentIndexSegment = this.sortIndex.get(currentMemSeg++);
-			int offset = 0;
-			// check whether we have a full or partially full segment
-			if (recordsLeft >= this.indexEntriesPerSegment) {
-				// full segment
-				for (;offset <= this.lastIndexEntryOffset; offset += this.indexEntrySize) {
-					final long pointer = currentIndexSegment.getLong(offset);
+
+			// go through all records in the memory segment
+			for (int offset = 0; currentRecord < numRecords && offset <= this.lastIndexEntryOffset; currentRecord++, offset += this.indexEntrySize) {
+				final long pointer = currentIndexSegment.getLong(offset);
+				
+				// small records go into the regular spill file, large records into the special code path
+				if (pointer >= 0) {
 					this.recordBuffer.setReadPosition(pointer);
 					this.serializer.copy(this.recordBuffer, output);
-					
 				}
-				recordsLeft -= this.indexEntriesPerSegment;
-			} else {
-				// partially filled segment
-				for (; recordsLeft > 0; recordsLeft--, offset += this.indexEntrySize)
-				{
-					final long pointer = currentIndexSegment.getLong(offset);
-					this.recordBuffer.setReadPosition(pointer);
-					this.serializer.copy(this.recordBuffer, output);
+				else {
+					this.recordBuffer.setReadPosition(-pointer);
+					T record = this.serializer.deserialize(this.recordBuffer);
+					largeRecordsOutput.addRecord(record);
 				}
 			}
 		}
@@ -524,7 +530,7 @@ public final class NormalizedKeySorter<T> implements InMemorySorter<T> {
 			if (num >= this.indexEntriesPerSegment && offset == 0) {
 				// full segment
 				for (;offset <= this.lastIndexEntryOffset; offset += this.indexEntrySize) {
-					final long pointer = currentIndexSegment.getLong(offset);
+					final long pointer = Math.abs(currentIndexSegment.getLong(offset));
 					this.recordBuffer.setReadPosition(pointer);
 					this.serializer.copy(this.recordBuffer, output);
 				}
@@ -533,7 +539,7 @@ public final class NormalizedKeySorter<T> implements InMemorySorter<T> {
 				// partially filled segment
 				for (; num > 0 && offset <= this.lastIndexEntryOffset; num--, offset += this.indexEntrySize)
 				{
-					final long pointer = currentIndexSegment.getLong(offset);
+					final long pointer = Math.abs(currentIndexSegment.getLong(offset));
 					this.recordBuffer.setReadPosition(pointer);
 					this.serializer.copy(this.recordBuffer, output);
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/28a62d84/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/SortBuffer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/SortBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/SortBuffer.java
deleted file mode 100644
index be8511b..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/SortBuffer.java
+++ /dev/null
@@ -1,510 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.operators.sort;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.RandomAccessInputView;
-import org.apache.flink.runtime.io.disk.SimpleCollectingOutputView;
-import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
-import org.apache.flink.runtime.memorymanager.ListMemorySegmentSource;
-import org.apache.flink.util.MutableObjectIterator;
-
-public class SortBuffer<T> implements InMemorySorter<T> {
-
-	private static final int OFFSET_LEN = 8;
-	
-	private static final int DEFAULT_MAX_NORMALIZED_KEY_LEN = 16;
-	
-	private static final int MAX_NORMALIZED_KEY_LEN_PER_ELEMENT = 8;
-	
-	private static final int MIN_REQUIRED_BUFFERS = 3;
-
-	// ------------------------------------------------------------------------
-	//                               Members
-	// ------------------------------------------------------------------------
-
-	private final byte[] swapBuffer;
-	
-	private final TypeSerializer<T> serializer;
-	
-	private final TypeComparator<T> comparator;
-	
-	private final SimpleCollectingOutputView recordCollector;
-	
-	private final RandomAccessInputView recordBuffer;
-	
-	private final RandomAccessInputView recordBufferForComparison;
-	
-	private MemorySegment currentSortIndexSegment;
-	
-	private final ArrayList<MemorySegment> freeMemory;
-	
-	private final ArrayList<MemorySegment> sortIndex;
-	
-	private final ArrayList<MemorySegment> recordBufferSegments;
-	
-	private long currentDataBufferOffset;
-	
-	private long sortIndexBytes;
-	
-	private int currentSortIndexOffset;
-	
-	private int numRecords;
-	
-	private final int numKeyBytes;
-	
-	private final int indexEntrySize;
-	
-	private final int indexEntriesPerSegment;
-	
-	private final int lastIndexEntryOffset;
-	
-	private final int segmentSize;
-	
-	private final int totalNumBuffers;
-	
-	private final boolean normalizedKeyFullyDetermines;
-	
-	private final boolean useNormKeyUninverted;
-	
-	
-	// -------------------------------------------------------------------------
-	// Constructors / Destructors
-	// -------------------------------------------------------------------------
-
-	public SortBuffer(TypeSerializer<T> serializer, TypeComparator<T> comparator, List<MemorySegment> memory) {
-		this(serializer, comparator, memory, DEFAULT_MAX_NORMALIZED_KEY_LEN);
-	}
-	
-	public SortBuffer(TypeSerializer<T> serializer, TypeComparator<T> comparator, 
-			List<MemorySegment> memory, int maxNormalizedKeyBytes)
-	{
-		if (serializer == null || comparator == null || memory == null) {
-			throw new NullPointerException();
-		}
-		if (maxNormalizedKeyBytes < 0) {
-			throw new IllegalArgumentException("Maximal number of normalized key bytes must not be negative.");
-		}
-		
-		this.serializer = serializer;
-		this.comparator = comparator;
-		this.useNormKeyUninverted = !comparator.invertNormalizedKey();
-		
-		// check the size of the first buffer and record it. all further buffers must have the same size.
-		// the size must also be a power of 2
-		this.totalNumBuffers = memory.size();
-		if (this.totalNumBuffers < MIN_REQUIRED_BUFFERS) {
-			throw new IllegalArgumentException("Normalized-Key sorter requires at least " + MIN_REQUIRED_BUFFERS + " memory buffers.");
-		}
-		this.segmentSize = memory.get(0).size();
-		
-		if (memory instanceof ArrayList<?>) {
-			this.freeMemory = (ArrayList<MemorySegment>) memory;
-		}
-		else {
-			this.freeMemory = new ArrayList<MemorySegment>(memory.size());
-			this.freeMemory.addAll(memory);
-		}
-		
-		// create the buffer collections
-		this.sortIndex = new ArrayList<MemorySegment>(16);
-		this.recordBufferSegments = new ArrayList<MemorySegment>(16);
-		
-		// the views for the record collections
-		this.recordCollector = new SimpleCollectingOutputView(this.recordBufferSegments,
-			new ListMemorySegmentSource(this.freeMemory), this.segmentSize);
-		this.recordBuffer = new RandomAccessInputView(this.recordBufferSegments, this.segmentSize);
-		this.recordBufferForComparison = new RandomAccessInputView(this.recordBufferSegments, this.segmentSize);
-		
-		// set up normalized key characteristics
-		if (this.comparator.supportsNormalizedKey()) {
-			// compute the max normalized key length
-			int numPartialKeys;
-			try {
-				numPartialKeys = this.comparator.getFlatComparators().length;
-			} catch (Throwable t) {
-				numPartialKeys = 1;
-			}
-			
-			int maxLen = Math.min(maxNormalizedKeyBytes, MAX_NORMALIZED_KEY_LEN_PER_ELEMENT * numPartialKeys);
-			
-			this.numKeyBytes = Math.min(this.comparator.getNormalizeKeyLen(), maxLen);
-			this.normalizedKeyFullyDetermines = !this.comparator.isNormalizedKeyPrefixOnly(this.numKeyBytes);
-		}
-		else {
-			this.numKeyBytes = 0;
-			this.normalizedKeyFullyDetermines = false;
-		}
-		
-		// compute the index entry size and limits
-		this.indexEntrySize = this.numKeyBytes + OFFSET_LEN;
-		this.indexEntriesPerSegment = segmentSize / this.indexEntrySize;
-		this.lastIndexEntryOffset = (this.indexEntriesPerSegment - 1) * this.indexEntrySize;
-		this.swapBuffer = new byte[this.indexEntrySize];
-		
-		// set to initial state
-		this.currentSortIndexSegment = nextMemorySegment();
-		this.sortIndex.add(this.currentSortIndexSegment);
-	}
-
-	// -------------------------------------------------------------------------
-	// Memory Segment
-	// -------------------------------------------------------------------------
-
-	/**
-	 * Resets the sort buffer back to the state where it is empty. All contained data is discarded.
-	 */
-	@Override
-	public void reset() {
-		// reset all offsets
-		this.numRecords = 0;
-		this.currentSortIndexOffset = 0;
-		this.currentDataBufferOffset = 0;
-		this.sortIndexBytes = 0;
-		
-		// return all memory
-		this.freeMemory.addAll(this.sortIndex);
-		this.freeMemory.addAll(this.recordBufferSegments);
-		this.sortIndex.clear();
-		this.recordBufferSegments.clear();
-		
-		// grab first buffers
-		this.currentSortIndexSegment = nextMemorySegment();
-		this.sortIndex.add(this.currentSortIndexSegment);
-		this.recordCollector.reset();
-	}
-
-	/**
-	 * Checks whether the buffer is empty.
-	 * 
-	 * @return True, if no record is contained, false otherwise.
-	 */
-	@Override
-	public boolean isEmpty() {
-		return this.numRecords == 0;
-	}
-	
-	/**
-	 * Collects all memory segments from this sorter.
-	 * 
-	 * @return All memory segments from this sorter.
-	 */
-	@Override
-	public List<MemorySegment> dispose() {
-		this.freeMemory.addAll(this.sortIndex);
-		this.freeMemory.addAll(this.recordBufferSegments);
-		
-		this.recordBufferSegments.clear();
-		this.sortIndex.clear();
-		
-		return this.freeMemory;
-	}
-	
-	/**
-	 * Gets the total capacity of this sorter, in bytes.
-	 * 
-	 * @return The sorter's total capacity.
-	 */
-	@Override
-	public long getCapacity() {
-		return ((long) this.totalNumBuffers) * this.segmentSize;
-	}
-	
-	/**
-	 * Gets the number of bytes currently occupied in this sorter.
-	 * 
-	 * @return The number of bytes occupied.
-	 */
-	@Override
-	public long getOccupancy() {
-		return this.currentDataBufferOffset + this.sortIndexBytes;
-	}
-
-	// -------------------------------------------------------------------------
-	// Retrieving and Writing
-	// -------------------------------------------------------------------------
-
-	/**
-	 * Gets the record at the given logical position.
-	 * 
-	 * @param reuse The target object to deserialize the record into.
-	 * @param logicalPosition The logical position of the record.
-	 * @throws IOException Thrown, if an exception occurred during deserialization.
-	 */
-	@Override
-	public T getRecord(T reuse, int logicalPosition) throws IOException {
-		return getRecordFromBuffer(reuse, readPointer(logicalPosition));
-	}
-
-	/**
-	 * Writes a given record to this sort buffer. The written record will be appended and take
-	 * the last logical position.
-	 * 
-	 * @param record The record to be written.
-	 * @return True, if the record was successfully written, false, if the sort buffer was full.
-	 * @throws IOException Thrown, if an error occurred while serializing the record into the buffers.
-	 */
-	@Override
-	public boolean write(T record) throws IOException {
-		//check whether we need a new memory segment for the sort index
-		if (this.currentSortIndexOffset > this.lastIndexEntryOffset) {
-			if (memoryAvailable()) {
-				this.currentSortIndexSegment = nextMemorySegment();
-				this.sortIndex.add(this.currentSortIndexSegment);
-				this.currentSortIndexOffset = 0;
-				this.sortIndexBytes += this.segmentSize;
-			} else {
-				return false;
-			}
-		}
-		
-		// add the pointer and the normalized key
-		this.currentSortIndexSegment.putLong(this.currentSortIndexOffset, this.currentDataBufferOffset);
-		if(this.numKeyBytes != 0) {
-			this.comparator.putNormalizedKey(record, this.currentSortIndexSegment, this.currentSortIndexOffset + OFFSET_LEN, this.numKeyBytes);
-		}
-		
-		// serialize the record into the data buffers
-		try {
-			this.serializer.serialize(record, this.recordCollector);
-			this.currentSortIndexOffset += this.indexEntrySize;
-			this.currentDataBufferOffset = this.recordCollector.getCurrentOffset();
-			this.numRecords++;
-			return true;
-		} catch (EOFException eofex) {
-			return false;
-		}
-	}
-	
-	// ------------------------------------------------------------------------
-	//                           Access Utilities
-	// ------------------------------------------------------------------------
-	
-	private final long readPointer(int logicalPosition) {
-		if (logicalPosition < 0 | logicalPosition >= this.numRecords) {
-			throw new IndexOutOfBoundsException();
-		}
-		
-		final int bufferNum = logicalPosition / this.indexEntriesPerSegment;
-		final int segmentOffset = logicalPosition % this.indexEntriesPerSegment;
-		
-		return this.sortIndex.get(bufferNum).getLong(segmentOffset * this.indexEntrySize);
-	}
-	
-	private final T getRecordFromBuffer(T reuse, long pointer) throws IOException {
-		this.recordBuffer.setReadPosition(pointer);
-		return this.serializer.deserialize(reuse, this.recordBuffer);
-	}
-	
-	private final int compareRecords(long pointer1, long pointer2) {
-		this.recordBuffer.setReadPosition(pointer1);
-		this.recordBufferForComparison.setReadPosition(pointer2);
-		
-		try {
-			return this.comparator.compareSerialized(this.recordBuffer, this.recordBufferForComparison);
-		} catch (IOException ioex) {
-			throw new RuntimeException("Error comparing two records.", ioex);
-		}
-	}
-	
-	private final boolean memoryAvailable() {
-		return !this.freeMemory.isEmpty();
-	}
-	
-	private final MemorySegment nextMemorySegment() {
-		return this.freeMemory.remove(this.freeMemory.size() - 1);
-	}
-
-	// -------------------------------------------------------------------------
-	// Indexed Sorting
-	// -------------------------------------------------------------------------
-
-	@Override
-	public int compare(int i, int j) {
-		final int bufferNumI = i / this.indexEntriesPerSegment;
-		final int segmentOffsetI = (i % this.indexEntriesPerSegment) * this.indexEntrySize;
-		
-		final int bufferNumJ = j / this.indexEntriesPerSegment;
-		final int segmentOffsetJ = (j % this.indexEntriesPerSegment) * this.indexEntrySize;
-		
-		final MemorySegment segI = this.sortIndex.get(bufferNumI);
-		final MemorySegment segJ = this.sortIndex.get(bufferNumJ);
-		
-		int val = MemorySegment.compare(segI, segJ, segmentOffsetI + OFFSET_LEN, segmentOffsetJ + OFFSET_LEN, this.numKeyBytes);
-		
-		if (val != 0 || this.normalizedKeyFullyDetermines) {
-			return this.useNormKeyUninverted ? val : -val;
-		}
-		
-		final long pointerI = segI.getLong(segmentOffsetI);
-		final long pointerJ = segJ.getLong(segmentOffsetJ);
-		
-		return compareRecords(pointerI, pointerJ);
-	}
-
-	@Override
-	public void swap(int i, int j) {
-		final int bufferNumI = i / this.indexEntriesPerSegment;
-		final int segmentOffsetI = (i % this.indexEntriesPerSegment) * this.indexEntrySize;
-		
-		final int bufferNumJ = j / this.indexEntriesPerSegment;
-		final int segmentOffsetJ = (j % this.indexEntriesPerSegment) * this.indexEntrySize;
-		
-		final MemorySegment segI = this.sortIndex.get(bufferNumI);
-		final MemorySegment segJ = this.sortIndex.get(bufferNumJ);
-		
-		MemorySegment.swapBytes(segI, segJ, this.swapBuffer, segmentOffsetI, segmentOffsetJ, this.indexEntrySize);
-	}
-
-	@Override
-	public int size() {
-		return this.numRecords;
-	}
-
-	// -------------------------------------------------------------------------
-	
-	/**
-	 * Gets an iterator over all records in this buffer in their logical order.
-	 * 
-	 * @return An iterator returning the records in their logical order.
-	 */
-	@Override
-	public final MutableObjectIterator<T> getIterator() {
-		return new MutableObjectIterator<T>()
-		{
-			private final int size = size();
-			private int current = 0;
-			
-			private int currentSegment = 0;
-			private int currentOffset = 0;
-			
-			private MemorySegment currentIndexSegment = sortIndex.get(0);
-
-			@Override
-			public T next(T target)
-			{
-				if (this.current < this.size) {
-					this.current++;
-					if (this.currentOffset > lastIndexEntryOffset) {
-						this.currentOffset = 0;
-						this.currentIndexSegment = sortIndex.get(++this.currentSegment);
-					}
-					
-					long pointer = this.currentIndexSegment.getLong(this.currentOffset);
-					this.currentOffset += indexEntrySize;
-					
-					try {
-						return getRecordFromBuffer(target, pointer);
-					}
-					catch (IOException ioe) {
-						throw new RuntimeException(ioe);
-					}
-				}
-				else {
-					return null;
-				}
-			}
-		};
-	}
-	
-	// ------------------------------------------------------------------------
-	//                Writing to a DataOutputView
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * Writes the records in this buffer in their logical order to the given output.
-	 * 
-	 * @param output The output view to write the records to.
-	 * @throws IOException Thrown, if an I/O exception occurred writing to the output view.
-	 */
-	@Override
-	public void writeToOutput(final ChannelWriterOutputView output) throws IOException {
-		int recordsLeft = this.numRecords;
-		int currentMemSeg = 0;
-		while (recordsLeft > 0)
-		{
-			final MemorySegment currentIndexSegment = this.sortIndex.get(currentMemSeg++);
-			int offset = 0;
-			// check whether we have a full or partially full segment
-			if (recordsLeft >= this.indexEntriesPerSegment) {
-				// full segment
-				for (;offset <= this.lastIndexEntryOffset; offset += this.indexEntrySize) {
-					final long pointer = currentIndexSegment.getLong(offset);
-					this.recordBuffer.setReadPosition(pointer);
-					this.serializer.copy(this.recordBuffer, output);
-					
-				}
-				recordsLeft -= this.indexEntriesPerSegment;
-			} else {
-				// partially filled segment
-				for (; recordsLeft > 0; recordsLeft--, offset += this.indexEntrySize)
-				{
-					final long pointer = currentIndexSegment.getLong(offset);
-					this.recordBuffer.setReadPosition(pointer);
-					this.serializer.copy(this.recordBuffer, output);
-				}
-			}
-		}
-	}
-	
-	/**
-	 * Writes a subset of the records in this buffer in their logical order to the given output.
-	 * 
-	 * @param output The output view to write the records to.
-	 * @param start The logical start position of the subset.
-	 * @param num The number of elements to write.
-	 * @throws IOException Thrown, if an I/O exception occurred writing to the output view.
-	 */
-	@Override
-	public void writeToOutput(final ChannelWriterOutputView output, final int start, int num) throws IOException {
-		int currentMemSeg = start / this.indexEntriesPerSegment;
-		int offset = (start % this.indexEntriesPerSegment) * this.indexEntrySize;
-		
-		while (num > 0)
-		{
-			final MemorySegment currentIndexSegment = this.sortIndex.get(currentMemSeg++);
-			// check whether we have a full or partially full segment
-			if (num >= this.indexEntriesPerSegment && offset == 0) {
-				// full segment
-				for (;offset <= this.lastIndexEntryOffset; offset += this.indexEntrySize) {
-					final long pointer = currentIndexSegment.getLong(offset);
-					this.recordBuffer.setReadPosition(pointer);
-					this.serializer.copy(this.recordBuffer, output);
-				}
-				num -= this.indexEntriesPerSegment;
-			} else {
-				// partially filled segment
-				for (; num > 0 && offset <= this.lastIndexEntryOffset; num--, offset += this.indexEntrySize)
-				{
-					final long pointer = currentIndexSegment.getLong(offset);
-					this.recordBuffer.setReadPosition(pointer);
-					this.serializer.copy(this.recordBuffer, output);
-				}
-			}
-			offset = 0;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/28a62d84/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
index 51cc1cf..a534399 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
@@ -49,7 +49,6 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
 import org.apache.flink.runtime.util.EmptyMutableObjectIterator;
-import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
 /**
@@ -78,7 +77,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 	
 	/** The minimum number of segments that are required for the sort to operate. */
 	protected static final int MIN_NUM_SORT_MEM_SEGMENTS = 10;
-
+	
 	// ------------------------------------------------------------------------
 	//                                  Threads
 	// ------------------------------------------------------------------------
@@ -670,12 +669,13 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 	 * Class representing buffers that circulate between the reading, sorting and spilling thread.
 	 */
 	protected static final class CircularElement<E> {
+		
 		final int id;
 		final InMemorySorter<E> buffer;
 
 		public CircularElement() {
-			this.buffer = null;
 			this.id = -1;
+			this.buffer = null;
 		}
 
 		public CircularElement(int id, InMemorySorter<E> buffer) {
@@ -770,8 +770,6 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 				internalHandleException(new IOException("Thread '" + getName() + "' terminated due to an exception: "
 					+ t.getMessage(), t));
 			}
-			finally {
-			}
 		}
 
 		/**
@@ -915,6 +913,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 				// write the last leftover pair, if we have one
 				if (leftoverRecord != null) {
 					if (!buffer.write(leftoverRecord)) {
+						// did not fit in a fresh buffer, must be large...
 						if (this.largeRecords != null) {
 							this.largeRecords.addRecord(leftoverRecord);
 						} else {
@@ -923,6 +922,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 						}
 						buffer.reset();
 					}
+					
 					leftoverRecord = null;
 				}
 				
@@ -944,6 +944,9 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 							fullBuffer = true;
 							break;
 						}
+						
+						// successfully added record
+						
 						if (bytesUntilSpilling - buffer.getOccupancy() <= 0) {
 							bytesUntilSpilling = 0;
 							
@@ -1303,7 +1306,9 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 				if (LOG.isDebugEnabled()) {
 					LOG.debug("Spilling buffer " + element.id + ".");
 				}
-				element.buffer.writeToOutput(output);
+				
+				element.buffer.writeToOutput(output, largeRecordHandler);
+				
 				if (LOG.isDebugEnabled()) {
 					LOG.debug("Spilled buffer " + element.id + ".");
 				}
@@ -1655,166 +1660,8 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 		}
 	}
 	
-	/**
-	 *
-	 */
-	public static final class InputDataCollector<E> implements Collector<E>
-	{
-		private final CircularQueues<E> queues;		// the queues used to pass buffers
-		
-		private InMemorySorter<E> currentBuffer;
-		
-		private CircularElement<E> currentElement;
-		
-		private long bytesUntilSpilling;			// number of bytes left before we signal to spill
-		
-		private boolean spillingInThisBuffer;
-		
-		private volatile boolean running;
-		
-
-		public InputDataCollector(CircularQueues<E> queues, long startSpillingBytes)
-		{
-			this.queues = queues;
-			this.bytesUntilSpilling = startSpillingBytes;
-			this.running = true;
-			
-			grabBuffer();
-		}
+	protected static final class ChannelWithBlockCount {
 		
-		private void grabBuffer()
-		{
-			while (this.currentElement == null) {
-				try {
-					this.currentElement = this.queues.empty.take();
-				}
-				catch (InterruptedException iex) {
-					if (this.running) {
-						LOG.error("Reading thread was interrupted (without being shut down) while grabbing a buffer. " +
-								"Retrying to grab buffer...");
-					} else {
-						return;
-					}
-				}
-			}
-			
-			this.currentBuffer = this.currentElement.buffer;
-			if (!this.currentBuffer.isEmpty()) {
-				throw new RuntimeException("New sort-buffer is not empty.");
-			}
-			
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Retrieved empty read buffer " + this.currentElement.id + ".");
-			}
-			
-			this.spillingInThisBuffer = this.currentBuffer.getCapacity() <= this.bytesUntilSpilling;
-		}
-		
-
-		@Override
-		public void collect(E record)
-		{
-			try {
-				if (this.spillingInThisBuffer) {
-					if (this.currentBuffer.write(record)) {
-						if (this.bytesUntilSpilling - this.currentBuffer.getOccupancy() <= 0) {
-							this.bytesUntilSpilling = 0;
-							// send the sentinel
-							this.queues.sort.add(UnilateralSortMerger.<E>spillingMarker());
-						}
-						return;
-					}
-				}
-				else {
-					// no spilling in this buffer
-					if (this.currentBuffer.write(record)) {
-						return;
-					}
-				}
-				
-				if (this.bytesUntilSpilling > 0) {
-					this.bytesUntilSpilling -= this.currentBuffer.getCapacity();
-					if (this.bytesUntilSpilling <= 0) {
-						this.bytesUntilSpilling = 0;
-						// send the sentinel
-						this.queues.sort.add(UnilateralSortMerger.<E>spillingMarker());
-					}
-				}
-				
-				// we came here when the buffer could not be written. send it to the sorter
-				// send the buffer
-				if (LOG.isDebugEnabled()) {
-					LOG.debug("Emitting full buffer from reader thread: " + this.currentElement.id + ".");
-				}
-				this.queues.sort.add(this.currentElement);
-				this.currentElement = null;
-				
-				// we need a new buffer. grab the next one
-				while (this.running && this.currentElement == null) {
-					try {
-						this.currentElement = this.queues.empty.take();
-					}
-					catch (InterruptedException iex) {
-						if (this.running) {
-							LOG.error("Reading thread was interrupted (without being shut down) while grabbing a buffer. " +
-									"Retrying to grab buffer...");
-						} else {
-							return;
-						}
-					}
-				}
-				if (!this.running) {
-					return;
-				}
-				
-				this.currentBuffer = this.currentElement.buffer;
-				if (!this.currentBuffer.isEmpty()) {
-					throw new RuntimeException("BUG: New sort-buffer is not empty.");
-				}
-				
-				if (LOG.isDebugEnabled()) {
-					LOG.debug("Retrieved empty read buffer " + this.currentElement.id + ".");
-				}
-				// write the record
-				if (!this.currentBuffer.write(record)) {
-					throw new RuntimeException("Record could not be written to empty sort-buffer: Serialized record exceeds buffer capacity.");
-				}
-			}
-			catch (IOException ioex) {
-				throw new RuntimeException("BUG: An error occurred while writing a record to the sort buffer: " + 
-						ioex.getMessage(), ioex);
-			}
-		}
-		
-
-		@Override
-		public void close()
-		{
-			if (this.running) {
-				this.running = false;
-				
-				if (this.currentBuffer != null && this.currentElement != null) {
-					if (this.currentBuffer.isEmpty()) {
-						this.queues.empty.add(this.currentElement);
-					}
-					else {
-						this.queues.sort.add(this.currentElement);
-						if (LOG.isDebugEnabled()) {
-							LOG.debug("Emitting last buffer from input collector: " + this.currentElement.id + ".");
-						}
-					}
-				}
-				
-				this.currentBuffer = null;
-				this.currentElement = null;
-				
-				this.queues.sort.add(UnilateralSortMerger.<E>endMarker());
-			}
-		}
-	}
-	
-	protected static final class ChannelWithBlockCount
-	{
 		private final FileIOChannel.ID channel;
 		private final int blockCount;
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/28a62d84/flink-runtime/src/main/java/org/apache/flink/runtime/util/IntArrayList.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/IntArrayList.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/IntArrayList.java
index 6782b91..999c4b0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/IntArrayList.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/IntArrayList.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.util;
 
+import java.util.NoSuchElementException;
+
 /**
  * Minimal implementation of an array-backed list of ints
  */
@@ -42,16 +44,12 @@ public class IntArrayList {
 		return true;
 	}
 
-	public int removeInt(int index) {
-		if(index >= size) {
-			throw new IndexOutOfBoundsException("Index (" + index + ") is greater than or equal to list size (" + size + ")");
-		}
-		final int old = array[ index ];
-		size--;
-		if(index != size) {
-			System.arraycopy(array, index+1, array, index, size-index );
+	public int removeLast() {
+		if (size == 0) {
+			throw new NoSuchElementException();
 		}
-		return old;
+		--size;
+		return array[size];
 	}
 	
 	public void clear() {
@@ -59,7 +57,7 @@ public class IntArrayList {
 	}
 	
 	public boolean isEmpty() {
-		return (size==0);
+		return size == 0;
 	}
 	
 	private void grow(final int length) {
@@ -71,4 +69,14 @@ public class IntArrayList {
 		}
 	}
 
+	public static final IntArrayList EMPTY = new IntArrayList(0) {
+		
+		public boolean add(int number) {
+			throw new UnsupportedOperationException();
+		}
+		
+		public int removeLast() {
+			throw new UnsupportedOperationException();
+		};
+	};
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/28a62d84/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
index 914359d..6e35ad0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
@@ -18,29 +18,12 @@
 
 package org.apache.flink.runtime.operators.sort;
 
-import static org.junit.Assert.*;
-
-import org.junit.Assert;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
 import java.util.Comparator;
-import java.util.Random;
 
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.api.common.typeutils.record.RecordComparator;
 import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.ValueTypeInfo;
-import org.apache.flink.api.java.typeutils.runtime.RuntimeStatefulSerializerFactory;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -49,18 +32,21 @@ import org.apache.flink.runtime.memorymanager.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.runtime.operators.testutils.RandomIntPairGenerator;
 import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.TestData.Key;
-import org.apache.flink.runtime.operators.testutils.TestData.Value;
 import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
 import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
+import org.apache.flink.runtime.operators.testutils.TestData.Key;
+import org.apache.flink.runtime.operators.testutils.TestData.Value;
 import org.apache.flink.runtime.operators.testutils.types.IntPair;
 import org.apache.flink.runtime.operators.testutils.types.IntPairComparator;
 import org.apache.flink.runtime.operators.testutils.types.IntPairSerializer;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.MutableObjectIterator;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class ExternalSortITCase {
@@ -77,7 +63,7 @@ public class ExternalSortITCase {
 
 	private static final int NUM_PAIRS = 200000;
 
-	private static final int MEMORY_SIZE = 1024 * 1024 * 78;
+	private static final int MEMORY_SIZE = 1024 * 1024 * 78;	
 	
 	private final AbstractInvokable parentTask = new DummyInvokable();
 
@@ -256,7 +242,7 @@ public class ExternalSortITCase {
 		merger.close();
 	}
 
-//	@Test
+	@Test
 	public void testSpillingSortWithIntermediateMerge() throws Exception {
 		// amount of pairs
 		final int PAIRS = 10000000;
@@ -310,7 +296,7 @@ public class ExternalSortITCase {
 		merger.close();
 	}
 	
-//	@Test
+	@Test
 	public void testSpillingSortWithIntermediateMergeIntPair() throws Exception {
 		// amount of pairs
 		final int PAIRS = 50000000;
@@ -361,214 +347,4 @@ public class ExternalSortITCase {
 		Assert.assertEquals("Not all pairs were read back in.", PAIRS, pairsRead);
 		merger.close();
 	}
-	
-	@Test
-	public void testSortWithLongRecordsOnly() {
-		try {
-			final int NUM_RECORDS = 10;
-			
-			final TypeInformation<?>[] types = new TypeInformation<?>[] {
-					BasicTypeInfo.LONG_TYPE_INFO,
-					new ValueTypeInfo<SomeMaybeLongValue>(SomeMaybeLongValue.class)
-				};
-			
-			final TupleTypeInfo<Tuple2<Long, SomeMaybeLongValue>> typeInfo = 
-								new TupleTypeInfo<Tuple2<Long,SomeMaybeLongValue>>(types);
-			final TypeSerializer<Tuple2<Long, SomeMaybeLongValue>> serializer = typeInfo.createSerializer();
-			final TypeComparator<Tuple2<Long, SomeMaybeLongValue>> comparator = typeInfo.createComparator(new int[] {0}, new boolean[]{false}, 0);
-			
-			MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> source = 
-					new MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>>()
-			{
-				private final Random rnd = new Random();
-				private int num = 0;
-				
-				@Override
-				public Tuple2<Long, SomeMaybeLongValue> next(Tuple2<Long, SomeMaybeLongValue> reuse) {
-					if (num++ < NUM_RECORDS) {
-						long val = rnd.nextLong();
-						return new Tuple2<Long, SomeMaybeLongValue>(val, new SomeMaybeLongValue((int) val));
-					}
-					else {
-						return null;
-					}
-					
-				}
-			};
-			
-			@SuppressWarnings("unchecked")
-			Sorter<Tuple2<Long, SomeMaybeLongValue>> sorter = new UnilateralSortMerger<Tuple2<Long, SomeMaybeLongValue>>(
-					this.memoryManager, this.ioManager, 
-					source, this.parentTask,
-					new RuntimeStatefulSerializerFactory<Tuple2<Long, SomeMaybeLongValue>>(serializer, (Class<Tuple2<Long, SomeMaybeLongValue>>) (Class<?>) Tuple2.class),
-					comparator, 1.0, 1, 128, 0.7f);
-			
-			// check order
-			MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> iterator = sorter.getIterator();
-			
-			Tuple2<Long, SomeMaybeLongValue> val = serializer.createInstance();
-			
-			long prevKey = Long.MAX_VALUE;
-
-			for (int i = 0; i < NUM_RECORDS; i++) {
-				val = iterator.next(val);
-				
-				assertTrue(val.f0 <= prevKey);
-				assertTrue(val.f0.intValue() == val.f1.val());
-			}
-			
-			assertNull(iterator.next(val));
-			
-			sorter.close();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testSortWithLongAndShortRecordsMixed() {
-		try {
-			final int NUM_RECORDS = 1000000;
-			final int LARGE_REC_INTERVAL = 100000;
-			
-			final TypeInformation<?>[] types = new TypeInformation<?>[] {
-					BasicTypeInfo.LONG_TYPE_INFO,
-					new ValueTypeInfo<SomeMaybeLongValue>(SomeMaybeLongValue.class)
-				};
-			
-			final TupleTypeInfo<Tuple2<Long, SomeMaybeLongValue>> typeInfo = 
-								new TupleTypeInfo<Tuple2<Long,SomeMaybeLongValue>>(types);
-			final TypeSerializer<Tuple2<Long, SomeMaybeLongValue>> serializer = typeInfo.createSerializer();
-			final TypeComparator<Tuple2<Long, SomeMaybeLongValue>> comparator = typeInfo.createComparator(new int[] {0}, new boolean[]{false}, 0);
-			
-			MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> source = 
-					new MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>>()
-			{
-				private final Random rnd = new Random();
-				private int num = -1;
-				
-				@Override
-				public Tuple2<Long, SomeMaybeLongValue> next(Tuple2<Long, SomeMaybeLongValue> reuse) {
-					if (++num < NUM_RECORDS) {
-						long val = rnd.nextLong();
-						return new Tuple2<Long, SomeMaybeLongValue>(val, new SomeMaybeLongValue((int) val, num % LARGE_REC_INTERVAL == 0));
-					}
-					else {
-						return null;
-					}
-					
-				}
-			};
-			
-			@SuppressWarnings("unchecked")
-			Sorter<Tuple2<Long, SomeMaybeLongValue>> sorter = new UnilateralSortMerger<Tuple2<Long, SomeMaybeLongValue>>(
-					this.memoryManager, this.ioManager, 
-					source, this.parentTask,
-					new RuntimeStatefulSerializerFactory<Tuple2<Long, SomeMaybeLongValue>>(serializer, (Class<Tuple2<Long, SomeMaybeLongValue>>) (Class<?>) Tuple2.class),
-					comparator, 1.0, 1, 128, 0.7f);
-			
-			// check order
-			MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> iterator = sorter.getIterator();
-			
-			Tuple2<Long, SomeMaybeLongValue> val = serializer.createInstance();
-			
-			long prevKey = Long.MAX_VALUE;
-
-			for (int i = 0; i < NUM_RECORDS; i++) {
-				val = iterator.next(val);
-				
-				assertTrue(val.f0 <= prevKey);
-				assertTrue(val.f0.intValue() == val.f1.val());
-			}
-			
-			assertNull(iterator.next(val));
-			
-			sorter.close();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	public static final class SomeMaybeLongValue implements org.apache.flink.types.Value {
-		
-		private static final long serialVersionUID = 1L;
-
-		private static final byte[] BUFFER = new byte[100000000];
-		
-		static {
-			for (int i = 0; i < BUFFER.length; i++) {
-				BUFFER[i] = (byte) i;
-			}
-		}
-		
-		private int val;
-		
-		private boolean isLong;
-		
-
-		public SomeMaybeLongValue() {
-			this.isLong = true;
-		}
-		
-		public SomeMaybeLongValue(int val) {
-			this.val = val;
-			this.isLong = true;
-		}
-		
-		public SomeMaybeLongValue(int val, boolean isLong) {
-			this.val = val;
-			this.isLong = isLong;
-		}
-		
-		public int val() {
-			return val;
-		}
-		
-		public boolean isLong() {
-			return isLong;
-		}
-		
-		@Override
-		public void read(DataInputView in) throws IOException {
-			val = in.readInt();
-			isLong = in.readBoolean();
-			
-			if (isLong) {
-				for (int i = 0; i < BUFFER.length; i++) {
-					byte b = in.readByte();
-					assertEquals(BUFFER[i], b);
-				}
-			}
-		}
-		
-		@Override
-		public void write(DataOutputView out) throws IOException {
-			out.writeInt(val);
-			out.writeBoolean(isLong);
-			if (isLong) {
-				out.write(BUFFER);
-			}
-		}
-		
-		@Override
-		public int hashCode() {
-			return val;
-		}
-		
-		@Override
-		public boolean equals(Object obj) {
-			return (obj instanceof SomeMaybeLongValue) && ((SomeMaybeLongValue) obj).val == this.val;
-		}
-		
-		@Override
-		public String toString() {
-			return isLong ? "Large Value" : "Small Value";
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/28a62d84/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
new file mode 100644
index 0000000..33d15ae
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.RuntimeStatefulSerializerFactory;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class ExternalSortLargeRecordsITCase {
+
+	private static final int MEMORY_SIZE = 1024 * 1024 * 78;	
+	
+	private final AbstractInvokable parentTask = new DummyInvokable();
+
+	private IOManager ioManager;
+
+	private MemoryManager memoryManager;
+
+	// --------------------------------------------------------------------------------------------
+
+	@Before
+	public void beforeTest() {
+		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1);
+		this.ioManager = new IOManagerAsync();
+	}
+
+	@After
+	public void afterTest() {
+		this.ioManager.shutdown();
+		if (!this.ioManager.isProperlyShutDown()) {
+			Assert.fail("I/O Manager was not properly shut down.");
+		}
+		
+		if (this.memoryManager != null) {
+			Assert.assertTrue("Memory leak: not all segments have been returned to the memory manager.", 
+				this.memoryManager.verifyEmpty());
+			this.memoryManager.shutdown();
+			this.memoryManager = null;
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	
+	@Test
+	public void testSortWithLongRecordsOnly() {
+		try {
+			final int NUM_RECORDS = 10;
+			
+			final TypeInformation<?>[] types = new TypeInformation<?>[] {
+					BasicTypeInfo.LONG_TYPE_INFO,
+					new ValueTypeInfo<SomeMaybeLongValue>(SomeMaybeLongValue.class)
+				};
+			
+			final TupleTypeInfo<Tuple2<Long, SomeMaybeLongValue>> typeInfo = 
+								new TupleTypeInfo<Tuple2<Long,SomeMaybeLongValue>>(types);
+			final TypeSerializer<Tuple2<Long, SomeMaybeLongValue>> serializer = typeInfo.createSerializer();
+			final TypeComparator<Tuple2<Long, SomeMaybeLongValue>> comparator = typeInfo.createComparator(new int[] {0}, new boolean[]{false}, 0);
+			
+			MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> source = 
+					new MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>>()
+			{
+				private final Random rnd = new Random();
+				private int num = 0;
+				
+				@Override
+				public Tuple2<Long, SomeMaybeLongValue> next(Tuple2<Long, SomeMaybeLongValue> reuse) {
+					if (num++ < NUM_RECORDS) {
+						long val = rnd.nextLong();
+						return new Tuple2<Long, SomeMaybeLongValue>(val, new SomeMaybeLongValue((int) val));
+					}
+					else {
+						return null;
+					}
+					
+				}
+			};
+			
+			@SuppressWarnings("unchecked")
+			Sorter<Tuple2<Long, SomeMaybeLongValue>> sorter = new UnilateralSortMerger<Tuple2<Long, SomeMaybeLongValue>>(
+					this.memoryManager, this.ioManager, 
+					source, this.parentTask,
+					new RuntimeStatefulSerializerFactory<Tuple2<Long, SomeMaybeLongValue>>(serializer, (Class<Tuple2<Long, SomeMaybeLongValue>>) (Class<?>) Tuple2.class),
+					comparator, 1.0, 1, 128, 0.7f);
+			
+			// check order
+			MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> iterator = sorter.getIterator();
+			
+			Tuple2<Long, SomeMaybeLongValue> val = serializer.createInstance();
+			
+			long prevKey = Long.MAX_VALUE;
+
+			for (int i = 0; i < NUM_RECORDS; i++) {
+				val = iterator.next(val);
+				
+				assertTrue(val.f0 <= prevKey);
+				assertTrue(val.f0.intValue() == val.f1.val());
+			}
+			
+			assertNull(iterator.next(val));
+			
+			sorter.close();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testSortWithLongAndShortRecordsMixed() {
+		try {
+			final int NUM_RECORDS = 1000000;
+			final int LARGE_REC_INTERVAL = 100000;
+			
+			final TypeInformation<?>[] types = new TypeInformation<?>[] {
+					BasicTypeInfo.LONG_TYPE_INFO,
+					new ValueTypeInfo<SomeMaybeLongValue>(SomeMaybeLongValue.class)
+				};
+			
+			final TupleTypeInfo<Tuple2<Long, SomeMaybeLongValue>> typeInfo = 
+								new TupleTypeInfo<Tuple2<Long,SomeMaybeLongValue>>(types);
+			final TypeSerializer<Tuple2<Long, SomeMaybeLongValue>> serializer = typeInfo.createSerializer();
+			final TypeComparator<Tuple2<Long, SomeMaybeLongValue>> comparator = typeInfo.createComparator(new int[] {0}, new boolean[]{false}, 0);
+			
+			MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> source = 
+					new MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>>()
+			{
+				private final Random rnd = new Random();
+				private int num = -1;
+				
+				@Override
+				public Tuple2<Long, SomeMaybeLongValue> next(Tuple2<Long, SomeMaybeLongValue> reuse) {
+					if (++num < NUM_RECORDS) {
+						long val = rnd.nextLong();
+						return new Tuple2<Long, SomeMaybeLongValue>(val, new SomeMaybeLongValue((int) val, num % LARGE_REC_INTERVAL == 0));
+					}
+					else {
+						return null;
+					}
+					
+				}
+			};
+			
+			@SuppressWarnings("unchecked")
+			Sorter<Tuple2<Long, SomeMaybeLongValue>> sorter = new UnilateralSortMerger<Tuple2<Long, SomeMaybeLongValue>>(
+					this.memoryManager, this.ioManager, 
+					source, this.parentTask,
+					new RuntimeStatefulSerializerFactory<Tuple2<Long, SomeMaybeLongValue>>(serializer, (Class<Tuple2<Long, SomeMaybeLongValue>>) (Class<?>) Tuple2.class),
+					comparator, 1.0, 1, 128, 0.7f);
+			
+			// check order
+			MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> iterator = sorter.getIterator();
+			
+			Tuple2<Long, SomeMaybeLongValue> val = serializer.createInstance();
+			
+			long prevKey = Long.MAX_VALUE;
+
+			for (int i = 0; i < NUM_RECORDS; i++) {
+				val = iterator.next(val);
+				
+				assertTrue(val.f0 <= prevKey);
+				assertTrue(val.f0.intValue() == val.f1.val());
+			}
+			
+			assertNull(iterator.next(val));
+			
+			sorter.close();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testSortWithShortMediumAndLargeRecords() {
+		try {
+			final int NUM_RECORDS = 50000;
+			final int LARGE_REC_INTERVAL = 10000;
+			final int MEDIUM_REC_INTERVAL = 500;
+			
+			final TypeInformation<?>[] types = new TypeInformation<?>[] {
+					BasicTypeInfo.LONG_TYPE_INFO,
+					new ValueTypeInfo<SmallOrMediumOrLargeValue>(SmallOrMediumOrLargeValue.class)
+				};
+			
+			final TupleTypeInfo<Tuple2<Long, SmallOrMediumOrLargeValue>> typeInfo = 
+								new TupleTypeInfo<Tuple2<Long,SmallOrMediumOrLargeValue>>(types);
+			
+			final TypeSerializer<Tuple2<Long, SmallOrMediumOrLargeValue>> serializer = typeInfo.createSerializer();
+			final TypeComparator<Tuple2<Long, SmallOrMediumOrLargeValue>> comparator = typeInfo.createComparator(new int[] {0}, new boolean[]{false}, 0);
+			
+			MutableObjectIterator<Tuple2<Long, SmallOrMediumOrLargeValue>> source = 
+					new MutableObjectIterator<Tuple2<Long, SmallOrMediumOrLargeValue>>()
+			{
+				private final Random rnd = new Random();
+				private int num = -1;
+				
+				@Override
+				public Tuple2<Long, SmallOrMediumOrLargeValue> next(Tuple2<Long, SmallOrMediumOrLargeValue> reuse) {
+					if (++num < NUM_RECORDS) {
+						
+						int size;
+						if (num % LARGE_REC_INTERVAL == 0) {
+							size = SmallOrMediumOrLargeValue.LARGE_SIZE;
+						} else if (num % MEDIUM_REC_INTERVAL == 0) {
+							size = SmallOrMediumOrLargeValue.MEDIUM_SIZE;
+						} else {
+							size = SmallOrMediumOrLargeValue.SMALL_SIZE;
+						}
+						
+						long val = rnd.nextLong();
+						return new Tuple2<Long, SmallOrMediumOrLargeValue>(val, new SmallOrMediumOrLargeValue((int) val, size));
+					}
+					else {
+						return null;
+					}
+					
+				}
+			};
+			
+			@SuppressWarnings("unchecked")
+			Sorter<Tuple2<Long, SmallOrMediumOrLargeValue>> sorter = new UnilateralSortMerger<Tuple2<Long, SmallOrMediumOrLargeValue>>(
+					this.memoryManager, this.ioManager, 
+					source, this.parentTask,
+					new RuntimeStatefulSerializerFactory<Tuple2<Long, SmallOrMediumOrLargeValue>>(serializer, (Class<Tuple2<Long, SmallOrMediumOrLargeValue>>) (Class<?>) Tuple2.class),
+					comparator, 1.0, 1, 128, 0.7f);
+			
+			// check order
+			MutableObjectIterator<Tuple2<Long, SmallOrMediumOrLargeValue>> iterator = sorter.getIterator();
+			
+			Tuple2<Long, SmallOrMediumOrLargeValue> val = serializer.createInstance();
+			
+			long prevKey = Long.MAX_VALUE;
+
+			for (int i = 0; i < NUM_RECORDS; i++) {
+				val = iterator.next(val);
+				
+				assertTrue(val.f0 <= prevKey);
+				assertTrue(val.f0.intValue() == val.f1.val());
+			}
+			
+			assertNull(iterator.next(val));
+			
+			sorter.close();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	public static final class SomeMaybeLongValue implements org.apache.flink.types.Value {
+		
+		private static final long serialVersionUID = 1L;
+
+		private static final byte[] BUFFER = new byte[100 * 1024 * 1024];
+		
+		static {
+			for (int i = 0; i < BUFFER.length; i++) {
+				BUFFER[i] = (byte) i;
+			}
+		}
+		
+		private int val;
+		
+		private boolean isLong;
+		
+
+		public SomeMaybeLongValue() {
+			this.isLong = true;
+		}
+		
+		public SomeMaybeLongValue(int val) {
+			this.val = val;
+			this.isLong = true;
+		}
+		
+		public SomeMaybeLongValue(int val, boolean isLong) {
+			this.val = val;
+			this.isLong = isLong;
+		}
+		
+		public int val() {
+			return val;
+		}
+		
+		public boolean isLong() {
+			return isLong;
+		}
+		
+		@Override
+		public void read(DataInputView in) throws IOException {
+			val = in.readInt();
+			isLong = in.readBoolean();
+			
+			if (isLong) {
+				for (int i = 0; i < BUFFER.length; i++) {
+					byte b = in.readByte();
+					assertEquals(BUFFER[i], b);
+				}
+			}
+		}
+		
+		@Override
+		public void write(DataOutputView out) throws IOException {
+			out.writeInt(val);
+			out.writeBoolean(isLong);
+			if (isLong) {
+				out.write(BUFFER);
+			}
+		}
+		
+		@Override
+		public int hashCode() {
+			return val;
+		}
+		
+		@Override
+		public boolean equals(Object obj) {
+			return (obj instanceof SomeMaybeLongValue) && ((SomeMaybeLongValue) obj).val == this.val;
+		}
+		
+		@Override
+		public String toString() {
+			return isLong ? "Large Value" : "Small Value";
+		}
+	}
+	
+	public static final class SmallOrMediumOrLargeValue implements org.apache.flink.types.Value {
+		
+		private static final long serialVersionUID = 1L;
+		
+		public static final int TYPE_SMALL = 0;
+		public static final int TYPE_MEDIUM = 1;
+		public static final int TYPE_LARGE = 2;
+		
+		public static final int SMALL_SIZE = 0;
+		public static final int MEDIUM_SIZE = 12 * 1024 * 1024;
+		public static final int LARGE_SIZE = 100 * 1024 * 1024;
+		
+		private int val;
+		
+		private int size;
+		
+
+		public SmallOrMediumOrLargeValue() {
+			this.size = SMALL_SIZE;
+		}
+		
+		public SmallOrMediumOrLargeValue(int val) {
+			this.val = val;
+			this.size = SMALL_SIZE;
+		}
+		
+		public SmallOrMediumOrLargeValue(int val, int size) {
+			this.val = val;
+			this.size = size;
+		}
+		
+		public int val() {
+			return val;
+		}
+		
+		public int getSize() {
+			return size;
+		}
+		
+		@Override
+		public void read(DataInputView in) throws IOException {
+			val = in.readInt();
+			size = in.readInt();
+			
+			for (int i = 0; i < size; i++) {
+				byte b = in.readByte();
+				assertEquals((byte) i, b);
+			}
+		}
+		
+		@Override
+		public void write(DataOutputView out) throws IOException {
+			out.writeInt(val);
+			out.writeInt(size);
+			
+			for (int i = 0; i < size; i++) {
+				out.write((byte) (i));
+			}
+		}
+		
+		@Override
+		public int hashCode() {
+			return val;
+		}
+		
+		@Override
+		public boolean equals(Object obj) {
+			if (obj instanceof SmallOrMediumOrLargeValue) {
+				SmallOrMediumOrLargeValue other = (SmallOrMediumOrLargeValue) obj;
+				return other.val == this.val && other.size == this.size;
+			} else {
+				return false;
+			}
+		}
+		
+		@Override
+		public String toString() {
+			return String.format("Value %d (%d bytes)", val, size);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/28a62d84/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java
index 55d01d2..a711d80 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators.sort;
 
 import java.io.BufferedReader;
@@ -48,9 +47,6 @@ public class MassiveStringSortingITCase {
 
 	private static final long SEED = 347569784659278346L;
 	
-	@SuppressWarnings("unused")
-	private static final char LINE_BREAK = '\n';
-	
 	
 	public void testStringSorting() {
 		File input = null;


Mime
View raw message