flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [02/13] flink git commit: [FLINK-2869] [tests] Port IOManagerPerformanceBenchmark to JMH.
Date Thu, 29 Oct 2015 12:22:35 GMT
[FLINK-2869] [tests] Port IOManagerPerformanceBenchmark to JMH.

This closes #1270.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3abbcd1e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3abbcd1e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3abbcd1e

Branch: refs/heads/master
Commit: 3abbcd1eed42c2f7a65243ec6f27d16dbfc235b5
Parents: b654e98
Author: gallenvara <gallenvara@126.com>
Authored: Tue Oct 20 09:49:34 2015 +0800
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Thu Oct 29 10:23:22 2015 +0100

----------------------------------------------------------------------
 .../IOManagerPerformanceBenchmark.java          | 608 +++++++++++++++++++
 .../IOManagerPerformanceBenchmark.java          | 415 -------------
 2 files changed, 608 insertions(+), 415 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3abbcd1e/flink-benchmark/src/test/java/org/apache/flink/benchmark/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-benchmark/src/test/java/org/apache/flink/benchmark/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java b/flink-benchmark/src/test/java/org/apache/flink/benchmark/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
new file mode 100644
index 0000000..7d31925
--- /dev/null
+++ b/flink-benchmark/src/test/java/org/apache/flink/benchmark/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
@@ -0,0 +1,608 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.benchmark.runtime.io.disk.iomanager;
+
+import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
+import org.apache.flink.runtime.io.disk.iomanager.*;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.types.IntValue;
+import org.junit.Assert;
+import org.openjdk.jmh.annotations.*;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Thread)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class IOManagerPerformanceBenchmark {
+
+	private static final Logger LOG = LoggerFactory.getLogger(IOManagerPerformanceBenchmark.class);
+
+	@Param({"4096", "16384", "524288"})
+	private int segmentSizesAligned;
+
+	@Param({"3862", "16895", "500481"})
+	private int segmentSizesUnaligned;
+
+	@Param({"1", "2", "4", "6"})
+	private int numSegment;
+
+	private static int numBlocks;
+
+	private static final long MEMORY_SIZE = 32 * 1024 * 1024;
+
+	private static final int NUM_INTS_WRITTEN = 100000000;
+
+	private static final AbstractInvokable memoryOwner = new DummyInvokable();
+
+	private MemoryManager memManager;
+
+	private IOManager ioManager;
+
+	private static FileIOChannel.ID fileIOChannel;
+
+	private static File ioManagerTempFile1;
+
+	private static File ioManagerTempFile2;
+
+	private static File speedTestNIOTempFile1;
+
+	private static File speedTestNIOTempFile2;
+
+	private static File speedTestNIOTempFile3;
+
+	private static File speedTestNIOTempFile4;
+
+
+	@Setup
+	public void startup() throws Exception {
+		memManager = new MemoryManager(MEMORY_SIZE, 1);
+		ioManager = new IOManagerAsync();
+		testChannelWriteWithSegments(numSegment);
+		ioManagerTempFile1 = createReadTempFile(segmentSizesAligned);
+		ioManagerTempFile2 = createReadTempFile(segmentSizesUnaligned);
+		speedTestNIOTempFile1 = createSpeedTestNIOTempFile(segmentSizesAligned, true);
+		speedTestNIOTempFile2 = createSpeedTestNIOTempFile(segmentSizesAligned, false);
+		speedTestNIOTempFile3 = createSpeedTestNIOTempFile(segmentSizesUnaligned, true);
+		speedTestNIOTempFile4 = createSpeedTestNIOTempFile(segmentSizesUnaligned, false);
+	}
+
+	@TearDown
+	public void afterTest() throws Exception {
+		ioManagerTempFile1.delete();
+		ioManagerTempFile2.delete();
+		speedTestNIOTempFile1.delete();
+		speedTestNIOTempFile2.delete();
+		speedTestNIOTempFile3.delete();
+		speedTestNIOTempFile4.delete();
+		ioManager.shutdown();
+		Assert.assertTrue("IO Manager has not properly shut down.", ioManager.isProperlyShutDown());
+
+		Assert.assertTrue("Not all memory was returned to the memory manager in the test.", memManager.verifyEmpty());
+		memManager.shutdown();
+		memManager = null;
+	}
+
+// ------------------------------------------------------------------------
+
+	private File createReadTempFile(int bufferSize) throws IOException {
+		final FileIOChannel.ID tmpChannel = ioManager.createChannel();
+		final IntValue rec = new IntValue(0);
+
+		File tempFile = null;
+		DataOutputStream daos = null;
+
+		try {
+			tempFile = new File(tmpChannel.getPath());
+
+			FileOutputStream fos = new FileOutputStream(tempFile);
+			daos = new DataOutputStream(new BufferedOutputStream(fos, bufferSize));
+
+			int valsLeft = NUM_INTS_WRITTEN;
+			while (valsLeft-- > 0) {
+				rec.setValue(valsLeft);
+				rec.write(new OutputViewDataOutputStreamWrapper(daos));
+			}
+			daos.close();
+			daos = null;
+		}
+		finally {
+			// close if possible
+			if (daos != null) {
+				daos.close();
+			}
+		}
+		return tempFile;
+	}
+
+	@SuppressWarnings("resource")
+	private File createSpeedTestNIOTempFile(int bufferSize, boolean direct) throws IOException
+	{
+		final FileIOChannel.ID tmpChannel = ioManager.createChannel();
+
+		File tempFile = null;
+		FileChannel fs = null;
+
+		try {
+			tempFile = new File(tmpChannel.getPath());
+
+			RandomAccessFile raf = new RandomAccessFile(tempFile, "rw");
+			fs = raf.getChannel();
+
+			ByteBuffer buf = direct ? ByteBuffer.allocateDirect(bufferSize) : ByteBuffer.allocate(bufferSize);
+
+			int valsLeft = NUM_INTS_WRITTEN;
+			while (valsLeft-- > 0) {
+				if (buf.remaining() < 4) {
+					buf.flip();
+					fs.write(buf);
+					buf.clear();
+				}
+				buf.putInt(valsLeft);
+			}
+
+			if (buf.position() > 0) {
+				buf.flip();
+				fs.write(buf);
+			}
+
+			fs.close();
+			raf.close();
+			fs = null;
+		}
+		finally {
+			// close if possible
+			if (fs != null) {
+				fs.close();
+				fs = null;
+			}
+		}
+		return tempFile;
+	}
+
+	@Benchmark
+	public void speedTestOutputManager() throws Exception
+	{
+		LOG.info("Starting speed test with IO Manager...");
+
+		testChannelWriteWithSegments(numSegment);
+	}
+
+	@Benchmark
+	public void speedTestInputManager() throws Exception
+	{
+		LOG.info("Starting speed test with IO Manager...");
+
+		testChannelReadWithSegments(numSegment);
+	}
+
+	private void testChannelWriteWithSegments(int numSegments) throws Exception
+	{
+		final List<MemorySegment> memory = this.memManager.allocatePages(memoryOwner, numSegments);
+		final FileIOChannel.ID channel = this.ioManager.createChannel();
+
+		BlockChannelWriter<MemorySegment> writer = null;
+
+		try {
+			writer = this.ioManager.createBlockChannelWriter(channel);
+			final ChannelWriterOutputView out = new ChannelWriterOutputView(writer, memory, this.memManager.getPageSize());
+
+			int valsLeft = NUM_INTS_WRITTEN;
+			while (valsLeft-- > 0) {
+				out.writeInt(valsLeft);
+			}
+
+			fileIOChannel = channel;
+			out.close();
+			numBlocks = out.getBlockCount();
+
+			writer.close();
+			writer = null;
+
+			memManager.release(memory);
+		}
+		finally {
+			if (writer != null) {
+				writer.closeAndDelete();
+			}
+		}
+	}
+
+	private void testChannelReadWithSegments(int numSegments) throws Exception
+	{
+		final List<MemorySegment> memory = this.memManager.allocatePages(memoryOwner, numSegments);
+
+		BlockChannelReader<MemorySegment> reader = null;
+
+		try {
+			reader = ioManager.createBlockChannelReader(fileIOChannel);
+			final ChannelReaderInputView in = new ChannelReaderInputView(reader, memory, numBlocks, false);
+
+			int valsLeft = NUM_INTS_WRITTEN;
+			while (valsLeft-- > 0) {
+				in.readInt();
+//				Assert.assertTrue(rec.getValue() == valsLeft);
+			}
+
+			in.close();
+			reader.close();
+			reader = null;
+
+			memManager.release(memory);
+		}
+		finally {
+			if (reader != null) {
+				reader.closeAndDelete();
+			}
+		}
+	}
+
+//	@Test
+//	public void speedTestRandomAccessFile() throws IOException {
+//		LOG.info("Starting speed test with java random access file ...");
+//
+//		Channel.ID tmpChannel = ioManager.createChannel();
+//		File tempFile = null;
+//		RandomAccessFile raf = null;
+//
+//		try {
+//			tempFile = new File(tmpChannel.getPath());
+//			raf = new RandomAccessFile(tempFile, "rw");
+//
+//			IntegerRecord rec = new IntegerRecord(0);
+//
+//			long writeStart = System.currentTimeMillis();
+//
+//			int valsLeft = NUM_INTS_WRITTEN;
+//			while (valsLeft-- > 0) {
+//				rec.setValue(valsLeft);
+//				rec.write(raf);
+//			}
+//			raf.close();
+//			raf = null;
+//
+//			long writeElapsed = System.currentTimeMillis() - writeStart;
+//
+//			// ----------------------------------------------------------------
+//
+//			raf = new RandomAccessFile(tempFile, "r");
+//
+//			long readStart = System.currentTimeMillis();
+//
+//			valsLeft = NUM_INTS_WRITTEN;
+//			while (valsLeft-- > 0) {
+//				rec.read(raf);
+//			}
+//			raf.close();
+//			raf = null;
+//
+//			long readElapsed = System.currentTimeMillis() - readStart;
+//
+//
+//			LOG.info("Random Access File: write " + (writeElapsed / 1000) + " secs, read " + (readElapsed / 1000) + " secs.");
+//		}
+//		finally {
+//			// close if possible
+//			if (raf != null) {
+//				raf.close();
+//			}
+//
+//			// try to delete the file
+//			if (tempFile != null) {
+//				tempFile.delete();
+//			}
+//		}
+//	}
+
+	@Benchmark
+	public void speedOutputStreamWithBufferAligned() throws Exception
+	{
+		LOG.info("Starting speed test with java io file stream and ALIGNED buffer sizes ...");
+
+		speedOutputTestStream(segmentSizesAligned);
+	}
+
+	@Benchmark
+	public void speedOutputStreamWithBufferUnaligned() throws Exception
+	{
+		LOG.info("Starting speed test with java io file stream and UNALIGNED buffer sizes ...");
+
+		speedOutputTestStream(segmentSizesUnaligned);
+	}
+
+	@Benchmark
+	public void speedInputStreamWithBufferAligned() throws Exception
+	{
+		LOG.info("Starting speed test with java io file stream and ALIGNED buffer sizes ...");
+
+		speedInputTestStream(segmentSizesAligned);
+	}
+
+	@Benchmark
+	public void speedInputStreamWithBufferUnaligned() throws Exception
+	{
+		LOG.info("Starting speed test with java io file stream and UNALIGNED buffer sizes ...");
+
+		speedInputTestStream(segmentSizesUnaligned);
+	}
+
+	private void speedOutputTestStream(int bufferSize) throws IOException {
+		final FileIOChannel.ID tmpChannel = ioManager.createChannel();
+		final IntValue rec = new IntValue(0);
+
+		File tempFile = null;
+		DataOutputStream daos = null;
+
+		try {
+			tempFile = new File(tmpChannel.getPath());
+
+			FileOutputStream fos = new FileOutputStream(tempFile);
+			daos = new DataOutputStream(new BufferedOutputStream(fos, bufferSize));
+
+			int valsLeft = NUM_INTS_WRITTEN;
+			while (valsLeft-- > 0) {
+				rec.setValue(valsLeft);
+				rec.write(new OutputViewDataOutputStreamWrapper(daos));
+			}
+			daos.close();
+			daos = null;
+		}
+		finally {
+			// close if possible
+			if (daos != null) {
+				daos.close();
+			}
+			// try to delete the file
+			if (tempFile != null) {
+				tempFile.delete();
+			}
+		}
+	}
+
+	private void speedInputTestStream(int bufferSize) throws IOException {
+		final FileIOChannel.ID tmpChannel = ioManager.createChannel();
+		final IntValue rec = new IntValue(0);
+
+		File tempFile = null;
+		DataInputStream dais = null;
+
+		if (((bufferSize == 4096)||(bufferSize == 16384)||(bufferSize == 524288)))
+		{
+			tempFile = ioManagerTempFile1;
+		}
+		if (((bufferSize == 3862)||(bufferSize == 16895)||(bufferSize == 500481)))
+		{
+			tempFile = ioManagerTempFile2;
+		}
+
+		try {
+			FileInputStream fis = new FileInputStream(tempFile);
+			dais = new DataInputStream(new BufferedInputStream(fis, bufferSize));
+
+			int valsLeft = NUM_INTS_WRITTEN;
+			while (valsLeft-- > 0) {
+				rec.read(new InputViewDataInputStreamWrapper(dais));
+			}
+			dais.close();
+			dais = null;
+		}
+		finally {
+			// close if possible
+			if (dais != null) {
+				dais.close();
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Benchmark
+	public void speedWriteIndirectAndBufferAligned() throws Exception
+	{
+		LOG.info("Starting speed test with java NIO heap buffers and ALIGNED buffer sizes ...");
+
+		speedWriteTestNIO(segmentSizesAligned, false);
+	}
+
+	@Benchmark
+	public void speedWriteIndirectAndBufferUnaligned() throws Exception
+	{
+		LOG.info("Starting speed test with java NIO heap buffers and UNALIGNED buffer sizes ...");
+
+		speedWriteTestNIO(segmentSizesUnaligned, false);
+	}
+
+	@Benchmark
+	public void speedWriteDirectAndBufferAligned() throws Exception
+	{
+		LOG.info("Starting speed test with java NIO direct buffers and ALIGNED buffer sizes ...");
+
+		speedWriteTestNIO(segmentSizesAligned, true);
+	}
+
+	@Benchmark
+	public void speedWriteDirectAndBufferUnaligned() throws Exception
+	{
+		LOG.info("Starting speed test with java NIO direct buffers and UNALIGNED buffer sizes ...");
+
+		speedWriteTestNIO(segmentSizesUnaligned, true);
+	}
+
+	@Benchmark
+	public void speedReadIndirectAndBufferAligned() throws Exception
+	{
+		LOG.info("Starting speed test with java NIO heap buffers and ALIGNED buffer sizes ...");
+
+		speedReadTestNIO(segmentSizesAligned, false);
+	}
+
+	@Benchmark
+	public void speedReadIndirectAndBufferUnaligned() throws Exception
+	{
+		LOG.info("Starting speed test with java NIO heap buffers and UNALIGNED buffer sizes ...");
+
+		speedReadTestNIO(segmentSizesUnaligned, false);
+	}
+
+	@Benchmark
+	public void speedReadDirectAndBufferAligned() throws Exception
+	{
+		LOG.info("Starting speed test with java NIO direct buffers and ALIGNED buffer sizes ...");
+
+		speedReadTestNIO(segmentSizesAligned, true);
+	}
+
+	@Benchmark
+	public void speedReadDirectAndBufferUnaligned() throws Exception
+	{
+		LOG.info("Starting speed test with java NIO direct buffers and UNALIGNED buffer sizes ...");
+
+		speedReadTestNIO(segmentSizesUnaligned, true);
+	}
+
+
+	@SuppressWarnings("resource")
+	private void speedWriteTestNIO(int bufferSize, boolean direct) throws IOException
+	{
+		final FileIOChannel.ID tmpChannel = ioManager.createChannel();
+
+		File tempFile = null;
+		FileChannel fs = null;
+
+		try {
+			tempFile = new File(tmpChannel.getPath());
+
+			RandomAccessFile raf = new RandomAccessFile(tempFile, "rw");
+			fs = raf.getChannel();
+
+			ByteBuffer buf = direct ? ByteBuffer.allocateDirect(bufferSize) : ByteBuffer.allocate(bufferSize);
+
+			int valsLeft = NUM_INTS_WRITTEN;
+			while (valsLeft-- > 0) {
+				if (buf.remaining() < 4) {
+					buf.flip();
+					fs.write(buf);
+					buf.clear();
+				}
+				buf.putInt(valsLeft);
+			}
+
+			if (buf.position() > 0) {
+				buf.flip();
+				fs.write(buf);
+			}
+
+			fs.close();
+			raf.close();
+			fs = null;
+		}
+		finally {
+			// close if possible
+			if (fs != null) {
+				fs.close();
+				fs = null;
+			}
+			// try to delete the file
+			if (tempFile != null) {
+				tempFile.delete();
+			}
+		}
+	}
+
+	@SuppressWarnings("resource")
+	private void speedReadTestNIO(int bufferSize, boolean direct) throws IOException
+	{
+		File tempFile = null;
+		FileChannel fs = null;
+
+		if (((bufferSize == 4096)||(bufferSize == 16384)||(bufferSize == 524288))&&(direct))
+		{
+			tempFile = speedTestNIOTempFile1;
+		}
+		if (((bufferSize == 4096)||(bufferSize == 16384)||(bufferSize == 524288))&&(!direct))
+		{
+			tempFile = speedTestNIOTempFile2;
+		}
+		if (((bufferSize == 3862)||(bufferSize == 16895)||(bufferSize == 500481))&&(direct))
+		{
+			tempFile = speedTestNIOTempFile3;
+		}
+		if (((bufferSize == 3862)||(bufferSize == 16895)||(bufferSize == 500481))&&(!direct))
+		{
+			tempFile = speedTestNIOTempFile4;
+		}
+
+		try {
+			ByteBuffer buf = direct ? ByteBuffer.allocateDirect(bufferSize) : ByteBuffer.allocate(bufferSize);
+
+			RandomAccessFile raf = new RandomAccessFile(tempFile, "r");
+			fs = raf.getChannel();
+			buf.clear();
+
+			fs.read(buf);
+			buf.flip();
+
+			int valsLeft = NUM_INTS_WRITTEN;
+			while (valsLeft-- > 0) {
+				if (buf.remaining() < 4) {
+					buf.compact();
+					fs.read(buf);
+					buf.flip();
+				}
+				if (buf.getInt() != valsLeft) {
+					throw new IOException();
+				}
+			}
+
+			fs.close();
+			raf.close();
+
+		}
+		finally {
+			// close if possible
+			if (fs != null) {
+				fs.close();
+				fs = null;
+			}
+		}
+	}
+
+	public static void main(String[] args) throws Exception {
+		Options opt = new OptionsBuilder()
+				.include(IOManagerPerformanceBenchmark.class.getSimpleName())
+				.warmupIterations(2)
+				.measurementIterations(2)
+				.forks(1)
+				.build();
+		new Runner(opt).run();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3abbcd1e/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
deleted file mode 100644
index fd02623..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
+++ /dev/null
@@ -1,415 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.disk.iomanager;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.List;
-
-import org.apache.flink.runtime.operators.testutils.DummyInvokable;
-import org.apache.flink.types.IntValue;
-import org.junit.Assert;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.MemoryManager;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-
-public class IOManagerPerformanceBenchmark {
-	
-	private static final Logger LOG = LoggerFactory.getLogger(IOManagerPerformanceBenchmark.class);
-	
-	private static final int[] SEGMENT_SIZES_ALIGNED = { 4096, 16384, 524288 };
-	
-	private static final int[] SEGMENT_SIZES_UNALIGNED = { 3862, 16895, 500481 };
-	
-	private static final int[] NUM_SEGMENTS = { 1, 2, 4, 6 };
-	
-	private static final long MEMORY_SIZE = 32 * 1024 * 1024;
-	
-	private static final int NUM_INTS_WRITTEN = 100000000;
-	
-	
-	private static final AbstractInvokable memoryOwner = new DummyInvokable();
-	
-	private MemoryManager memManager;
-	
-	private IOManager ioManager;
-	
-	
-	@Before
-	public void startup() {
-		memManager = new MemoryManager(MEMORY_SIZE, 1);
-		ioManager = new IOManagerAsync();
-	}
-	
-	@After
-	public void afterTest() throws Exception {
-		ioManager.shutdown();
-		Assert.assertTrue("IO Manager has not properly shut down.", ioManager.isProperlyShutDown());
-		
-		Assert.assertTrue("Not all memory was returned to the memory manager in the test.", memManager.verifyEmpty());
-		memManager.shutdown();
-		memManager = null;
-	}
-	
-// ------------------------------------------------------------------------
-	
-	@Test
-	public void speedTestIOManager() throws Exception
-	{
-		LOG.info("Starting speed test with IO Manager...");
-		
-		for (int num : NUM_SEGMENTS) {
-			testChannelWithSegments(num);
-		}
-	}
-
-	private void testChannelWithSegments(int numSegments) throws Exception
-	{
-		final List<MemorySegment> memory = this.memManager.allocatePages(memoryOwner, numSegments);
-		final FileIOChannel.ID channel = this.ioManager.createChannel();
-		
-		BlockChannelWriter<MemorySegment> writer = null;
-		BlockChannelReader<MemorySegment> reader = null;
-		
-		try {	
-			writer = this.ioManager.createBlockChannelWriter(channel);
-			final ChannelWriterOutputView out = new ChannelWriterOutputView(writer, memory, this.memManager.getPageSize());
-			
-			long writeStart = System.currentTimeMillis();
-			
-			int valsLeft = NUM_INTS_WRITTEN;
-			while (valsLeft-- > 0) {
-				out.writeInt(valsLeft);
-			}
-			
-			out.close();
-			final int numBlocks = out.getBlockCount();
-			writer.close();
-			writer = null;
-			
-			long writeElapsed = System.currentTimeMillis() - writeStart;
-			
-			// ----------------------------------------------------------------
-			
-			reader = ioManager.createBlockChannelReader(channel);
-			final ChannelReaderInputView in = new ChannelReaderInputView(reader, memory, numBlocks, false);
-			
-			long readStart = System.currentTimeMillis();
-			
-			valsLeft = NUM_INTS_WRITTEN;
-			while (valsLeft-- > 0) {
-				in.readInt();
-//				Assert.assertTrue(rec.getValue() == valsLeft);
-			}
-			
-			in.close();
-			reader.close();
-			
-			long readElapsed = System.currentTimeMillis() - readStart;
-			
-			reader.deleteChannel();
-			reader = null;
-			
-			LOG.info("IOManager with " + numSegments + " mem segments: write " + writeElapsed + " msecs, read " + readElapsed + " msecs.");
-			
-			memManager.release(memory);
-		}
-		finally {
-			if (reader != null) {
-				reader.closeAndDelete();
-			}
-			if (writer != null) {
-				writer.closeAndDelete();
-			}
-		}
-	}
-
-//	@Test
-//	public void speedTestRandomAccessFile() throws IOException {
-//		LOG.info("Starting speed test with java random access file ...");
-//		
-//		Channel.ID tmpChannel = ioManager.createChannel();
-//		File tempFile = null;
-//		RandomAccessFile raf = null;
-//		
-//		try {
-//			tempFile = new File(tmpChannel.getPath()); 
-//			raf = new RandomAccessFile(tempFile, "rw");
-//			
-//			IntegerRecord rec = new IntegerRecord(0);
-//			
-//			long writeStart = System.currentTimeMillis();
-//			
-//			int valsLeft = NUM_INTS_WRITTEN;
-//			while (valsLeft-- > 0) {
-//				rec.setValue(valsLeft);
-//				rec.write(raf);
-//			}
-//			raf.close();
-//			raf = null;
-//			
-//			long writeElapsed = System.currentTimeMillis() - writeStart;
-//			
-//			// ----------------------------------------------------------------
-//			
-//			raf = new RandomAccessFile(tempFile, "r");
-//			
-//			long readStart = System.currentTimeMillis();
-//			
-//			valsLeft = NUM_INTS_WRITTEN;
-//			while (valsLeft-- > 0) {
-//				rec.read(raf);
-//			}
-//			raf.close();
-//			raf = null;
-//			
-//			long readElapsed = System.currentTimeMillis() - readStart;
-//			
-//			
-//			LOG.info("Random Access File: write " + (writeElapsed / 1000) + " secs, read " + (readElapsed / 1000) + " secs.");
-//		}
-//		finally {
-//			// close if possible
-//			if (raf != null) {
-//				raf.close();
-//			}
-//			
-//			// try to delete the file
-//			if (tempFile != null) {
-//				tempFile.delete();
-//			}
-//		}
-//	}
-
-	@Test
-	public void speedTestFileStream() throws Exception
-	{
-		LOG.info("Starting speed test with java io file stream and ALIGNED buffer sizes ...");
-		
-		for (int bufferSize : SEGMENT_SIZES_ALIGNED)
-		{
-			speedTestStream(bufferSize);
-		}
-		
-		LOG.info("Starting speed test with java io file stream and UNALIGNED buffer sizes ...");
-		
-		for (int bufferSize : SEGMENT_SIZES_UNALIGNED)
-		{
-			speedTestStream(bufferSize);
-		}
-		
-	}
-		
-	private void speedTestStream(int bufferSize) throws IOException {
-		final FileIOChannel.ID tmpChannel = ioManager.createChannel();
-		final IntValue rec = new IntValue(0);
-		
-		File tempFile = null;
-		DataOutputStream daos = null;
-		DataInputStream dais = null;
-		
-		try {
-			tempFile = new File(tmpChannel.getPath());
-			
-			FileOutputStream fos = new FileOutputStream(tempFile);
-			daos = new DataOutputStream(new BufferedOutputStream(fos, bufferSize));
-			
-			long writeStart = System.currentTimeMillis();
-			
-			int valsLeft = NUM_INTS_WRITTEN;
-			while (valsLeft-- > 0) {
-				rec.setValue(valsLeft);
-				rec.write(new OutputViewDataOutputStreamWrapper(daos));
-			}
-			daos.close();
-			daos = null;
-			
-			long writeElapsed = System.currentTimeMillis() - writeStart;
-			
-			// ----------------------------------------------------------------
-			
-			FileInputStream fis = new FileInputStream(tempFile);
-			dais = new DataInputStream(new BufferedInputStream(fis, bufferSize));
-			
-			long readStart = System.currentTimeMillis();
-			
-			valsLeft = NUM_INTS_WRITTEN;
-			while (valsLeft-- > 0) {
-				rec.read(new InputViewDataInputStreamWrapper(dais));
-			}
-			dais.close();
-			dais = null;
-			
-			long readElapsed = System.currentTimeMillis() - readStart;
-			
-			LOG.info("File-Stream with buffer " + bufferSize + ": write " + writeElapsed + " msecs, read " + readElapsed + " msecs.");
-		}
-		finally {
-			// close if possible
-			if (daos != null) {
-				daos.close();
-			}
-			if (dais != null) {
-				dais.close();
-			}
-			// try to delete the file
-			if (tempFile != null) {
-				tempFile.delete();
-			}
-		}
-	}
-	
-	// ------------------------------------------------------------------------
-	
-	@Test
-	public void speedTestNIO() throws Exception
-	{
-		LOG.info("Starting speed test with java NIO heap buffers and ALIGNED buffer sizes ...");
-		
-		for (int bufferSize : SEGMENT_SIZES_ALIGNED)
-		{
-			speedTestNIO(bufferSize, false);
-		}
-		
-		LOG.info("Starting speed test with java NIO heap buffers and UNALIGNED buffer sizes ...");
-		
-		for (int bufferSize : SEGMENT_SIZES_UNALIGNED)
-		{
-			speedTestNIO(bufferSize, false);
-		}
-		
-		LOG.info("Starting speed test with java NIO direct buffers and ALIGNED buffer sizes ...");
-		
-		for (int bufferSize : SEGMENT_SIZES_ALIGNED)
-		{
-			speedTestNIO(bufferSize, true);
-		}
-		
-		LOG.info("Starting speed test with java NIO direct buffers and UNALIGNED buffer sizes ...");
-		
-		for (int bufferSize : SEGMENT_SIZES_UNALIGNED)
-		{
-			speedTestNIO(bufferSize, true);
-		}
-		
-	}
-		
-	@SuppressWarnings("resource")
-	private void speedTestNIO(int bufferSize, boolean direct) throws IOException
-	{
-		final FileIOChannel.ID tmpChannel = ioManager.createChannel();
-		
-		File tempFile = null;
-		FileChannel fs = null;
-		
-		try {
-			tempFile = new File(tmpChannel.getPath());
-			
-			RandomAccessFile raf = new RandomAccessFile(tempFile, "rw");
-			fs = raf.getChannel();
-			
-			ByteBuffer buf = direct ? ByteBuffer.allocateDirect(bufferSize) : ByteBuffer.allocate(bufferSize);
-			
-			long writeStart = System.currentTimeMillis();
-			
-			int valsLeft = NUM_INTS_WRITTEN;
-			while (valsLeft-- > 0) {
-				if (buf.remaining() < 4) {
-					buf.flip();
-					fs.write(buf);
-					buf.clear();
-				}
-				buf.putInt(valsLeft);
-			}
-			
-			if (buf.position() > 0) {
-				buf.flip();
-				fs.write(buf);
-			}
-			
-			fs.close();
-			raf.close();
-			fs = null;
-			
-			long writeElapsed = System.currentTimeMillis() - writeStart;
-			
-			// ----------------------------------------------------------------
-			
-			raf = new RandomAccessFile(tempFile, "r");
-			fs = raf.getChannel();
-			buf.clear();
-			
-			long readStart = System.currentTimeMillis();
-			
-			fs.read(buf);
-			buf.flip();
-			
-			valsLeft = NUM_INTS_WRITTEN;
-			while (valsLeft-- > 0) {
-				if (buf.remaining() < 4) {
-					buf.compact();
-					fs.read(buf);
-					buf.flip();
-				}
-				if (buf.getInt() != valsLeft) {
-					throw new IOException();
-				}
-			}
-			
-			fs.close();
-			raf.close();
-			
-			long readElapsed = System.currentTimeMillis() - readStart;
-			
-			LOG.info("NIO Channel with buffer " + bufferSize + ": write " + writeElapsed + " msecs, read " + readElapsed + " msecs.");
-		}
-		finally {
-			// close if possible
-			if (fs != null) {
-				fs.close();
-				fs = null;
-			}
-			// try to delete the file
-			if (tempFile != null) {
-				tempFile.delete();
-			}
-		}
-	}
-
-}


Mime
View raw message