flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [11/34] Offer buffer-oriented API for I/O (#25)
Date Tue, 10 Jun 2014 19:35:08 GMT
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/fs/s3/S3FileSystemTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/fs/s3/S3FileSystemTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/fs/s3/S3FileSystemTest.java
deleted file mode 100644
index 784178d..0000000
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/fs/s3/S3FileSystemTest.java
+++ /dev/null
@@ -1,461 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.fs.s3;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.URI;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.configuration.GlobalConfiguration;
-import eu.stratosphere.core.fs.BlockLocation;
-import eu.stratosphere.core.fs.FSDataInputStream;
-import eu.stratosphere.core.fs.FSDataOutputStream;
-import eu.stratosphere.core.fs.FileStatus;
-import eu.stratosphere.core.fs.FileSystem;
-import eu.stratosphere.core.fs.Path;
-import eu.stratosphere.runtime.fs.s3.S3FileSystem;
-
-/**
- * This test checks the S3 implementation of the {@link FileSystem} interface.
- * 
- */
-public class S3FileSystemTest {
-
-	/**
-	 * The length of the bucket/object names used in this test.
-	 */
-	private static final int NAME_LENGTH = 32;
-
-	/**
-	 * The alphabet to generate the random bucket/object names from.
-	 */
-	private static final char[] ALPHABET = { 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o',
-		'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9' };
-
-	/**
-	 * The size of the byte buffer used during the tests in bytes.
-	 */
-	private static final int TEST_BUFFER_SIZE = 128;
-
-	/**
-	 * The size of the small test file in bytes.
-	 */
-	private static final int SMALL_FILE_SIZE = 512;
-
-	/**
-	 * The size of the large test file in bytes.
-	 */
-	private static final int LARGE_FILE_SIZE = 1024 * 1024 * 12; // 12 MB
-
-	/**
-	 * The modulus to be used when generating the test data. Must not be larger than 128.
-	 */
-	private static final int MODULUS = 128;
-
-	private static final String S3_BASE_URI = "s3:///";
-
-	/**
-	 * Tries to read the AWS access key and the AWS secret key from the environments variables. If accessing these keys
-	 * fails, all tests will be skipped and marked as successful.
-	 */
-	@Before
-	public void initKeys() {
-		final String accessKey = System.getenv("AK");
-		final String secretKey = System.getenv("SK");
-		
-		if (accessKey != null || secretKey != null) {
-			Configuration conf = new Configuration();
-			if (accessKey != null) {
-				conf.setString(S3FileSystem.S3_ACCESS_KEY_KEY, accessKey);
-			}
-			if (secretKey != null) {
-				conf.setString(S3FileSystem.S3_SECRET_KEY_KEY, secretKey);
-			}
-			GlobalConfiguration.includeConfiguration(conf);
-		}
-	}
-
-	/**
-	 * This test creates and deletes a bucket inside S3 and checks it is correctly displayed inside the directory
-	 * listing.
-	 */
-	@Test
-	public void createAndDeleteBucketTest() {
-
-		if (!testActivated()) {
-			return;
-		}
-
-		final String bucketName = getRandomName();
-		final Path bucketPath = new Path(S3_BASE_URI + bucketName + Path.SEPARATOR);
-
-		try {
-
-			final FileSystem fs = bucketPath.getFileSystem();
-
-			// Create directory
-			fs.mkdirs(bucketPath);
-
-			// Check if directory is correctly displayed in file system hierarchy
-			final FileStatus[] content = fs.listStatus(new Path(S3_BASE_URI));
-			boolean entryFound = false;
-			for (final FileStatus entry : content) {
-				if (bucketPath.equals(entry.getPath())) {
-					entryFound = true;
-					break;
-				}
-			}
-
-			if (!entryFound) {
-				fail("Cannot find entry " + bucketName + " in directory " + S3_BASE_URI);
-			}
-
-			// Check the concrete directory file status
-			try {
-				final FileStatus directoryFileStatus = fs.getFileStatus(bucketPath);
-				assertTrue(directoryFileStatus.isDir());
-				assertEquals(0L, directoryFileStatus.getAccessTime());
-				assertTrue(directoryFileStatus.getModificationTime() > 0L);
-
-			} catch (FileNotFoundException e) {
-				fail(e.getMessage());
-			}
-
-			// Delete the bucket
-			fs.delete(bucketPath, true);
-
-			// Make sure the bucket no longer exists
-			try {
-				fs.getFileStatus(bucketPath);
-				fail("Expected FileNotFoundException for " + bucketPath.toUri());
-			} catch (FileNotFoundException e) {
-				// This is an expected exception
-			}
-
-		} catch (IOException ioe) {
-			fail(ioe.getMessage());
-		}
-	}
-
-	/**
-	 * Creates and reads the a larger test file in S3. The test file is generated according to a specific pattern.
-	 * During the read phase the incoming data stream is also checked against this pattern.
-	 */
-	@Test
-	public void createAndReadLargeFileTest() {
-
-		try {
-			createAndReadFileTest(LARGE_FILE_SIZE);
-		} catch (IOException ioe) {
-			fail(ioe.getMessage());
-		}
-	}
-
-	/**
-	 * Creates and reads the a small test file in S3. The test file is generated according to a specific pattern.
-	 * During the read phase the incoming data stream is also checked against this pattern.
-	 */
-	@Test
-	public void createAndReadSmallFileTest() {
-
-		try {
-			createAndReadFileTest(SMALL_FILE_SIZE);
-		} catch (IOException ioe) {
-			fail(ioe.getMessage());
-		}
-	}
-
-	/**
-	 * The tests checks the mapping of the file system directory structure to the underlying bucket/object model of
-	 * Amazon S3.
-	 */
-	@Test
-	public void multiLevelDirectoryTest() {
-
-		if (!testActivated()) {
-			return;
-		}
-
-		final String dirName = getRandomName();
-		final String subdirName = getRandomName();
-		final String subsubdirName = getRandomName();
-		final String fileName = getRandomName();
-		final Path dir = new Path(S3_BASE_URI + dirName + Path.SEPARATOR);
-		final Path subdir = new Path(S3_BASE_URI + dirName + Path.SEPARATOR + subdirName + Path.SEPARATOR);
-		final Path subsubdir = new Path(S3_BASE_URI + dirName + Path.SEPARATOR + subdirName + Path.SEPARATOR
-			+ subsubdirName + Path.SEPARATOR);
-		final Path file = new Path(S3_BASE_URI + dirName + Path.SEPARATOR + subdirName + Path.SEPARATOR + fileName);
-
-		try {
-
-			final FileSystem fs = dir.getFileSystem();
-
-			fs.mkdirs(subsubdir);
-
-			final OutputStream os = fs.create(file, true);
-			generateTestData(os, SMALL_FILE_SIZE);
-			os.close();
-
-			// On this directory levels there should only be one subdirectory
-			FileStatus[] list = fs.listStatus(dir);
-			int numberOfDirs = 0;
-			int numberOfFiles = 0;
-			for (final FileStatus entry : list) {
-
-				if (entry.isDir()) {
-					++numberOfDirs;
-					assertEquals(subdir, entry.getPath());
-				} else {
-					fail(entry.getPath() + " is a file which must not appear on this directory level");
-				}
-			}
-
-			assertEquals(1, numberOfDirs);
-			assertEquals(0, numberOfFiles);
-
-			list = fs.listStatus(subdir);
-			numberOfDirs = 0;
-
-			for (final FileStatus entry : list) {
-				if (entry.isDir()) {
-					assertEquals(subsubdir, entry.getPath());
-					++numberOfDirs;
-				} else {
-					assertEquals(file, entry.getPath());
-					++numberOfFiles;
-				}
-			}
-
-			assertEquals(1, numberOfDirs);
-			assertEquals(1, numberOfFiles);
-
-			fs.delete(dir, true);
-
-		} catch (IOException ioe) {
-			fail(ioe.getMessage());
-		}
-	}
-
-	/**
-	 * This test checks the S3 implementation of the file system method to retrieve the block locations of a file.
-	 */
-	@Test
-	public void blockLocationTest() {
-
-		if (!testActivated()) {
-			return;
-		}
-
-		final String dirName = getRandomName();
-		final String fileName = getRandomName();
-		final Path dir = new Path(S3_BASE_URI + dirName + Path.SEPARATOR);
-		final Path file = new Path(S3_BASE_URI + dirName + Path.SEPARATOR + fileName);
-
-		try {
-
-			final FileSystem fs = dir.getFileSystem();
-
-			fs.mkdirs(dir);
-
-			final OutputStream os = fs.create(file, true);
-			generateTestData(os, SMALL_FILE_SIZE);
-			os.close();
-
-			final FileStatus fileStatus = fs.getFileStatus(file);
-			assertNotNull(fileStatus);
-
-			BlockLocation[] blockLocations = fs.getFileBlockLocations(fileStatus, 0, SMALL_FILE_SIZE + 1);
-			assertNull(blockLocations);
-
-			blockLocations = fs.getFileBlockLocations(fileStatus, 0, SMALL_FILE_SIZE);
-			assertEquals(1, blockLocations.length);
-
-			final BlockLocation bl = blockLocations[0];
-			assertNotNull(bl.getHosts());
-			assertEquals(1, bl.getHosts().length);
-			assertEquals(SMALL_FILE_SIZE, bl.getLength());
-			assertEquals(0, bl.getOffset());
-			final URI s3Uri = fs.getUri();
-			assertNotNull(s3Uri);
-			assertEquals(s3Uri.getHost(), bl.getHosts()[0]);
-
-			fs.delete(dir, true);
-
-		} catch (IOException ioe) {
-			fail(ioe.getMessage());
-		}
-	}
-
-	/**
-	 * Creates and reads a file with the given size in S3. The test file is generated according to a specific pattern.
-	 * During the read phase the incoming data stream is also checked against this pattern.
-	 * 
-	 * @param fileSize
-	 *        the size of the file to be generated in bytes
-	 * @throws IOException
-	 *         thrown if an I/O error occurs while writing or reading the test file
-	 */
-	private void createAndReadFileTest(final int fileSize) throws IOException {
-
-		if (!testActivated()) {
-			return;
-		}
-
-		final String bucketName = getRandomName();
-		final String objectName = getRandomName();
-		final Path bucketPath = new Path(S3_BASE_URI + bucketName + Path.SEPARATOR);
-		final Path objectPath = new Path(S3_BASE_URI + bucketName + Path.SEPARATOR + objectName);
-
-		FileSystem fs = bucketPath.getFileSystem();
-
-		// Create test bucket
-		fs.mkdirs(bucketPath);
-
-		// Write test file to S3
-		final FSDataOutputStream outputStream = fs.create(objectPath, false);
-		generateTestData(outputStream, fileSize);
-		outputStream.close();
-
-		// Now read the same file back from S3
-		final FSDataInputStream inputStream = fs.open(objectPath);
-		testReceivedData(inputStream, fileSize);
-		inputStream.close();
-
-		// Delete test bucket
-		fs.delete(bucketPath, true);
-	}
-
-	/**
-	 * Receives test data from the given input stream and checks the size of the data as well as the pattern inside the
-	 * received data.
-	 * 
-	 * @param inputStream
-	 *        the input stream to read the test data from
-	 * @param expectedSize
-	 *        the expected size of the data to be read from the input stream in bytes
-	 * @throws IOException
-	 *         thrown if an error occurs while reading the data
-	 */
-	private void testReceivedData(final InputStream inputStream, final int expectedSize) throws IOException {
-
-		final byte[] testBuffer = new byte[TEST_BUFFER_SIZE];
-
-		int totalBytesRead = 0;
-		int nextExpectedNumber = 0;
-		while (true) {
-
-			final int bytesRead = inputStream.read(testBuffer);
-			if (bytesRead < 0) {
-				break;
-			}
-
-			totalBytesRead += bytesRead;
-
-			for (int i = 0; i < bytesRead; ++i) {
-				if (testBuffer[i] != nextExpectedNumber) {
-					throw new IOException("Read number " + testBuffer[i] + " but expected " + nextExpectedNumber);
-				}
-
-				++nextExpectedNumber;
-
-				if (nextExpectedNumber == MODULUS) {
-					nextExpectedNumber = 0;
-				}
-			}
-		}
-
-		if (totalBytesRead != expectedSize) {
-			throw new IOException("Expected to read " + expectedSize + " bytes but only received " + totalBytesRead);
-		}
-	}
-
-	/**
-	 * Generates test data of the given size according to some specific pattern and writes it to the provided output
-	 * stream.
-	 * 
-	 * @param outputStream
-	 *        the output stream to write the data to
-	 * @param size
-	 *        the size of the test data to be generated in bytes
-	 * @throws IOException
-	 *         thrown if an error occurs while writing the data
-	 */
-	private void generateTestData(final OutputStream outputStream, final int size) throws IOException {
-
-		final byte[] testBuffer = new byte[TEST_BUFFER_SIZE];
-		for (int i = 0; i < testBuffer.length; ++i) {
-			testBuffer[i] = (byte) (i % MODULUS);
-		}
-
-		int bytesWritten = 0;
-		while (bytesWritten < size) {
-
-			final int diff = size - bytesWritten;
-			if (diff < testBuffer.length) {
-				outputStream.write(testBuffer, 0, diff);
-				bytesWritten += diff;
-			} else {
-				outputStream.write(testBuffer);
-				bytesWritten += testBuffer.length;
-			}
-		}
-	}
-
-	/**
-	 * Generates a random name.
-	 * 
-	 * @return a random name
-	 */
-	private String getRandomName() {
-
-		final StringBuilder stringBuilder = new StringBuilder();
-		for (int i = 0; i < NAME_LENGTH; ++i) {
-			final char c = ALPHABET[(int) (Math.random() * (double) ALPHABET.length)];
-			stringBuilder.append(c);
-		}
-
-		return stringBuilder.toString();
-	}
-
-	/**
-	 * Checks whether the AWS access key and the AWS secret keys have been successfully loaded from the configuration
-	 * and whether the S3 tests shall be performed.
-	 * 
-	 * @return <code>true</code> if the tests shall be performed, <code>false</code> if the tests shall be skipped
-	 *         because at least one AWS key is missing
-	 */
-	private boolean testActivated() {
-
-		final String accessKey = GlobalConfiguration.getString(S3FileSystem.S3_ACCESS_KEY_KEY, null);
-		final String secretKey = GlobalConfiguration.getString(S3FileSystem.S3_SECRET_KEY_KEY, null);
-
-		if (accessKey != null && secretKey != null) {
-			return true;
-		}
-
-		return false;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/AbstractIDTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/AbstractIDTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/AbstractIDTest.java
deleted file mode 100644
index 2130f5b..0000000
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/AbstractIDTest.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-
-import org.junit.Test;
-
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.util.CommonTestUtils;
-
-/**
- * This class contains tests for the {@link AbstractID} class.
- * 
- */
-public class AbstractIDTest {
-
-	/**
-	 * Tests the setID method of an abstract ID.
-	 */
-	@Test
-	public void testSetID() {
-
-		final ChannelID id1 = new ChannelID();
-		final ChannelID id2 = new ChannelID();
-		id1.setID(id2);
-
-		assertEquals(id1.hashCode(), id2.hashCode());
-		assertEquals(id1, id2);
-	}
-
-	/**
-	 * Tests the serialization/deserialization of an abstract ID.
-	 */
-	@Test
-	public void testSerialization() {
-
-		final ChannelID origID = new ChannelID();
-		try {
-			final ChannelID copyID = (ChannelID) CommonTestUtils.createCopy(origID);
-
-			assertEquals(origID.hashCode(), copyID.hashCode());
-			assertEquals(origID, copyID);
-
-		} catch (IOException e) {
-			e.printStackTrace();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/DefaultChannelSelectorTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/DefaultChannelSelectorTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/DefaultChannelSelectorTest.java
deleted file mode 100644
index 387e49f..0000000
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/DefaultChannelSelectorTest.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import static org.junit.Assert.assertEquals;
-
-import org.junit.Test;
-
-import eu.stratosphere.core.io.StringRecord;
-
-/**
- * This class checks the functionality of the {@link DefaultChannelSelector} class.
- * 
- */
-public class DefaultChannelSelectorTest {
-
-	/**
-	 * This test checks the channel selection
-	 */
-	@Test
-	public void channelSelect() {
-
-		final StringRecord dummyRecord = new StringRecord("abc");
-		final DefaultChannelSelector<StringRecord> selector = new DefaultChannelSelector<StringRecord>();
-		// Test with two channels
-		final int numberOfOutputChannels = 2;
-		int[] selectedChannels = selector.selectChannels(dummyRecord, numberOfOutputChannels);
-		assertEquals(1, selectedChannels.length);
-		assertEquals(1, selectedChannels[0]);
-		selectedChannels = selector.selectChannels(dummyRecord, numberOfOutputChannels);
-		assertEquals(1, selectedChannels.length);
-		assertEquals(0, selectedChannels[0]);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/BufferTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/BufferTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/BufferTest.java
deleted file mode 100644
index 13cc30e..0000000
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/BufferTest.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.channels.FileChannel;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import eu.stratosphere.core.io.StringRecord;
-import eu.stratosphere.nephele.types.IntegerRecord;
-
-/**
- * This class checks the functionality of the {@link SerializationBuffer} class and the {@link DefaultDeserializer}
- * class
- * 
- */
-public class BufferTest
-{
-	private File file = new File("./tmp");
-
-	private FileInputStream fileinstream;
-
-	private FileOutputStream filestream;
-
-	private FileChannel writeable;
-
-	private FileChannel readable;
-
-	/**
-	 * Set up files and stream for testing
-	 * 
-	 * @throws IOException
-	 */
-	@Before
-	public void before() throws IOException {
-		file.createNewFile();
-		filestream = new FileOutputStream(file);
-		fileinstream = new FileInputStream(file);
-		writeable = filestream.getChannel();
-		readable = fileinstream.getChannel();
-
-	}
-
-	/**
-	 * clean up. Remove file close streams and channels
-	 * 
-	 * @throws IOException
-	 */
-	@After
-	public void after() throws IOException {
-		fileinstream.close();
-		writeable.close();
-		readable.close();
-		filestream.close();
-		file.delete();
-	}
-
-	/**
-	 * Tests serialization and deserialization of an {@link IntegerRecord}
-	 */
-	@Test
-	public void testIntSerialize()
-	{
-		final SerializationBuffer<IntegerRecord> intSerializationBuffer = new SerializationBuffer<IntegerRecord>();
-		final int NUM = 0xab627ef;
-		
-		IntegerRecord intRecord = new IntegerRecord(NUM);
-		// Serialize a record.
-		try {
-			intSerializationBuffer.serialize(intRecord);
-		} catch (IOException e) {
-			e.printStackTrace();
-			fail();
-		}
-		// Last record is still in buffer, serializing another should throw IOException
-		try {
-			intSerializationBuffer.serialize(intRecord);
-			fail();
-		} catch (IOException e) {
-		}
-		
-		// Read from buffer (written in file)
-		try {
-			intSerializationBuffer.read(writeable);
-		} catch (IOException e) {
-			e.printStackTrace();
-			fail();
-		}
-		// Now a new Record can be serialized
-		try {
-			intSerializationBuffer.serialize(intRecord);
-		} catch (IOException e) {
-			e.printStackTrace();
-			fail();
-		}
-
-		DefaultDeserializer<IntegerRecord> intDeserialitionBuffer = new DefaultDeserializer<IntegerRecord>(IntegerRecord.class, true);
-		IntegerRecord record = new IntegerRecord();
-		// Deserialize a Record
-		try {
-			record = intDeserialitionBuffer.readData(record, readable);
-		} catch (IOException e) {
-			e.printStackTrace();
-			fail();
-		}
-		// Check it contains the right value
-		assertEquals(NUM, record.getValue());
-		// File empty, another read should throw IOException
-		try {
-			record = intDeserialitionBuffer.readData(record, readable);
-			fail();
-		} catch (IOException e) {
-		}
-
-	}
-
-	/**
-	 * Tests serialization and deserialization of an {@link StringRecord}
-	 */
-	@Test
-	public void testStringSerialize()
-	{
-		final SerializationBuffer<StringRecord> stringSerializationBuffer = new SerializationBuffer<StringRecord>();
-		final String str = "abc";
-		
-		StringRecord stringrecord = new StringRecord(str);
-		
-		// Serialize a record.
-		try {
-			stringSerializationBuffer.serialize(stringrecord);
-		} catch (IOException e) {
-			e.printStackTrace();
-			fail();
-		}
-		// Read from buffer (write in file)
-		try {
-			stringSerializationBuffer.read(writeable);
-		} catch (IOException e) {
-			e.printStackTrace();
-			fail();
-		}
-		// Serialize next Record.
-		// Read from buffer (write in file)
-		final String str2 = "abcdef";
-		stringrecord = new StringRecord(str2);
-		try {
-			stringSerializationBuffer.serialize(stringrecord);
-			stringSerializationBuffer.read(writeable);
-		} catch (IOException e) {
-			e.printStackTrace();
-			fail();
-		}
-
-		final DefaultDeserializer<StringRecord> stringDeserialitionBuffer = new DefaultDeserializer<StringRecord>(StringRecord.class, true);
-		StringRecord record = new StringRecord();
-		// Deserialize and check record are correct
-		try {
-			record = stringDeserialitionBuffer.readData(record, readable);
-		} catch (IOException e) {
-			e.printStackTrace();
-			fail();
-		}
-		assertEquals(str, record.toString());
-		try {
-			record = stringDeserialitionBuffer.readData(record, readable);
-		} catch (IOException e) {
-			e.printStackTrace();
-			fail();
-		}
-		assertEquals(str2, record.toString());
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/MemoryBufferTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/MemoryBufferTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/MemoryBufferTest.java
deleted file mode 100644
index d8ffc27..0000000
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/MemoryBufferTest.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Queue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import eu.stratosphere.core.memory.MemorySegment;
-import eu.stratosphere.nephele.util.BufferPoolConnector;
-
-
-public class MemoryBufferTest {
-
-	private MemoryBufferPoolConnector bufferPoolConnector;
-	private Queue<MemorySegment> bufferPool;
-	
-	private final static int INT_COUNT = 512;
-	private final static int INT_SIZE = Integer.SIZE / Byte.SIZE;
-
-	@Before
-	public void setUp() throws Exception {
-		bufferPool = new LinkedBlockingQueue<MemorySegment>();
-		bufferPoolConnector = new BufferPoolConnector(bufferPool);
-	}
-
-	@After
-	public void tearDown() throws Exception {
-	}
-
-	@Test
-	public void readToSmallByteBuffer() throws IOException {
-		MemoryBuffer buf = new MemoryBuffer(INT_COUNT*INT_SIZE, new MemorySegment(new byte[INT_COUNT*INT_SIZE]), bufferPoolConnector);
-		fillBuffer(buf);
-		
-		ByteBuffer target = ByteBuffer.allocate(INT_SIZE);
-		ByteBuffer largeTarget = ByteBuffer.allocate(INT_COUNT*INT_SIZE);
-		int i = 0;
-		while(buf.hasRemaining()) {
-			buf.read(target);
-			target.rewind();
-			largeTarget.put(target);
-			target.rewind();
-			if( i++ >= INT_COUNT) {
-				fail("There were too many elements in the buffer");
-			}
-		}
-		assertEquals(-1, buf.read(target));
-		
-		target.rewind();
-		validateByteBuffer(largeTarget);
-	}
-		
-	
-	/**
-	 * CopyToBuffer uses system.arraycopy()
-	 * 
-	 * @throws IOException
-	 */
-	@Test
-	public void copyToBufferTest() throws IOException {
-
-		MemoryBuffer buf = new MemoryBuffer(INT_COUNT*INT_SIZE, new MemorySegment(new byte[INT_COUNT*INT_SIZE]), bufferPoolConnector);
-		fillBuffer(buf);
-		
-		
-		// the target buffer is larger to check if the limit is set appropriately
-		MemoryBuffer destination = new MemoryBuffer(INT_COUNT*INT_SIZE*2, 
-					new MemorySegment(new byte[INT_COUNT*INT_SIZE*2]), 
-					bufferPoolConnector);
-		assertEquals(INT_COUNT*INT_SIZE*2, destination.limit());
-		// copy buf contents to double sized MemBuffer
-		buf.copyToBuffer(destination);
-		assertEquals(INT_COUNT*INT_SIZE, destination.limit());
-		
-		// copy contents of destination to byteBuffer
-		ByteBuffer test = ByteBuffer.allocate(INT_COUNT*INT_SIZE);
-		int written = destination.read(test);
-		assertEquals(INT_COUNT*INT_SIZE, written);
-		// validate byteBuffer contents
-		validateByteBuffer(test);
-		
-		destination.position(written);
-		destination.limit(destination.getTotalSize());
-		// allocate another byte buffer to write the rest of destination into a byteBuffer
-		ByteBuffer testRemainder = ByteBuffer.allocate(INT_COUNT*INT_SIZE);
-		written = destination.read(testRemainder);
-		assertEquals(INT_COUNT*INT_SIZE, written);
-		expectAllNullByteBuffer(testRemainder);
-		
-		buf.close(); // make eclipse happy
-	}
-	
-	@Test
-	public void testDuplicate() throws Exception {
-		MemoryBuffer buf = new MemoryBuffer(INT_COUNT*INT_SIZE, new MemorySegment(new byte[INT_COUNT*INT_SIZE]), bufferPoolConnector);
-		MemoryBuffer buf2 = buf.duplicate();
-		
-		buf2.close();
-		buf.close();
-	}
-
-	private void fillBuffer(Buffer buf) throws IOException {
-		ByteBuffer src = ByteBuffer.allocate(INT_SIZE);
-		// write some data into buf:
-		for(int i = 0; i < INT_COUNT; ++i) {
-			src.putInt(0,i);
-			src.rewind();
-			buf.write(src);
-		}
-		buf.flip();
-	}
-	
-	
-	/**
-	 * Validates if the ByteBuffer contains the what fillMemoryBuffer has written!
-	 * 
-	 * @param target
-	 */
-	private void validateByteBuffer(ByteBuffer target) {
-		ByteBuffer ref = ByteBuffer.allocate(INT_SIZE);
-		
-		for(int i = 0; i < INT_SIZE*INT_COUNT; ++i) {
-			ref.putInt(0,i / INT_SIZE);
-			assertEquals("Byte at position "+i+" is different", ref.get(i%INT_SIZE), target.get(i));
-		}
-	}
-	
-	private void expectAllNullByteBuffer(ByteBuffer target) {
-		ByteBuffer ref = ByteBuffer.allocate(INT_SIZE);
-		ref.putInt(0,0);
-		for(int i = 0; i < INT_COUNT; ++i) {
-			assertEquals("Byte at position "+i+" is different", ref.getInt(0), target.getInt(i));
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/BooleanType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/BooleanType.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/BooleanType.java
deleted file mode 100644
index 749a702..0000000
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/BooleanType.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels.serialization;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Random;
-
-/**
- */
-public class BooleanType implements SerializationTestType
-{
-	private boolean value;
-	
-
-	public BooleanType()
-	{
-		this.value = false;
-	}
-	
-	private BooleanType(boolean value)
-	{
-		this.value = value;
-	}
-	
-
-	@Override
-	public BooleanType getRandom(Random rnd)
-	{
-		return new BooleanType(rnd.nextBoolean());
-	}
-	
-
-	@Override
-	public void write(DataOutput out) throws IOException
-	{
-		out.writeBoolean(this.value);
-	}
-
-
-	@Override
-	public void read(DataInput in) throws IOException
-	{
-		this.value = in.readBoolean();
-	}
-
-
-	@Override
-	public int hashCode()
-	{
-		return this.value ? 1 : 0;
-	}
-
-
-	@Override
-	public boolean equals(Object obj)
-	{
-		if (obj instanceof BooleanType) {
-			BooleanType other = (BooleanType) obj;
-			return this.value == other.value;
-		} else {
-			return false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/ByteArrayType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/ByteArrayType.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/ByteArrayType.java
deleted file mode 100644
index 9c5bdca..0000000
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/ByteArrayType.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels.serialization;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Random;
-
-/**
- */
-public class ByteArrayType implements SerializationTestType
-{
-	private static final int MAX_LEN = 512 * 15;
-	
-	private byte[] data;
-	
-
-	public ByteArrayType()
-	{
-		this.data = new byte[0];
-	}
-	
-	private ByteArrayType(byte[] data)
-	{
-		this.data = data;
-	}
-	
-
-	@Override
-	public ByteArrayType getRandom(Random rnd)
-	{
-		final int len = rnd.nextInt(MAX_LEN) + 1;
-		final byte[] data = new byte[len];
-		rnd.nextBytes(data);
-		return new ByteArrayType(data);
-	}
-	
-	
-	
-
-	@Override
-	public void write(DataOutput out) throws IOException
-	{
-		out.writeInt(this.data.length);
-		out.write(this.data);
-	}
-
-
-	@Override
-	public void read(DataInput in) throws IOException
-	{
-		final int len = in.readInt();
-		this.data = new byte[len];
-		in.readFully(this.data);
-	}
-
-
-	@Override
-	public int hashCode()
-	{
-		return Arrays.hashCode(this.data);
-	}
-
-
-	@Override
-	public boolean equals(Object obj)
-	{
-		if (obj instanceof ByteArrayType) {
-			ByteArrayType other = (ByteArrayType) obj;
-			return Arrays.equals(this.data, other.data);
-		} else {
-			return false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/ByteSubArrayType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/ByteSubArrayType.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/ByteSubArrayType.java
deleted file mode 100644
index 719bd0d..0000000
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/ByteSubArrayType.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels.serialization;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Random;
-
-/**
- */
-public class ByteSubArrayType implements SerializationTestType
-{
-	private static final int MAX_LEN = 512;
-
-	private final byte[] data;
-	private int len;
-	
-	public ByteSubArrayType()
-	{
-		this.data = new byte[MAX_LEN];
-		this.len = 0;
-	}
-	
-
-	@Override
-	public ByteSubArrayType getRandom(Random rnd)
-	{
-		final int len = rnd.nextInt(MAX_LEN) + 1;
-		final ByteSubArrayType t = new ByteSubArrayType();
-		t.len = len;
-		
-		final byte[] data = t.data;
-		for (int i = 0; i < len; i++) {
-			data[i] = (byte) rnd.nextInt(256);
-		}
-		
-		return t;
-	}
-	
-
-	@Override
-	public void write(DataOutput out) throws IOException
-	{
-		out.writeInt(this.len);
-		out.write(this.data, 0, this.len);
-	}
-
-
-	@Override
-	public void read(DataInput in) throws IOException
-	{
-		this.len = in.readInt();
-		in.readFully(this.data, 0, this.len);
-	}
-	
-
-	@Override
-	public int hashCode()
-	{
-		final byte[] copy = new byte[this.len];
-		System.arraycopy(this.data, 0, copy, 0, this.len);
-		return Arrays.hashCode(copy);
-	}
-
-
-	@Override
-	public boolean equals(Object obj)
-	{
-		if (obj instanceof ByteSubArrayType) {
-			ByteSubArrayType other = (ByteSubArrayType) obj;
-			if (this.len == other.len) {
-				for (int i = 0; i < this.len; i++) {
-					if (this.data[i] != other.data[i]) {
-						return false;
-					}
-				}
-				return true;
-			} else {
-				return false;
-			}
-		} else {
-			return false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/ByteType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/ByteType.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/ByteType.java
deleted file mode 100644
index 141f7b3..0000000
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/ByteType.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels.serialization;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Random;
-
-/**
- */
-public class ByteType implements SerializationTestType
-{
-	private byte value;
-	
-
-	public ByteType()
-	{
-		this.value = (byte) 0;
-	}
-	
-	private ByteType(byte value)
-	{
-		this.value = value;
-	}
-	
-
-	@Override
-	public ByteType getRandom(Random rnd)
-	{
-		return new ByteType((byte) rnd.nextInt(256));
-	}
-	
-
-	@Override
-	public void write(DataOutput out) throws IOException
-	{
-		out.writeByte(this.value);
-	}
-
-
-	@Override
-	public void read(DataInput in) throws IOException
-	{
-		this.value = in.readByte();
-	}
-
-
-	@Override
-	public int hashCode()
-	{
-		return this.value;
-	}
-
-
-	@Override
-	public boolean equals(Object obj)
-	{
-		if (obj instanceof ByteType) {
-			ByteType other = (ByteType) obj;
-			return this.value == other.value;
-		} else {
-			return false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/CharType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/CharType.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/CharType.java
deleted file mode 100644
index e51aeb8..0000000
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/CharType.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels.serialization;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Random;
-
-/**
- */
-public class CharType implements SerializationTestType
-{
-	private char value;
-	
-
-	public CharType()
-	{
-		this.value = 0;
-	}
-	
-	private CharType(char value)
-	{
-		this.value = value;
-	}
-	
-
-	@Override
-	public CharType getRandom(Random rnd)
-	{
-		return new CharType((char) rnd.nextInt(10000));
-	}
-	
-
-	@Override
-	public void write(DataOutput out) throws IOException
-	{
-		out.writeChar(this.value);
-	}
-
-
-	@Override
-	public void read(DataInput in) throws IOException
-	{
-		this.value = in.readChar();
-	}
-
-
-	@Override
-	public int hashCode()
-	{
-		return this.value;
-	}
-
-
-	@Override
-	public boolean equals(Object obj)
-	{
-		if (obj instanceof CharType) {
-			CharType other = (CharType) obj;
-			return this.value == other.value;
-		} else {
-			return false;
-		}
-	}
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/DeSerializerTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/DeSerializerTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/DeSerializerTest.java
deleted file mode 100644
index 1c42277..0000000
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/DeSerializerTest.java
+++ /dev/null
@@ -1,299 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels.serialization;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.WritableByteChannel;
-import java.util.ArrayDeque;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import java.util.Random;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import eu.stratosphere.nephele.io.channels.DefaultDeserializer;
-import eu.stratosphere.nephele.io.channels.SerializationBuffer;
-
-
-/**
- */
-public class DeSerializerTest
-{
-	private static final SerializationTestType[] TYPE_FACTORIES = new SerializationTestType[] {
-		new BooleanType(),
-		new ByteArrayType(),
-		new ByteSubArrayType(),
-		new ByteType(),
-		new CharType(),
-		new DoubleType(),
-		new FloatType(),
-		new IntType(),
-		new LongType(),
-		new ShortType(),
-		new UnsignedByteType(),
-		new UnsignedShortType(),
-		new UTFStringType()
-	};
-	
-	private static final long SEED = 64871654635745873L;
-	
-	private Random rnd;
-	
-	// --------------------------------------------------------------------------------------------
-	
-	@Before
-	public void setup()
-	{
-		this.rnd = new Random(SEED);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	@Test
-	public void testSequenceOfIntegersWithAlignedBuffers()
-	{
-		try {
-			final Random rnd = this.rnd;
-			final int NUM_INTS = 1000000;
-			
-			final Iterator<SerializationTestType> intSource = new Iterator<SerializationTestType>()
-			{
-				private final Random random = rnd;
-				private final int limit = NUM_INTS;
-				private int pos = 0;
-				
-				@Override
-				public boolean hasNext() {
-					return this.pos < this.limit;
-				}
-				@Override
-				public IntType next() {
-					if (hasNext()) {
-						this.pos++;
-						return new IntType(this.random.nextInt());
-					} else {
-						throw new NoSuchElementException();
-					}
-				}
-				@Override
-				public void remove() {
-					throw new UnsupportedOperationException();
-				}
-			};
-			
-			testSequenceOfTypes(intSource, 2048);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail("Test encountered an unexpected exception.");
-		}
-	}
-	
-	@Test
-	public void testSequenceOfIntegersWithUnalignedBuffers()
-	{
-		try {
-			final Random rnd = this.rnd;
-			final int NUM_INTS = 1000000;
-			
-			final Iterator<SerializationTestType> intSource = new Iterator<SerializationTestType>()
-			{
-				private final Random random = rnd;
-				private final int limit = NUM_INTS;
-				private int pos = 0;
-				
-				@Override
-				public boolean hasNext() {
-					return this.pos < this.limit;
-				}
-				@Override
-				public IntType next() {
-					if (hasNext()) {
-						this.pos++;
-						return new IntType(this.random.nextInt());
-					} else {
-						throw new NoSuchElementException();
-					}
-				}
-				@Override
-				public void remove() {
-					throw new UnsupportedOperationException();
-				}
-			};
-			
-			testSequenceOfTypes(intSource, 2047);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail("Test encountered an unexpected exception.");
-		}
-	}
-	
-	@Test
-	public void testRandomTypes()
-	{
-		try {
-			final Random rnd = this.rnd;
-			final int NUM_TYPES = 1000000;
-			
-			final Iterator<SerializationTestType> randomSource = new Iterator<SerializationTestType>()
-			{
-				private final Random random = rnd;
-				private final int limit = NUM_TYPES;
-				private int pos = 0;
-				
-				@Override
-				public boolean hasNext() {
-					return this.pos < this.limit;
-				}
-				@Override
-				public SerializationTestType next() {
-					if (hasNext()) {
-						this.pos++;
-						return TYPE_FACTORIES[this.random.nextInt(TYPE_FACTORIES.length)].getRandom(this.random);
-					} else {
-						throw new NoSuchElementException();
-					}
-				}
-				@Override
-				public void remove() {
-					throw new UnsupportedOperationException();
-				}
-			};
-			
-			// test with an odd buffer size to force many unaligned cases
-			testSequenceOfTypes(randomSource, 512 * 7);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail("Test encountered an unexpected exception.");
-		}
-	}
-	
-	private static final void testSequenceOfTypes(Iterator<SerializationTestType> sequence, int bufferSize) throws Exception
-	{
-		final ArrayDeque<SerializationTestType> elements = new ArrayDeque<SerializationTestType>(512);
-		
-		final PipeChannel channel = new PipeChannel(bufferSize);
-		final SerializationBuffer<SerializationTestType> serBuffer = new SerializationBuffer<SerializationTestType>();
-		final DefaultDeserializer<SerializationTestType> deserBuffer = new DefaultDeserializer<SerializationTestType>(null);
-		
-		while (sequence.hasNext()) {
-			final SerializationTestType type = sequence.next();
-			
-			// serialize the record
-			serBuffer.serialize(type);
-			elements.addLast(type);
-			
-			// write the serialized record
-			while (true) {
-				serBuffer.read(channel);
-				if (serBuffer.dataLeftFromPreviousSerialization()) {
-					// current buffer is full, we need to start de-serializing to make space
-					channel.flip();
-					
-					while (!elements.isEmpty()) {
-						final SerializationTestType reference = elements.pollFirst();
-						final SerializationTestType result = deserBuffer.readData(reference.getClass().newInstance(), channel);
-						if (result == null) {
-							// not yet complete, we need to break
-							elements.addFirst(reference);
-							break;
-						} else {
-							// validate that we deserialized correctly
-							assertEquals("The deserialized element is not equal to the serialized element.", reference, result);
-						}
-					}
-					
-					channel.clear();
-				} else {
-					break;
-				}
-			}
-		}
-		
-		// check the remaining records in the buffers...
-		channel.flip();
-		while (!elements.isEmpty()) {
-			final SerializationTestType reference = elements.pollFirst();
-			final SerializationTestType result = deserBuffer.readData(reference.getClass().newInstance(), channel);
-			
-			assertNotNull(result);
-			assertEquals("The deserialized element is not equal to the serialized element.", reference, result);
-		}
-	}
-	
-	// ============================================================================================
-	
-	private static final class PipeChannel implements WritableByteChannel, ReadableByteChannel
-	{
-		private final byte[] buffer;
-		
-		private int position;
-		private int limit;
-		
-		
-		PipeChannel(int capacity) {
-			this.buffer = new byte[capacity];
-			this.limit = capacity;
-		}
-		
-		public void flip() {
-			this.limit = this.position;
-			this.position = 0;
-		}
-		
-		public void clear() {
-			this.position = 0;
-			this.limit = this.buffer.length;
-		}
-		
-		@Override
-		public boolean isOpen() {
-			return true;
-		}
-
-		@Override
-		public void close()
-		{}
-
-
-		@Override
-		public int write(ByteBuffer src)
-		{
-			final int toGet = Math.min(this.limit - this.position, src.remaining());
-			src.get(this.buffer, this.position, toGet);
-			this.position += toGet;
-			return toGet;
-		}
-
-
-		@Override
-		public int read(ByteBuffer dst) throws IOException
-		{
-			final int toPut = Math.min(this.limit - this.position, dst.remaining());
-			dst.put(this.buffer, this.position, toPut);
-			this.position += toPut;
-			return toPut;
-		}
-		
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/DoubleType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/DoubleType.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/DoubleType.java
deleted file mode 100644
index 3f3e958..0000000
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/DoubleType.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels.serialization;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Random;
-
-/**
- */
-public class DoubleType implements SerializationTestType
-{
-	private double value;
-	
-
-	public DoubleType()
-	{
-		this.value = 0;
-	}
-	
-	private DoubleType(double value)
-	{
-		this.value = value;
-	}
-	
-
-	@Override
-	public DoubleType getRandom(Random rnd)
-	{
-		return new DoubleType(rnd.nextDouble());
-	}
-	
-
-	@Override
-	public void write(DataOutput out) throws IOException
-	{
-		out.writeDouble(this.value);
-	}
-
-
-	@Override
-	public void read(DataInput in) throws IOException
-	{
-		this.value = in.readDouble();
-	}
-
-
-	@Override
-	public int hashCode()
-	{
-		final long l = Double.doubleToLongBits(this.value);
-		return (int) (l ^ l >>> 32);
-	}
-
-
-	@Override
-	public boolean equals(Object obj)
-	{
-		if (obj instanceof DoubleType) {
-			DoubleType other = (DoubleType) obj;
-			return Double.doubleToLongBits(this.value) == Double.doubleToLongBits(other.value);
-		} else {
-			return false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/FloatType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/FloatType.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/FloatType.java
deleted file mode 100644
index 7bc37a0..0000000
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/FloatType.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels.serialization;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Random;
-
-/**
- */
-public class FloatType implements SerializationTestType
-{
-	private float value;
-	
-
-	public FloatType()
-	{
-		this.value = 0;
-	}
-	
-	private FloatType(float value)
-	{
-		this.value = value;
-	}
-	
-
-	@Override
-	public FloatType getRandom(Random rnd)
-	{
-		return new FloatType(rnd.nextFloat());
-	}
-	
-
-	@Override
-	public void write(DataOutput out) throws IOException
-	{
-		out.writeFloat(this.value);
-	}
-
-
-	@Override
-	public void read(DataInput in) throws IOException
-	{
-		this.value = in.readFloat();
-	}
-
-
-	@Override
-	public int hashCode()
-	{
-		return Float.floatToIntBits(this.value);
-	}
-
-
-	@Override
-	public boolean equals(Object obj)
-	{
-		if (obj instanceof FloatType) {
-			FloatType other = (FloatType) obj;
-			return Float.floatToIntBits(this.value) == Float.floatToIntBits(other.value);
-		} else {
-			return false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/IntType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/IntType.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/IntType.java
deleted file mode 100644
index 1180884..0000000
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/IntType.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels.serialization;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Random;
-
-/**
- */
-public class IntType implements SerializationTestType
-{
-	private int value;
-	
-
-	public IntType()
-	{
-		this.value = 0;
-	}
-	
-	public IntType(int value)
-	{
-		this.value = value;
-	}
-	
-
-	@Override
-	public IntType getRandom(Random rnd)
-	{
-		return new IntType(rnd.nextInt());
-	}
-	
-
-	@Override
-	public void write(DataOutput out) throws IOException
-	{
-		out.writeInt(this.value);
-	}
-
-
-	@Override
-	public void read(DataInput in) throws IOException
-	{
-		this.value = in.readInt();
-	}
-
-
-	@Override
-	public int hashCode()
-	{
-		return this.value;
-	}
-
-
-	@Override
-	public boolean equals(Object obj)
-	{
-		if (obj instanceof IntType) {
-			IntType other = (IntType) obj;
-			return this.value == other.value;
-		} else {
-			return false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/LongType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/LongType.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/LongType.java
deleted file mode 100644
index 8c752f6..0000000
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/LongType.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels.serialization;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Random;
-
-/**
- */
-public class LongType implements SerializationTestType
-{
-	private long value;
-	
-
-	public LongType()
-	{
-		this.value = 0;
-	}
-	
-	private LongType(long value)
-	{
-		this.value = value;
-	}
-	
-
-	@Override
-	public LongType getRandom(Random rnd)
-	{
-		return new LongType(rnd.nextLong());
-	}
-	
-
-	@Override
-	public void write(DataOutput out) throws IOException
-	{
-		out.writeLong(this.value);
-	}
-
-
-	@Override
-	public void read(DataInput in) throws IOException
-	{
-		this.value = in.readLong();
-	}
-
-
-	@Override
-	public int hashCode()
-	{
-		return (int) (this.value ^ this.value >>> 32);
-	}
-
-
-	@Override
-	public boolean equals(Object obj)
-	{
-		if (obj instanceof LongType) {
-			LongType other = (LongType) obj;
-			return this.value == other.value;
-		} else {
-			return false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/SerializationTestType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/SerializationTestType.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/SerializationTestType.java
deleted file mode 100644
index 77630b0..0000000
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/SerializationTestType.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels.serialization;
-
-import java.util.Random;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-
-/**
- */
-public interface SerializationTestType extends IOReadableWritable
-{
-	public SerializationTestType getRandom(Random rnd);
-
-	// public static final String REST1_PATH = "src/test/resources/clustering/rest1.json";
-	// public static final String SAMPLE1_PATH = "src/test/resources/clustering/sample1.json";
-	//
-	// public static IJsonNode asJson(final String... values) {
-	// return new Point(String.valueOf(pointCount++), values).write(null);
-	// }
-	//
-	// public static List<IJsonNode> loadPoints(final String filePath)
-	// throws IOException {
-	// BufferedReader reader = null;
-	// try {
-	// final File pointFile = new File(filePath);
-	// reader = new BufferedReader(new FileReader(pointFile));
-	// final JsonParser parser = new JsonParser(reader);
-	// final List<IJsonNode> pointNodes = new LinkedList<IJsonNode>();
-	// while (!parser.checkEnd())
-	// pointNodes.add(parser.readValueAsTree());
-	// return pointNodes;
-	// } finally {
-	// try {
-	// reader.close();
-	// } catch (final Exception e) {
-	// e.printStackTrace();
-	// }
-	// }
-	// }
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/ShortType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/ShortType.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/ShortType.java
deleted file mode 100644
index 09d0009..0000000
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/ShortType.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels.serialization;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Random;
-
-/**
- */
-public class ShortType implements SerializationTestType
-{
-	private short value;
-	
-
-	public ShortType()
-	{
-		this.value = (short) 0;
-	}
-	
-	private ShortType(short value)
-	{
-		this.value = value;
-	}
-	
-
-	@Override
-	public ShortType getRandom(Random rnd)
-	{
-		return new ShortType((short) rnd.nextInt(65536));
-	}
-	
-
-	@Override
-	public void write(DataOutput out) throws IOException
-	{
-		out.writeShort(this.value);
-	}
-
-
-	@Override
-	public void read(DataInput in) throws IOException
-	{
-		this.value = in.readShort();
-	}
-
-
-	@Override
-	public int hashCode()
-	{
-		return this.value;
-	}
-
-
-	@Override
-	public boolean equals(Object obj)
-	{
-		if (obj instanceof ShortType) {
-			ShortType other = (ShortType) obj;
-			return this.value == other.value;
-		} else {
-			return false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/UTFStringType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/UTFStringType.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/UTFStringType.java
deleted file mode 100644
index c4481fa..0000000
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/UTFStringType.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels.serialization;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Random;
-
-/**
- */
-public class UTFStringType implements SerializationTestType
-{
-	private static final int MAX_LEN = 1500;
-	
-	private String value;
-	
-
-	public UTFStringType()
-	{
-		this.value = "";
-	}
-	
-	private UTFStringType(String value)
-	{
-		this.value = value;
-	}
-	
-
-	@Override
-	public UTFStringType getRandom(Random rnd)
-	{
-		final StringBuilder bld = new StringBuilder();
-		final int len = rnd.nextInt(MAX_LEN + 1);
-		
-		for (int i = 0; i < len; i++) {
-			bld.append((char) rnd.nextInt(Character.MAX_VALUE));
-		}
-		
-		return new UTFStringType(bld.toString());
-	}
-	
-
-	@Override
-	public void write(DataOutput out) throws IOException
-	{
-		out.writeUTF(this.value);
-	}
-
-
-	@Override
-	public void read(DataInput in) throws IOException
-	{
-		this.value = in.readUTF();
-	}
-
-
-	@Override
-	public int hashCode()
-	{
-		return this.value.hashCode();
-	}
-
-
-	@Override
-	public boolean equals(Object obj)
-	{
-		if (obj instanceof UTFStringType) {
-			UTFStringType other = (UTFStringType) obj;
-			return this.value.equals(other.value);
-		} else {
-			return false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/UnsignedByteType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/UnsignedByteType.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/UnsignedByteType.java
deleted file mode 100644
index 4edcd05..0000000
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/UnsignedByteType.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels.serialization;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Random;
-
-/**
- */
-public class UnsignedByteType implements SerializationTestType
-{
-	private int value;
-	
-
-	public UnsignedByteType()
-	{
-		this.value = 0;
-	}
-	
-	private UnsignedByteType(int value)
-	{
-		this.value = value;
-	}
-	
-
-	@Override
-	public UnsignedByteType getRandom(Random rnd)
-	{
-		return new UnsignedByteType(rnd.nextInt(128) + 128);
-	}
-	
-
-	@Override
-	public void write(DataOutput out) throws IOException
-	{
-		out.writeByte(this.value);
-	}
-
-
-	@Override
-	public void read(DataInput in) throws IOException
-	{
-		this.value = in.readUnsignedByte();
-	}
-
-
-	@Override
-	public int hashCode()
-	{
-		return this.value;
-	}
-
-
-	@Override
-	public boolean equals(Object obj)
-	{
-		if (obj instanceof UnsignedByteType) {
-			UnsignedByteType other = (UnsignedByteType) obj;
-			return this.value == other.value;
-		} else {
-			return false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/UnsignedShortType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/UnsignedShortType.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/UnsignedShortType.java
deleted file mode 100644
index 898b343..0000000
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/channels/serialization/UnsignedShortType.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels.serialization;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Random;
-
-/**
- */
-public class UnsignedShortType implements SerializationTestType
-{
-	private int value;
-	
-
-	public UnsignedShortType()
-	{
-		this.value = 0;
-	}
-	
-	private UnsignedShortType(int value)
-	{
-		this.value = value;
-	}
-	
-
-	@Override
-	public UnsignedShortType getRandom(Random rnd)
-	{
-		return new UnsignedShortType(rnd.nextInt(32768) + 32768);
-	}
-	
-
-	@Override
-	public void write(DataOutput out) throws IOException
-	{
-		out.writeShort(this.value);
-	}
-
-
-	@Override
-	public void read(DataInput in) throws IOException
-	{
-		this.value = in.readUnsignedShort();
-	}
-
-
-	@Override
-	public int hashCode()
-	{
-		return this.value;
-	}
-
-
-	@Override
-	public boolean equals(Object obj)
-	{
-		if (obj instanceof UnsignedShortType) {
-			UnsignedShortType other = (UnsignedShortType) obj;
-			return this.value == other.value;
-		} else {
-			return false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/library/FileLineReadWriteTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/library/FileLineReadWriteTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/library/FileLineReadWriteTest.java
deleted file mode 100644
index f88d7bf..0000000
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/io/library/FileLineReadWriteTest.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.library;
-
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.powermock.reflect.Whitebox;
-
-import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.core.fs.FileInputSplit;
-import eu.stratosphere.core.fs.Path;
-import eu.stratosphere.core.io.StringRecord;
-import eu.stratosphere.nephele.execution.Environment;
-import eu.stratosphere.nephele.io.RecordReader;
-import eu.stratosphere.nephele.io.RecordWriter;
-import eu.stratosphere.nephele.template.InputSplitProvider;
-
-/**
- * This class checks the functionality of the {@link FileLineReader} and the {@link FileLineWriter} class.
- * 
- */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(FileLineReader.class)
-public class FileLineReadWriteTest {
-
-	@Mock
-	private Environment environment;
-
-	@Mock
-	private Configuration conf;
-
-	@Mock
-	private RecordReader<StringRecord> recordReader;
-
-	@Mock
-	private RecordWriter<StringRecord> recordWriter;
-
-	@Mock
-	private InputSplitProvider inputSplitProvider;
-
-	private File file = new File("./tmp");
-
-	/**
-	 * Set up mocks
-	 * 
-	 * @throws IOException
-	 */
-	@Before
-	public void before() throws Exception {
-
-		MockitoAnnotations.initMocks(this);
-	}
-
-	/**
-	 * remove the temporary file
-	 */
-	@After
-	public void after() {
-		this.file.delete();
-	}
-
-	/**
-	 * Tests the read and write methods
-	 * 
-	 * @throws Exception
-	 */
-	@Test
-	public void testReadWrite() throws Exception {
-
-		this.file.createNewFile();
-		FileLineWriter writer = new FileLineWriter();
-		Whitebox.setInternalState(writer, "environment", this.environment);
-		Whitebox.setInternalState(writer, "input", this.recordReader);
-		when(this.environment.getTaskConfiguration()).thenReturn(this.conf);
-
-		when(this.conf.getString("outputPath", null)).thenReturn(this.file.toURI().toString());
-		when(this.recordReader.hasNext()).thenReturn(true, true, true, false);
-		StringRecord in = new StringRecord("abc");
-		try {
-			when(this.recordReader.next()).thenReturn(in);
-		} catch (IOException e) {
-			fail();
-			e.printStackTrace();
-		} catch (InterruptedException e) {
-			fail();
-			e.printStackTrace();
-		}
-		writer.invoke();
-
-		final FileInputSplit split = new FileInputSplit(0, new Path(this.file.toURI().toString()), 0,
-			this.file.length(), null);
-		when(this.environment.getInputSplitProvider()).thenReturn(this.inputSplitProvider);
-		when(this.inputSplitProvider.getNextInputSplit()).thenReturn(split, (FileInputSplit) null);
-
-		FileLineReader reader = new FileLineReader();
-		Whitebox.setInternalState(reader, "environment", this.environment);
-		Whitebox.setInternalState(reader, "output", this.recordWriter);
-		StringRecord record = mock(StringRecord.class);
-
-		whenNew(StringRecord.class).withNoArguments().thenReturn(record);
-
-		reader.invoke();
-
-		// verify the correct bytes have been written and read
-		verify(record, times(3)).set(in.getBytes());
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/DoubleSourceTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/DoubleSourceTask.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/DoubleSourceTask.java
index 6d29521..1e2be47 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/DoubleSourceTask.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/DoubleSourceTask.java
@@ -19,7 +19,7 @@ import eu.stratosphere.core.fs.FSDataInputStream;
 import eu.stratosphere.core.fs.FileInputSplit;
 import eu.stratosphere.core.fs.FileSystem;
 import eu.stratosphere.core.io.StringRecord;
-import eu.stratosphere.nephele.io.RecordWriter;
+import eu.stratosphere.runtime.io.api.RecordWriter;
 import eu.stratosphere.nephele.template.AbstractFileInputTask;
 import eu.stratosphere.runtime.fs.LineReader;
 
@@ -31,6 +31,8 @@ public class DoubleSourceTask extends AbstractFileInputTask {
 
 	@Override
 	public void invoke() throws Exception {
+		this.output1.initializeSerializers();
+		this.output2.initializeSerializers();
 
 		final Iterator<FileInputSplit> splitIterator = getFileInputSplits();
 
@@ -65,12 +67,15 @@ public class DoubleSourceTask extends AbstractFileInputTask {
 			// Close the stream;
 			lineReader.close();
 		}
+
+		this.output1.flush();
+		this.output2.flush();
 	}
 
 	@Override
 	public void registerInputOutput() {
-		this.output1 = new RecordWriter<StringRecord>(this, StringRecord.class);
-		this.output2 = new RecordWriter<StringRecord>(this, StringRecord.class);
+		this.output1 = new RecordWriter<StringRecord>(this);
+		this.output2 = new RecordWriter<StringRecord>(this);
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/DoubleTargetTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/DoubleTargetTask.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/DoubleTargetTask.java
index cea9dd2..f0ca435 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/DoubleTargetTask.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/DoubleTargetTask.java
@@ -14,8 +14,8 @@
 package eu.stratosphere.nephele.jobmanager;
 
 import eu.stratosphere.core.io.StringRecord;
-import eu.stratosphere.nephele.io.RecordReader;
-import eu.stratosphere.nephele.io.RecordWriter;
+import eu.stratosphere.runtime.io.api.RecordReader;
+import eu.stratosphere.runtime.io.api.RecordWriter;
 import eu.stratosphere.nephele.template.AbstractTask;
 
 public class DoubleTargetTask extends AbstractTask {
@@ -29,6 +29,8 @@ public class DoubleTargetTask extends AbstractTask {
 	@Override
 	public void invoke() throws Exception {
 
+		this.output.initializeSerializers();
+
 		while (this.input1.hasNext()) {
 
 			StringRecord s = input1.next();
@@ -41,13 +43,15 @@ public class DoubleTargetTask extends AbstractTask {
 			this.output.emit(s);
 		}
 
+		this.output.flush();
+
 	}
 
 	@Override
 	public void registerInputOutput() {
 		this.input1 = new RecordReader<StringRecord>(this, StringRecord.class);
 		this.input2 = new RecordReader<StringRecord>(this, StringRecord.class);
-		this.output = new RecordWriter<StringRecord>(this, StringRecord.class);
+		this.output = new RecordWriter<StringRecord>(this);
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/ExceptionTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/ExceptionTask.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/ExceptionTask.java
index baae3b9..77b4f96 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/ExceptionTask.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/ExceptionTask.java
@@ -14,8 +14,8 @@
 package eu.stratosphere.nephele.jobmanager;
 
 import eu.stratosphere.core.io.StringRecord;
-import eu.stratosphere.nephele.io.RecordReader;
-import eu.stratosphere.nephele.io.RecordWriter;
+import eu.stratosphere.runtime.io.api.RecordReader;
+import eu.stratosphere.runtime.io.api.RecordWriter;
 import eu.stratosphere.nephele.template.AbstractTask;
 
 /**
@@ -57,7 +57,7 @@ public class ExceptionTask extends AbstractTask {
 	public void registerInputOutput() {
 
 		new RecordReader<StringRecord>(this, StringRecord.class);
-		new RecordWriter<StringRecord>(this, StringRecord.class);
+		new RecordWriter<StringRecord>(this);
 	}
 
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/ForwardTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/ForwardTask.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/ForwardTask.java
index be7be66..96be668 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/ForwardTask.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/ForwardTask.java
@@ -14,8 +14,8 @@
 package eu.stratosphere.nephele.jobmanager;
 
 import eu.stratosphere.core.io.StringRecord;
-import eu.stratosphere.nephele.io.RecordReader;
-import eu.stratosphere.nephele.io.RecordWriter;
+import eu.stratosphere.runtime.io.api.RecordReader;
+import eu.stratosphere.runtime.io.api.RecordWriter;
 import eu.stratosphere.nephele.template.AbstractTask;
 
 public class ForwardTask extends AbstractTask {
@@ -26,16 +26,20 @@ public class ForwardTask extends AbstractTask {
 	@Override
 	public void invoke() throws Exception {
 
+		this.output.initializeSerializers();
+
 		while (this.input.hasNext()) {
 
 			StringRecord s = input.next();
 			this.output.emit(s);
 		}
+
+		this.output.flush();
 	}
 
 	@Override
 	public void registerInputOutput() {
 		this.input = new RecordReader<StringRecord>(this, StringRecord.class);
-		this.output = new RecordWriter<StringRecord>(this, StringRecord.class);
+		this.output = new RecordWriter<StringRecord>(this);
 	}
 }


Mime
View raw message