flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [6/8] flink git commit: [FLINK-2754] Fixed FixedLengthRecordSorter write to multi memory pages issue and add more unit tests.
Date Tue, 29 Sep 2015 12:08:37 GMT
[FLINK-2754] Fixed FixedLengthRecordSorter write to multi memory pages issue and add more unit tests.

This closes #1178


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/68912126
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/68912126
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/68912126

Branch: refs/heads/master
Commit: 68912126d73b92a07d15ec3f21f9ac922744fb45
Parents: e727355
Author: chengxiang li <chengxiang.li@intel.com>
Authored: Thu Sep 24 11:20:10 2015 +0800
Committer: Stephan Ewen <sewen@apache.org>
Committed: Tue Sep 29 12:18:49 2015 +0200

----------------------------------------------------------------------
 .../operators/sort/FixedLengthRecordSorter.java |   4 +-
 .../sort/FixedLengthRecordSorterTest.java       | 109 +++++++++++++++++++
 2 files changed, 112 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/68912126/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
index da96b17..3a44ab5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
@@ -447,11 +447,13 @@ public final class FixedLengthRecordSorter<T> implements InMemorySorter<T> {
 				num -= recordsPerSegment;
 			} else {
 				// partially filled segment
-				for (; num > 0; num--) {
+				for (; num > 0 && offset <= this.lastEntryOffset; num--, offset += this.recordSize) {
 					record = comparator.readWithKeyDenormalization(record, inView);
 					serializer.serialize(record, output);
 				}
 			}
+
+			offset = 0;
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/68912126/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java
index 517bec3..288e86d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java
@@ -25,6 +25,14 @@ import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator;
+import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
+import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
+import org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView;
+import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.runtime.operators.testutils.RandomIntPairGenerator;
@@ -48,6 +56,8 @@ public class FixedLengthRecordSorterTest {
 	private static final int MEMORY_PAGE_SIZE = 32 * 1024; 
 
 	private MemoryManager memoryManager;
+
+	private IOManager ioManager;
 	
 	private TypeSerializer<IntPair> serializer;
 	
@@ -57,6 +67,7 @@ public class FixedLengthRecordSorterTest {
 	@Before
 	public void beforeTest() {
 		this.memoryManager = new MemoryManager(MEMORY_SIZE, 1, MEMORY_PAGE_SIZE, MemoryType.HEAP, true);
+		this.ioManager = new IOManagerAsync();
 		this.serializer = new IntPairSerializer();
 		this.comparator = new IntPairComparator();
 	}
@@ -368,4 +379,102 @@ public class FixedLengthRecordSorterTest {
 		sorter.dispose();
 		this.memoryManager.release(memory);
 	}
+
+	@Test
+	public void testFlushFullMemoryPage() throws Exception {
+		// Insert IntPair which would fill 2 memory pages.
+		final int NUM_RECORDS = 2 * MEMORY_PAGE_SIZE / 8;
+		final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), 3);
+
+		FixedLengthRecordSorter<IntPair> sorter = newSortBuffer(memory);
+		UniformIntPairGenerator generator = new UniformIntPairGenerator(Integer.MAX_VALUE, 1, false);
+
+		// write the records
+		IntPair record = new IntPair();
+		int num = -1;
+		do {
+			generator.next(record);
+			num++;
+		}
+		while (sorter.write(record) && num < NUM_RECORDS);
+
+		FileIOChannel.ID channelID = this.ioManager.createChannelEnumerator().next();
+		BlockChannelWriter<MemorySegment> blockChannelWriter = this.ioManager.createBlockChannelWriter(channelID);
+		final List<MemorySegment> writeBuffer = this.memoryManager.allocatePages(new DummyInvokable(), 3);
+		ChannelWriterOutputView outputView = new ChannelWriterOutputView(blockChannelWriter, writeBuffer, writeBuffer.get(0).size());
+
+		sorter.writeToOutput(outputView, 0, NUM_RECORDS);
+
+		this.memoryManager.release(outputView.close());
+
+		BlockChannelReader<MemorySegment> blockChannelReader = this.ioManager.createBlockChannelReader(channelID);
+		final List<MemorySegment> readBuffer = this.memoryManager.allocatePages(new DummyInvokable(), 3);
+		ChannelReaderInputView readerInputView = new ChannelReaderInputView(blockChannelReader, readBuffer, false);
+		final List<MemorySegment> dataBuffer = this.memoryManager.allocatePages(new DummyInvokable(), 3);
+		ChannelReaderInputViewIterator<IntPair> iterator = new ChannelReaderInputViewIterator(readerInputView, dataBuffer, this.serializer);
+
+		record = iterator.next(record);
+		int i =0;
+		while (record != null) {
+			Assert.assertEquals(i, record.getKey());
+			record = iterator.next(record);
+			i++;
+		}
+
+		Assert.assertEquals(NUM_RECORDS, i);
+
+		this.memoryManager.release(dataBuffer);
+		// release the memory occupied by the buffers
+		sorter.dispose();
+		this.memoryManager.release(memory);
+	}
+
+	@Test
+	public void testFlushPartialMemoryPage() throws Exception {
+		// Insert IntPair which would fill 2 memory pages.
+		final int NUM_RECORDS = 2 * MEMORY_PAGE_SIZE / 8;
+		final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), 3);
+
+		FixedLengthRecordSorter<IntPair> sorter = newSortBuffer(memory);
+		UniformIntPairGenerator generator = new UniformIntPairGenerator(Integer.MAX_VALUE, 1, false);
+
+		// write the records
+		IntPair record = new IntPair();
+		int num = -1;
+		do {
+			generator.next(record);
+			num++;
+		}
+		while (sorter.write(record) && num < NUM_RECORDS);
+
+		FileIOChannel.ID channelID = this.ioManager.createChannelEnumerator().next();
+		BlockChannelWriter<MemorySegment> blockChannelWriter = this.ioManager.createBlockChannelWriter(channelID);
+		final List<MemorySegment> writeBuffer = this.memoryManager.allocatePages(new DummyInvokable(), 3);
+		ChannelWriterOutputView outputView = new ChannelWriterOutputView(blockChannelWriter, writeBuffer, writeBuffer.get(0).size());
+
+		sorter.writeToOutput(outputView, 1, NUM_RECORDS - 1);
+
+		this.memoryManager.release(outputView.close());
+
+		BlockChannelReader<MemorySegment> blockChannelReader = this.ioManager.createBlockChannelReader(channelID);
+		final List<MemorySegment> readBuffer = this.memoryManager.allocatePages(new DummyInvokable(), 3);
+		ChannelReaderInputView readerInputView = new ChannelReaderInputView(blockChannelReader, readBuffer, false);
+		final List<MemorySegment> dataBuffer = this.memoryManager.allocatePages(new DummyInvokable(), 3);
+		ChannelReaderInputViewIterator<IntPair> iterator = new ChannelReaderInputViewIterator(readerInputView, dataBuffer, this.serializer);
+
+		record = iterator.next(record);
+		int i =1;
+		while (record != null) {
+			Assert.assertEquals(i, record.getKey());
+			record = iterator.next(record);
+			i++;
+		}
+
+		Assert.assertEquals(NUM_RECORDS, i);
+
+		this.memoryManager.release(dataBuffer);
+		// release the memory occupied by the buffers
+		sorter.dispose();
+		this.memoryManager.release(memory);
+	}
 }


Mime
View raw message