flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [05/18] flink git commit: [FLINK-986] [FLINK-25] [Distributed runtime] Add initial support for intermediate results
Date Mon, 12 Jan 2015 08:16:13 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/ByteArrayType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/ByteArrayType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/ByteArrayType.java
new file mode 100644
index 0000000..66fa22c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/ByteArrayType.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.io.network.api.serialization.types;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Random;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+/** Test record wrapping a byte array, written as a 4-byte length followed by the bytes. */
+public class ByteArrayType implements SerializationTestType {
+
+	private static final int MAX_LEN = 512 * 15;
+
+	private byte[] data;
+
+	/** Creates an instance holding an empty array. */
+	public ByteArrayType() {
+		this(new byte[0]);
+	}
+
+	/** Creates an instance that keeps a reference to {@code data}; no copy is made. */
+	public ByteArrayType(byte[] data) {
+		this.data = data;
+	}
+
+	/** Returns a new instance filled with 1 to {@code MAX_LEN} random bytes from {@code rnd}. */
+	@Override
+	public ByteArrayType getRandom(Random rnd) {
+		final byte[] bytes = new byte[rnd.nextInt(MAX_LEN) + 1];
+		rnd.nextBytes(bytes);
+		return new ByteArrayType(bytes);
+	}
+
+	/** Serialized size: the payload plus the 4-byte length prefix. */
+	@Override
+	public int length() {
+		return 4 + data.length;
+	}
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		final byte[] bytes = this.data;
+		out.writeInt(bytes.length);
+		out.write(bytes);
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		// A fresh array is allocated on every read, sized by the length prefix.
+		this.data = new byte[in.readInt()];
+		in.readFully(this.data);
+	}
+
+	@Override
+	public int hashCode() {
+		return Arrays.hashCode(this.data);
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		return obj instanceof ByteArrayType && Arrays.equals(this.data, ((ByteArrayType) obj).data);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/ByteSubArrayType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/ByteSubArrayType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/ByteSubArrayType.java
new file mode 100644
index 0000000..6431f14
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/ByteSubArrayType.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.io.network.api.serialization.types;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Random;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+/** Test record using only the first {@code len} bytes of a fixed-size buffer. */
+public class ByteSubArrayType implements SerializationTestType {
+
+	private static final int MAX_LEN = 512;
+
+	private final byte[] data;
+
+	private int len;
+
+	public ByteSubArrayType() {
+		this.data = new byte[MAX_LEN];
+		this.len = 0;
+	}
+
+	@Override
+	public ByteSubArrayType getRandom(Random rnd) {
+		final ByteSubArrayType t = new ByteSubArrayType();
+		t.len = rnd.nextInt(MAX_LEN) + 1;
+		for (int i = 0; i < t.len; i++) {
+			t.data[i] = (byte) rnd.nextInt(256);
+		}
+		return t;
+	}
+
+	@Override
+	public int length() {
+		return len + 4;
+	}
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		out.writeInt(this.len);
+		out.write(this.data, 0, this.len);
+	}
+
+	/**
+	 * Reads the length prefix, then that many bytes into the start of the buffer.
+	 * @throws IOException if the prefix is negative or larger than the buffer
+	 */
+	@Override
+	public void read(DataInputView in) throws IOException {
+		final int newLen = in.readInt();
+		// Validate before touching state so a corrupt prefix cannot overrun the fixed buffer.
+		if (newLen < 0 || newLen > this.data.length) {
+			throw new IOException("Invalid length " + newLen + ", expected 0 to " + this.data.length);
+		}
+		in.readFully(this.data, 0, newLen);
+		this.len = newLen;
+	}
+
+	@Override
+	public int hashCode() {
+		// Hash only the used prefix so stale bytes past len cannot affect the result.
+		return Arrays.hashCode(Arrays.copyOf(this.data, this.len));
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (!(obj instanceof ByteSubArrayType) || ((ByteSubArrayType) obj).len != this.len) {
+			return false;
+		}
+		final byte[] otherData = ((ByteSubArrayType) obj).data;
+		for (int i = 0; i < this.len; i++) {
+			if (this.data[i] != otherData[i]) {
+				return false;
+			}
+		}
+		return true;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/ByteType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/ByteType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/ByteType.java
new file mode 100644
index 0000000..87fd7c0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/ByteType.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.io.network.api.serialization.types;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+/** Test record holding a single {@code byte}; occupies one byte when serialized. */
+public class ByteType implements SerializationTestType {
+
+	private byte value;
+
+	/** Creates an instance with value zero. */
+	public ByteType() {
+		this((byte) 0);
+	}
+
+	private ByteType(byte value) {
+		this.value = value;
+	}
+
+	/** Returns a new instance whose value is the low 8 bits of {@code rnd.nextInt(256)}. */
+	@Override
+	public ByteType getRandom(Random rnd) {
+		final int drawn = rnd.nextInt(256);
+		return new ByteType((byte) drawn);
+	}
+
+	@Override
+	public int length() {
+		return 1;
+	}
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		out.writeByte(this.value);
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		this.value = in.readByte();
+	}
+
+	/** The hash is the value itself, widened to {@code int}. */
+	@Override
+	public int hashCode() {
+		return this.value;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		return obj instanceof ByteType && ((ByteType) obj).value == this.value;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/CharType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/CharType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/CharType.java
new file mode 100644
index 0000000..b162ea0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/CharType.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.io.network.api.serialization.types;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+/** Test record holding a single {@code char}; occupies two bytes when serialized. */
+public class CharType implements SerializationTestType {
+
+	private char value;
+
+	/** Creates an instance with value zero. */
+	public CharType() {
+		this((char) 0);
+	}
+
+	private CharType(char value) {
+		this.value = value;
+	}
+
+	/** Returns a new instance whose value is a random code unit in the range 0 to 9999. */
+	@Override
+	public CharType getRandom(Random rnd) {
+		final int drawn = rnd.nextInt(10000);
+		return new CharType((char) drawn);
+	}
+
+	@Override
+	public int length() {
+		return 2;
+	}
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		out.writeChar(this.value);
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		this.value = in.readChar();
+	}
+
+	/** The hash is the code unit itself, widened to {@code int}. */
+	@Override
+	public int hashCode() {
+		return this.value;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		return obj instanceof CharType && ((CharType) obj).value == this.value;
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/DoubleType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/DoubleType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/DoubleType.java
new file mode 100644
index 0000000..654b685
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/DoubleType.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.io.network.api.serialization.types;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+/** Test record holding a single {@code double}; occupies eight bytes when serialized. */
+public class DoubleType implements SerializationTestType {
+
+	private double value;
+
+	/** Creates an instance with value zero. */
+	public DoubleType() {
+		this(0.0);
+	}
+
+	private DoubleType(double value) {
+		this.value = value;
+	}
+
+	/** Returns a new instance whose value is taken from {@code rnd.nextDouble()}. */
+	@Override
+	public DoubleType getRandom(Random rnd) {
+		return new DoubleType(rnd.nextDouble());
+	}
+
+	@Override
+	public int length() {
+		return 8;
+	}
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		out.writeDouble(this.value);
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		this.value = in.readDouble();
+	}
+
+	/** Folds the two 32-bit halves of the value's bit pattern together with XOR. */
+	@Override
+	public int hashCode() {
+		final long bits = Double.doubleToLongBits(this.value);
+		return (int) (bits ^ (bits >>> 32));
+	}
+
+	/** Compares bit patterns, so NaN equals NaN and 0.0 differs from -0.0. */
+	@Override
+	public boolean equals(Object obj) {
+		return obj instanceof DoubleType && Double.doubleToLongBits(((DoubleType) obj).value) == Double.doubleToLongBits(this.value);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/FloatType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/FloatType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/FloatType.java
new file mode 100644
index 0000000..653be45
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/FloatType.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.io.network.api.serialization.types;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+/** Test record holding a single {@code float}; occupies four bytes when serialized. */
+public class FloatType implements SerializationTestType {
+
+	private float value;
+
+	/** Creates an instance with value zero. */
+	public FloatType() {
+		this(0.0f);
+	}
+
+	private FloatType(float value) {
+		this.value = value;
+	}
+
+	/** Returns a new instance whose value is taken from {@code rnd.nextFloat()}. */
+	@Override
+	public FloatType getRandom(Random rnd) {
+		return new FloatType(rnd.nextFloat());
+	}
+
+	@Override
+	public int length() {
+		return 4;
+	}
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		out.writeFloat(this.value);
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		this.value = in.readFloat();
+	}
+
+	/** The hash is the value's bit pattern as given by {@code Float.floatToIntBits}. */
+	@Override
+	public int hashCode() {
+		return Float.floatToIntBits(this.value);
+	}
+
+	/** Compares bit patterns, so NaN equals NaN and 0.0f differs from -0.0f. */
+	@Override
+	public boolean equals(Object obj) {
+		return obj instanceof FloatType && Float.floatToIntBits(((FloatType) obj).value) == Float.floatToIntBits(this.value);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/IntType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/IntType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/IntType.java
new file mode 100644
index 0000000..4c6429d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/IntType.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.io.network.api.serialization.types;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+/** Test record holding a single {@code int}; occupies four bytes when serialized. */
+public class IntType implements SerializationTestType {
+
+	private int value;
+
+	/** Creates an instance with value zero. */
+	public IntType() {
+		this(0);
+	}
+
+	/** Creates an instance holding {@code value}. */
+	public IntType(int value) {
+		this.value = value;
+	}
+
+	/** Returns a new instance whose value is taken from {@code rnd.nextInt()}. */
+	@Override
+	public IntType getRandom(Random rnd) {
+		return new IntType(rnd.nextInt());
+	}
+
+	@Override
+	public int length() {
+		return 4;
+	}
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		out.writeInt(this.value);
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		this.value = in.readInt();
+	}
+
+	/** The hash is the value itself. */
+	@Override
+	public int hashCode() {
+		return this.value;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		return obj instanceof IntType && ((IntType) obj).value == this.value;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/LongType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/LongType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/LongType.java
new file mode 100644
index 0000000..934dfb7
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/LongType.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.io.network.api.serialization.types;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+/** Test record holding a single {@code long}; occupies eight bytes when serialized. */
+public class LongType implements SerializationTestType {
+
+	private long value;
+
+	/** Creates an instance with value zero. */
+	public LongType() {
+		this(0L);
+	}
+
+	private LongType(long value) {
+		this.value = value;
+	}
+
+	/** Returns a new instance whose value is taken from {@code rnd.nextLong()}. */
+	@Override
+	public LongType getRandom(Random rnd) {
+		return new LongType(rnd.nextLong());
+	}
+
+	@Override
+	public int length() {
+		return 8;
+	}
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		out.writeLong(this.value);
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		this.value = in.readLong();
+	}
+
+	/** Folds the upper and lower 32 bits of the value together with XOR. */
+	@Override
+	public int hashCode() {
+		final long v = this.value;
+		return (int) (v ^ (v >>> 32));
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		return obj instanceof LongType && ((LongType) obj).value == this.value;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/SerializationTestType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/SerializationTestType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/SerializationTestType.java
new file mode 100644
index 0000000..69db122
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/SerializationTestType.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.io.network.api.serialization.types;
+
+import java.util.Random;
+
+import org.apache.flink.core.io.IOReadableWritable;
+
+public interface SerializationTestType extends IOReadableWritable {
+	/** Creates a randomly populated record of the implementing type, using {@code rnd}. */
+	SerializationTestType getRandom(Random rnd);
+
+	/** Returns the size of this record in bytes when serialized. */
+	int length();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/SerializationTestTypeFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/SerializationTestTypeFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/SerializationTestTypeFactory.java
new file mode 100644
index 0000000..8ecbfce
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/SerializationTestTypeFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.io.network.api.serialization.types;
+
+public enum SerializationTestTypeFactory {
+	BOOLEAN(new BooleanType()),
+	BYTE_ARRAY(new ByteArrayType()),
+	BYTE_SUB_ARRAY(new ByteSubArrayType()),
+	BYTE(new ByteType()),
+	CHAR(new CharType()),
+	DOUBLE(new DoubleType()),
+	FLOAT(new FloatType()),
+	INT(new IntType()),
+	LONG(new LongType()),
+	SHORT(new ShortType()),
+	UNSIGNED_BYTE(new UnsignedByteType()),
+	UNSIGNED_SHORT(new UnsignedShortType()),
+	STRING(new AsciiStringType());
+
+	private final SerializationTestType prototype; // one instance per constant, created with it
+	SerializationTestTypeFactory(SerializationTestType prototype) {
+		this.prototype = prototype;
+	}
+
+	/** Returns the instance bound to this constant; every call yields the same object. */
+	public SerializationTestType factory() {
+		return prototype;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/ShortType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/ShortType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/ShortType.java
new file mode 100644
index 0000000..69e0ffc
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/ShortType.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.io.network.api.serialization.types;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+/** Test record holding a single {@code short}; occupies two bytes when serialized. */
+public class ShortType implements SerializationTestType {
+
+	private short value;
+
+	/** Creates an instance with value zero. */
+	public ShortType() {
+		this((short) 0);
+	}
+
+	private ShortType(short value) {
+		this.value = value;
+	}
+
+	/** Returns a new instance whose value is the low 16 bits of {@code rnd.nextInt(65536)}. */
+	@Override
+	public ShortType getRandom(Random rnd) {
+		final int drawn = rnd.nextInt(65536);
+		return new ShortType((short) drawn);
+	}
+
+	@Override
+	public int length() {
+		return 2;
+	}
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		out.writeShort(this.value);
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		this.value = in.readShort();
+	}
+
+	/** The hash is the value itself, widened to {@code int}. */
+	@Override
+	public int hashCode() {
+		return this.value;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		return obj instanceof ShortType && ((ShortType) obj).value == this.value;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/UnsignedByteType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/UnsignedByteType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/UnsignedByteType.java
new file mode 100644
index 0000000..eabb1dd
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/UnsignedByteType.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.io.network.api.serialization.types;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+/** Test record for a byte kept in an {@code int} and read back unsigned; one byte serialized. */
+public class UnsignedByteType implements SerializationTestType {
+
+	private int value;
+
+	/** Creates an instance with value zero. */
+	public UnsignedByteType() {
+		this(0);
+	}
+
+	private UnsignedByteType(int value) {
+		this.value = value;
+	}
+
+	/** Returns a new instance with a value from 128 to 255, all negative as a signed byte. */
+	@Override
+	public UnsignedByteType getRandom(Random rnd) {
+		final int drawn = 128 + rnd.nextInt(128);
+		return new UnsignedByteType(drawn);
+	}
+
+	@Override
+	public int length() {
+		return 1;
+	}
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		out.writeByte(this.value);
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		this.value = in.readUnsignedByte();
+	}
+
+	/** The hash is the stored {@code int} value itself. */
+	@Override
+	public int hashCode() {
+		return this.value;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		return obj instanceof UnsignedByteType && ((UnsignedByteType) obj).value == this.value;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/UnsignedShortType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/UnsignedShortType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/UnsignedShortType.java
new file mode 100644
index 0000000..6242900
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/UnsignedShortType.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.io.network.api.serialization.types;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+public class UnsignedShortType implements SerializationTestType {
+	// Test record holding one value that is written as a short and read back unsigned.
+	private int value;
+	// Creates a record with value 0.
+	public UnsignedShortType() {
+		this.value = 0;
+	}
+	// Wraps an already chosen value; within this class only getRandom() calls it.
+	private UnsignedShortType(int value) {
+		this.value = value;
+	}
+	// Returns a new record with a value in [32768, 65535] (nextInt(32768) gives 0..32767, plus 32768).
+	@Override
+	public UnsignedShortType getRandom(Random rnd) {
+		return new UnsignedShortType(rnd.nextInt(32768) + 32768);
+	}
+	// Always 2; presumably the serialized size in bytes, matching the single writeShort() below.
+	@Override
+	public int length() {
+		return 2;
+	}
+	// Serializes the value with a single writeShort() call.
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		out.writeShort(this.value);
+	}
+	// Replaces the value with the result of readUnsignedShort().
+	@Override
+	public void read(DataInputView in) throws IOException {
+		this.value = in.readUnsignedShort();
+	}
+	// The value itself serves as the hash code, consistent with equals().
+	@Override
+	public int hashCode() {
+		return this.value;
+	}
+	// Equal only to another UnsignedShortType holding the same value.
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof UnsignedShortType) {
+			UnsignedShortType other = (UnsignedShortType) obj;
+			return this.value == other.value;
+		} else {
+			return false;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/Util.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/Util.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/Util.java
new file mode 100644
index 0000000..037fbaf
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/Util.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.serialization.types;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Random;
+
+public class Util {
+	// Fixed seed, so the shared generator below starts from the same state in every JVM run.
+	private static final long SEED = 64871654635745873L;
+	// Single generator shared by all helpers in this class.
+	private static Random random = new Random(SEED);
+	// Returns one random record produced by the given factory's prototype via getRandom().
+	public static SerializationTestType randomRecord(SerializationTestTypeFactory type) {
+		return type.factory().getRandom(Util.random);
+	}
+	// Returns numElements random records of one type; each is created when the iterator asks for it.
+	public static MockRecords randomRecords(final int numElements, final SerializationTestTypeFactory type) {
+
+		return new MockRecords(numElements) {
+			@Override
+			protected SerializationTestType getRecord() {
+				return type.factory().getRandom(Util.random);
+			}
+		};
+	}
+	// Returns numElements random records; the type is picked anew from all factory values per record.
+	public static MockRecords randomRecords(final int numElements) {
+
+		return new MockRecords(numElements) {
+			@Override
+			protected SerializationTestType getRecord() {
+				// select random test type factory
+				SerializationTestTypeFactory[] types = SerializationTestTypeFactory.values();
+				int i = Util.random.nextInt(types.length);
+
+				return types[i].factory().getRandom(Util.random);
+			}
+		};
+	}
+
+	// -----------------------------------------------------------------------------------------------------------------
+	public abstract static class MockRecords implements Iterable<SerializationTestType> {
+		// Records still to be handed out; decremented by every iterator created from this instance.
+		private int numRecords;
+		// Sets how many records this iterable hands out in total.
+		public MockRecords(int numRecords) {
+			this.numRecords = numRecords;
+		}
+		// All iterators share numRecords, so once it reaches 0 a later iteration yields nothing.
+		@Override
+		public Iterator<SerializationTestType> iterator() {
+			return new Iterator<SerializationTestType>() {
+				@Override
+				public boolean hasNext() {
+					return numRecords > 0;
+				}
+
+				@Override
+				public SerializationTestType next() {
+					if (numRecords > 0) {
+						numRecords--;
+
+						return getRecord();
+					}
+
+					throw new NoSuchElementException();
+				}
+
+				@Override
+				public void remove() {
+					throw new UnsupportedOperationException();
+				}
+			};
+		}
+		// Supplies the next record; called once per successful next().
+		abstract protected SerializationTestType getRecord();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
new file mode 100644
index 0000000..cf6eb9a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class BufferPoolFactoryTest {
+	// First constructor argument of the NetworkBufferPool; also the segment count expected after each test.
+	private final static int numBuffers = 1024;
+	// Second constructor argument of the NetworkBufferPool.
+	private final static int memorySegmentSize = 128;
+	// Pool under test, recreated before every test method.
+	private NetworkBufferPool networkBufferPool;
+	// Builds a fresh NetworkBufferPool so that tests do not share state.
+	@Before
+	public void setupNetworkBufferPool() {
+		networkBufferPool = new NetworkBufferPool(numBuffers, memorySegmentSize);
+	}
+	// After every test all numBuffers segments must be available in the network buffer pool.
+	@After
+	public void verifyAllBuffersReturned() {
+		String msg = "Did not return all buffers to network buffer pool after test.";
+		assertEquals(msg, numBuffers, networkBufferPool.getNumberOfAvailableMemorySegments());
+	}
+	// Asking for twice the total number of segments must fail with an IOException.
+	@Test(expected = IOException.class)
+	public void testRequireMoreThanPossible() throws IOException {
+		networkBufferPool.createBufferPool(networkBufferPool.getTotalNumberOfMemorySegments() * 2, false);
+	}
+	// A pool created with (1, true) reports exactly 1 buffer; true presumably means fixed size.
+	@Test
+	public void testFixedPool() throws IOException {
+		BufferPool lbp = networkBufferPool.createBufferPool(1, true);
+
+		assertEquals(1, lbp.getNumBuffers());
+	}
+	// A single pool created with (1, false) reports all segments of the network buffer pool.
+	@Test
+	public void testSingleManagedPoolGetsAll() throws IOException {
+		BufferPool lbp = networkBufferPool.createBufferPool(1, false);
+
+		assertEquals(networkBufferPool.getTotalNumberOfMemorySegments(), lbp.getNumBuffers());
+	}
+	// The (24, true) pool keeps 24 buffers; the (1, false) pool reports all remaining segments.
+	@Test
+	public void testSingleManagedPoolGetsAllExceptFixedOnes() throws IOException {
+		BufferPool fixed = networkBufferPool.createBufferPool(24, true);
+
+		BufferPool lbp = networkBufferPool.createBufferPool(1, false);
+
+		assertEquals(24, fixed.getNumBuffers());
+		assertEquals(networkBufferPool.getTotalNumberOfMemorySegments() - fixed.getNumBuffers(), lbp.getNumBuffers());
+	}
+	// Two pools created with (0, false) each report half of the total segments.
+	@Test
+	public void testUniformDistribution() throws IOException {
+		BufferPool first = networkBufferPool.createBufferPool(0, false);
+		BufferPool second = networkBufferPool.createBufferPool(0, false);
+
+		assertEquals(networkBufferPool.getTotalNumberOfMemorySegments() / 2, first.getNumBuffers());
+		assertEquals(networkBufferPool.getTotalNumberOfMemorySegments() / 2, second.getNumBuffers());
+	}
+	// For numBuffers / 32 pools with random arguments, the reported sizes must sum to numBuffers.
+	@Test
+	public void testAllDistributed() {
+		Random random = new Random(); // unseeded, so a failing configuration cannot be replayed
+
+		try {
+			List<BufferPool> pools = new ArrayList<BufferPool>();
+
+			int numPools = numBuffers / 32;
+			for (int i = 0; i < numPools; i++) {
+				pools.add(networkBufferPool.createBufferPool(random.nextInt(7 + 1), random.nextBoolean())); // first argument is in 0..7
+			}
+
+			int numDistributedBuffers = 0;
+			for (BufferPool pool : pools) {
+				numDistributedBuffers += pool.getNumBuffers();
+			}
+
+			assertEquals(numBuffers, numDistributedBuffers);
+		}
+		catch (Throwable t) {
+			t.printStackTrace();
+			fail(t.getMessage());
+		}
+	}
+	// Creating a second pool halves the first one's size; destroying the first gives the second everything.
+	@Test
+	public void testCreateDestroy() throws IOException {
+		BufferPool first = networkBufferPool.createBufferPool(0, false);
+
+		assertEquals(networkBufferPool.getTotalNumberOfMemorySegments(), first.getNumBuffers());
+
+		BufferPool second = networkBufferPool.createBufferPool(0, false);
+
+		assertEquals(networkBufferPool.getTotalNumberOfMemorySegments() / 2, first.getNumBuffers());
+
+		assertEquals(networkBufferPool.getTotalNumberOfMemorySegments() / 2, second.getNumBuffers());
+
+		first.destroy();
+
+		assertEquals(networkBufferPool.getTotalNumberOfMemorySegments(), second.getNumBuffers());
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
new file mode 100644
index 0000000..17a079c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+
+public class BufferTest {
+	// Checks the initial size of a buffer and the bounds enforced by setSize().
+	@Test
+	public void testSetGetSize() {
+		final MemorySegment segment = new MemorySegment(new byte[1024]);
+		final BufferRecycler recycler = Mockito.mock(BufferRecycler.class);
+		// A new buffer reports the full size of its segment.
+		Buffer buffer = new Buffer(segment, recycler);
+		Assert.assertEquals(segment.size(), buffer.getSize());
+		// A size within the segment is accepted and reported back.
+		buffer.setSize(segment.size() / 2);
+		Assert.assertEquals(segment.size() / 2, buffer.getSize());
+		// A negative size must be rejected.
+		try {
+			buffer.setSize(-1);
+			Assert.fail("Didn't throw expected exception");
+		} catch (IllegalArgumentException e) {
+			// OK => expected exception
+		}
+		// A size larger than the segment must be rejected.
+		try {
+			buffer.setSize(segment.size() + 1);
+			Assert.fail("Didn't throw expected exception");
+		} catch (IllegalArgumentException e) {
+			// OK => expected exception
+		}
+	}
+	// After recycle(), every public method except toString/isRecycled/isBuffer must throw IllegalStateException.
+	@Test
+	public void testExceptionAfterRecycle() throws Throwable {
+		final MemorySegment segment = new MemorySegment(new byte[1024]);
+		final BufferRecycler recycler = Mockito.mock(BufferRecycler.class);
+
+		final Buffer buffer = new Buffer(segment, recycler);
+
+		buffer.recycle();
+
+		// Verify that the buffer has been recycled
+		Mockito.verify(recycler, Mockito.times(1)).recycle(Matchers.any(MemorySegment.class));
+
+		final Buffer spyBuffer = Mockito.spy(buffer);
+
+		// Check that every method throws the appropriate exception after the
+		// buffer has been recycled.
+		//
+		// Note: We cannot directly work on the spied upon buffer to get the
+		// declared methods as Mockito adds some of its own.
+		for (final Method method : buffer.getClass().getDeclaredMethods()) {
+			if (Modifier.isPublic(method.getModifiers()) && !method.getName().equals("toString")
+					&& !method.getName().equals("isRecycled") && !method.getName().equals("isBuffer")) {
+				// Get method of the spied buffer to allow argument matchers
+				final Method spyMethod = spyBuffer.getClass().getDeclaredMethod(method.getName(), method.getParameterTypes());
+
+				final Class<?>[] paramTypes = spyMethod.getParameterTypes();
+				final Object[] params = new Object[paramTypes.length];
+				// Fill every parameter slot with the value returned by an any(type) matcher.
+				for (int i = 0; i < params.length; i++) {
+					params[i] = Matchers.any(paramTypes[i]);
+				}
+
+				try {
+					spyMethod.invoke(spyBuffer, params);
+					Assert.fail("Didn't throw expected exception for method: " + method.getName());
+				} catch (InvocationTargetException e) {
+					if (e.getTargetException() instanceof IllegalStateException) {
+						// OK => expected exception
+					}
+					else {
+						throw e.getTargetException(); // any other exception type fails the test
+					}
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
new file mode 100644
index 0000000..8609db0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.runtime.util.event.EventListener;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.spy;
+
+public class LocalBufferPoolTest {
+	// First constructor argument of the NetworkBufferPool; also the segment count expected after each test.
+	private final static int numBuffers = 1024;
+	// Second constructor argument of the NetworkBufferPool.
+	private final static int memorySegmentSize = 128;
+	// Backing pool, recreated before every test method.
+	private NetworkBufferPool networkBufferPool;
+	// Pool under test, created on top of networkBufferPool before every test method.
+	private BufferPool localBufferPool;
+	// Creates both pools; the local pool is constructed with 1 as its second argument.
+	@Before
+	public void setupLocalBufferPool() {
+		networkBufferPool = new NetworkBufferPool(numBuffers, memorySegmentSize);
+		localBufferPool = new LocalBufferPool(networkBufferPool, 1);
+		// A freshly created local pool holds no available segments.
+		assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
+	}
+	// Destroys the local pool if the test did not, then checks that all segments are back in the network pool.
+	@After
+	public void destroyAndVerifyAllBuffersReturned() throws IOException {
+		if (!localBufferPool.isDestroyed()) {
+			localBufferPool.destroy();
+		}
+
+		String msg = "Did not return all buffers to memory segment pool after test.";
+		assertEquals(msg, numBuffers, networkBufferPool.getNumberOfAvailableMemorySegments());
+	}
+	// Each request takes one more segment from the network pool; the request after the last one returns null.
+	@Test
+	public void testRequestMoreThanAvailable() throws IOException {
+		localBufferPool.setNumBuffers(numBuffers);
+
+		List<Buffer> requests = new ArrayList<Buffer>(numBuffers);
+
+		for (int i = 1; i <= numBuffers; i++) {
+			Buffer buffer = localBufferPool.requestBuffer();
+
+			assertEquals(i, getNumRequestedFromMemorySegmentPool());
+			assertNotNull(buffer);
+
+			requests.add(buffer);
+		}
+
+		{
+			// One more...
+			Buffer buffer = localBufferPool.requestBuffer();
+			assertEquals(numBuffers, getNumRequestedFromMemorySegmentPool());
+			assertNull(buffer);
+		}
+
+		for (Buffer buffer : requests) {
+			buffer.recycle();
+		}
+	}
+	// A destroyed pool answers requestBuffer() with null.
+	@Test
+	public void testRequestAfterDestroy() throws IOException {
+		localBufferPool.destroy();
+
+		assertNull(localBufferPool.requestBuffer());
+	}
+	// Buffers recycled after destroy() must still reach the network pool (checked by the @After method).
+	@Test
+	public void testRecycleAfterDestroy() throws IOException {
+		localBufferPool.setNumBuffers(numBuffers);
+
+		List<Buffer> requests = new ArrayList<Buffer>(numBuffers);
+
+		for (int i = 0; i < numBuffers; i++) {
+			requests.add(localBufferPool.requestBuffer());
+		}
+
+		localBufferPool.destroy();
+
+		// All buffers have been requested, but can not be returned yet.
+		assertEquals(numBuffers, getNumRequestedFromMemorySegmentPool());
+
+		// Recycle should return buffers to memory segment pool
+		for (Buffer buffer : requests) {
+			buffer.recycle();
+		}
+	}
+	// After shrinking the pool to half, each of the first numBuffers / 2 - 1 recycles frees one network segment.
+	@Test
+	public void testRecycleExcessBuffersAfterRecycling() throws Exception {
+		localBufferPool.setNumBuffers(numBuffers);
+
+		List<Buffer> requests = new ArrayList<Buffer>(numBuffers);
+
+		// Request all buffers
+		for (int i = 1; i <= numBuffers; i++) {
+			requests.add(localBufferPool.requestBuffer());
+		}
+
+		assertEquals(numBuffers, getNumRequestedFromMemorySegmentPool());
+
+		// Reduce the number of buffers in the local pool
+		localBufferPool.setNumBuffers(numBuffers / 2);
+
+		// Need to wait until we recycle the buffers
+		assertEquals(numBuffers, getNumRequestedFromMemorySegmentPool());
+
+		for (int i = 1; i < numBuffers / 2; i++) {
+			requests.remove(0).recycle();
+			assertEquals(numBuffers - i, getNumRequestedFromMemorySegmentPool());
+		}
+
+		for (Buffer buffer : requests) {
+			buffer.recycle();
+		}
+	}
+	// With all buffers recycled into the local pool, shrinking it to half leaves half of them available there.
+	@Test
+	public void testRecycleExcessBuffersAfterChangingNumBuffers() throws Exception {
+		localBufferPool.setNumBuffers(numBuffers);
+
+		List<Buffer> requests = new ArrayList<Buffer>(numBuffers);
+
+		// Request all buffers
+		for (int i = 1; i <= numBuffers; i++) {
+			requests.add(localBufferPool.requestBuffer());
+		}
+
+		// Recycle all
+		for (Buffer buffer : requests) {
+			buffer.recycle();
+		}
+
+		assertEquals(numBuffers, localBufferPool.getNumberOfAvailableMemorySegments());
+
+		localBufferPool.setNumBuffers(numBuffers / 2);
+
+		assertEquals(numBuffers / 2, localBufferPool.getNumberOfAvailableMemorySegments());
+	}
+	// setNumBuffers(1) is accepted; the following setNumBuffers(0) is expected to throw IllegalArgumentException.
+	@Test(expected = IllegalArgumentException.class)
+	public void testSetLessThanRequiredNumBuffers() throws IOException {
+		localBufferPool.setNumBuffers(1);
+
+		localBufferPool.setNumBuffers(0);
+	}
+
+	// ------------------------------------------------------------------------
+	// Pending requests and integration with buffer futures
+	// ------------------------------------------------------------------------
+	// A listener added while no buffer is available is called exactly once when the held buffer is recycled.
+	@Test
+	public void testPendingRequestWithListenerAfterRecycle() throws Exception {
+		EventListener<Buffer> listener = spy(new EventListener<Buffer>() {
+			@Override
+			public void onEvent(Buffer buffer) {
+				buffer.recycle();
+			}
+		});
+
+		localBufferPool.setNumBuffers(1);
+
+		Buffer available = localBufferPool.requestBuffer();
+		Buffer unavailable = localBufferPool.requestBuffer();
+
+		assertNull(unavailable);
+
+		assertTrue(localBufferPool.addListener(listener));
+
+		available.recycle();
+
+		verify(listener, times(1)).onEvent(Matchers.any(Buffer.class));
+	}
+	// A listener that is still registered when the pool is destroyed is called exactly once with null.
+	@Test
+	public void testCancelPendingRequestsAfterDestroy() throws IOException {
+		EventListener<Buffer> listener = Mockito.mock(EventListener.class);
+
+		localBufferPool.setNumBuffers(1);
+
+		Buffer available = localBufferPool.requestBuffer();
+		Buffer unavailable = localBufferPool.requestBuffer();
+
+		assertNull(unavailable);
+
+		localBufferPool.addListener(listener);
+
+		localBufferPool.destroy();
+
+		available.recycle();
+
+		verify(listener, times(1)).onEvent(null);
+	}
+
+	// ------------------------------------------------------------------------
+	// Concurrent requests
+	// ------------------------------------------------------------------------
+	// Runs 128 tasks that each request and recycle 1024 buffers on a pool of 128 buffers; all must succeed.
+	@Test
+	public void testConcurrentRequestRecycle() throws ExecutionException, InterruptedException, IOException {
+		int numConcurrentTasks = 128;
+		int numBuffersToRequestPerTask = 1024;
+
+		localBufferPool.setNumBuffers(numConcurrentTasks);
+
+		final ExecutorService executor = Executors.newCachedThreadPool();
+
+		try {
+			Future<Boolean>[] taskResults = new Future[numConcurrentTasks];
+			for (int i = 0; i < numConcurrentTasks; i++) {
+				taskResults[i] = executor.submit(new BufferRequesterTask(localBufferPool, numBuffersToRequestPerTask));
+			}
+
+			for (int i = 0; i < numConcurrentTasks; i++) {
+				assertTrue(taskResults[i].get());
+			}
+		} finally {
+			executor.shutdownNow();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	// Helpers
+	// ------------------------------------------------------------------------
+	// Number of segments currently taken out of the network pool (total minus available).
+	private int getNumRequestedFromMemorySegmentPool() {
+		return networkBufferPool.getTotalNumberOfMemorySegments() - networkBufferPool.getNumberOfAvailableMemorySegments();
+	}
+	// Task that requests a buffer with requestBufferBlocking() and recycles it, a given number of times.
+	private static class BufferRequesterTask implements Callable<Boolean> {
+		// Source of the buffers.
+		private final BufferProvider bufferProvider;
+		// Number of request/recycle rounds performed by call().
+		private final int numBuffersToRequest;
+
+		private BufferRequesterTask(BufferProvider bufferProvider, int numBuffersToRequest) {
+			this.bufferProvider = bufferProvider;
+			this.numBuffersToRequest = numBuffersToRequest;
+		}
+		// Returns true if all rounds completed; any Throwable is reduced to false and its cause is not reported.
+		@Override
+		public Boolean call() throws Exception {
+			try {
+				for (int i = 0; i < numBuffersToRequest; i++) {
+					Buffer buffer = bufferProvider.requestBufferBlocking();
+					buffer.recycle();
+				}
+			}
+			catch (Throwable t) {
+				return false;
+			}
+
+			return true;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/bufferprovider/LocalBufferPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/bufferprovider/LocalBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/bufferprovider/LocalBufferPoolTest.java
deleted file mode 100644
index 40eb2b3..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/bufferprovider/LocalBufferPoolTest.java
+++ /dev/null
@@ -1,366 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.bufferprovider;
-
-import org.apache.flink.runtime.io.network.Buffer;
-import org.apache.flink.runtime.io.network.bufferprovider.BufferProvider.BufferAvailabilityRegistration;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.mockito.Matchers;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.io.IOException;
-import java.util.Timer;
-import java.util.TimerTask;
-
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-public class LocalBufferPoolTest {
-
-	private final static int NUM_BUFFERS = 2048;
-
-	private final static int BUFFER_SIZE = 1024;
-
-	private final static GlobalBufferPool GLOBAL_BUFFER_POOL = new GlobalBufferPool(NUM_BUFFERS, BUFFER_SIZE);
-
-	private final static RecyclingBufferAvailableAnswer RECYCLING_BUFFER_AVAILABLE_ANSWER = new RecyclingBufferAvailableAnswer();
-
-	@BeforeClass
-	public static void setupGlobalBufferPoolOnce() {
-		Assert.assertEquals("GlobalBufferPool does not have required number of buffers.",
-				NUM_BUFFERS, GLOBAL_BUFFER_POOL.numBuffers());
-		Assert.assertEquals("GlobalBufferPool does not have required number of available buffers.",
-				NUM_BUFFERS, GLOBAL_BUFFER_POOL.numAvailableBuffers());
-	}
-
-	@After
-	public void verifyAllBuffersReturnedToGlobalBufferPool() {
-		Assert.assertEquals("Did not return all buffers to GlobalBufferPool after test.",
-				NUM_BUFFERS, GLOBAL_BUFFER_POOL.numAvailableBuffers());
-	}
-
-	@Test
-	public void testSingleConsumerNonBlockingRequestAndRecycle() throws IOException {
-		final LocalBufferPool bufferPool = new LocalBufferPool(GLOBAL_BUFFER_POOL, NUM_BUFFERS);
-
-		Assert.assertEquals(0, bufferPool.numRequestedBuffers());
-
-		// this request-recycle cycle should only take a single buffer out of
-		// the GlobalBufferPool as it is recycled over and over again
-		for (int numRequested = 0; numRequested < NUM_BUFFERS; numRequested++) {
-			Buffer buffer = bufferPool.requestBuffer(BUFFER_SIZE);
-
-			Assert.assertEquals(BUFFER_SIZE, buffer.size());
-
-			Assert.assertEquals("Expected single buffer request in buffer pool.",
-					1, bufferPool.numRequestedBuffers());
-			Assert.assertEquals("Expected no available buffer in buffer pool.",
-					0, bufferPool.numAvailableBuffers());
-
-			buffer.recycleBuffer();
-
-			Assert.assertEquals("Expected single available buffer after recycle.",
-					1, bufferPool.numAvailableBuffers());
-		}
-
-		bufferPool.destroy();
-	}
-
-	@Test
-	public void testSingleConsumerNonBlockingRequestMoreThanAvailable() throws IOException {
-		final LocalBufferPool bufferPool = new LocalBufferPool(GLOBAL_BUFFER_POOL, NUM_BUFFERS);
-
-		Assert.assertEquals(0, bufferPool.numRequestedBuffers());
-
-		// request all buffers from the buffer pool
-		Buffer[] requestedBuffers = new Buffer[NUM_BUFFERS];
-		for (int i = 0; i < NUM_BUFFERS; i++) {
-			requestedBuffers[i] = bufferPool.requestBuffer(BUFFER_SIZE);
-		}
-
-		Assert.assertEquals("Expected no available buffer in buffer pool.",
-				0, bufferPool.numAvailableBuffers());
-
-		Assert.assertNull("Expected null return value for buffer request with no available buffer.",
-				bufferPool.requestBuffer(BUFFER_SIZE));
-
-		// recycle all buffers and destroy buffer pool
-		for (Buffer buffer : requestedBuffers) {
-			buffer.recycleBuffer();
-		}
-
-		bufferPool.destroy();
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testSingleConsumerNonBlockingRequestTooLarge() throws IOException {
-		final LocalBufferPool bufferPool = new LocalBufferPool(GLOBAL_BUFFER_POOL, NUM_BUFFERS);
-
-		// request too large buffer for the pool
-		bufferPool.requestBuffer(BUFFER_SIZE * 2);
-	}
-
-	@Test
-	public void testSingleConsumerNonBlockingRequestSmall() throws IOException {
-		final LocalBufferPool bufferPool = new LocalBufferPool(GLOBAL_BUFFER_POOL, NUM_BUFFERS);
-
-		// request smaller buffer and verify size
-		Buffer buffer = bufferPool.requestBuffer(BUFFER_SIZE / 2);
-
-		Assert.assertEquals(BUFFER_SIZE / 2, buffer.size());
-
-		buffer.recycleBuffer();
-
-		bufferPool.destroy();
-	}
-
-	@Test
-	public void testSingleConsumerBlockingRequest() throws Exception {
-		// Exercises requestBufferBlocking() on an exhausted pool in two ways:
-		// 1. the blocked requester is interrupted and must not receive a buffer,
-		// 2. the blocked requester is released by a buffer recycled from another thread.
-		final LocalBufferPool bufferPool = new LocalBufferPool(GLOBAL_BUFFER_POOL, NUM_BUFFERS);
-
-		// take every buffer so that any further request has to block
-		final Buffer[] requestedBuffers = new Buffer[NUM_BUFFERS];
-		for (int i = 0; i < NUM_BUFFERS; i++) {
-			requestedBuffers[i] = bufferPool.requestBuffer(BUFFER_SIZE);
-		}
-
-		final Buffer[] bufferFromBlockingRequest = new Buffer[1];
-
-		// An AssertionError thrown inside the helper thread would only terminate that
-		// thread and never reach the test runner, so failures are recorded here and
-		// rethrown from the test thread after the helper thread has been joined.
-		final AssertionError[] failureInBlockingRequestThread = new AssertionError[1];
-
-		// --------------------------------------------------------------------
-		// 1. blocking call: interrupt thread
-		// --------------------------------------------------------------------
-		Assert.assertEquals(NUM_BUFFERS, bufferPool.numRequestedBuffers());
-		Assert.assertEquals(0, bufferPool.numAvailableBuffers());
-
-		Thread blockingBufferRequestThread = new Thread(new Runnable() {
-			@Override
-			public void run() {
-				try {
-					bufferFromBlockingRequest[0] = bufferPool.requestBufferBlocking(BUFFER_SIZE);
-					failureInBlockingRequestThread[0] =
-							new AssertionError("Unexpected return from blocking buffer request.");
-				} catch (IOException e) {
-					failureInBlockingRequestThread[0] =
-							new AssertionError("Unexpected IOException during test.");
-				} catch (InterruptedException e) {
-					// expected interruption
-				}
-			}
-		});
-
-		// start blocking request thread, sleep, interrupt blocking request thread
-		blockingBufferRequestThread.start();
-
-		Thread.sleep(500);
-
-		blockingBufferRequestThread.interrupt();
-
-		// wait until the helper thread has reacted to the interrupt; joining also makes
-		// its writes to the two holder arrays visible to this thread
-		blockingBufferRequestThread.join(5000);
-		Assert.assertFalse("Blocking request thread did not terminate after interrupt.",
-				blockingBufferRequestThread.isAlive());
-
-		if (failureInBlockingRequestThread[0] != null) {
-			throw failureInBlockingRequestThread[0];
-		}
-
-		Assert.assertNull(bufferFromBlockingRequest[0]);
-		Assert.assertEquals(NUM_BUFFERS, bufferPool.numRequestedBuffers());
-		Assert.assertEquals(0, bufferPool.numAvailableBuffers());
-
-		// --------------------------------------------------------------------
-		// 2. blocking call: recycle buffer in different thread
-		// --------------------------------------------------------------------
-		// recycle the buffer soon
-		final Timer recycleTimer = new Timer();
-		recycleTimer.schedule(new TimerTask() {
-			@Override
-			public void run() {
-				requestedBuffers[0].recycleBuffer();
-			}
-		}, 500);
-
-		// this request blocks until the timer task above has recycled a buffer
-		try {
-			Buffer buffer = bufferPool.requestBufferBlocking(BUFFER_SIZE);
-			Assert.assertNotNull(buffer);
-
-			buffer.recycleBuffer();
-		} catch (InterruptedException e) {
-			Assert.fail("Unexpected InterruptedException during test.");
-		} finally {
-			// a Timer owns a non-daemon worker thread; stop it so it does not outlive the test
-			recycleTimer.cancel();
-		}
-
-		// recycle remaining buffers
-		for (int i = 1; i < requestedBuffers.length; i++) {
-			requestedBuffers[i].recycleBuffer();
-		}
-
-		bufferPool.destroy();
-	}
-
-	/**
-	 * Requests every buffer of a local pool, destroys the pool and only then recycles
-	 * the buffers that are still outstanding.
-	 */
-	@Test
-	public void testSingleConsumerRecycleAfterDestroy() throws IOException {
-		final LocalBufferPool pool = new LocalBufferPool(GLOBAL_BUFFER_POOL, NUM_BUFFERS);
-
-		// take all buffers out of the pool
-		final Buffer[] outstanding = new Buffer[NUM_BUFFERS];
-		int next = 0;
-		while (next < NUM_BUFFERS) {
-			outstanding[next] = pool.requestBuffer(BUFFER_SIZE);
-			next++;
-		}
-
-		pool.destroy();
-
-		// recycling after destroy should hand the buffers back to the GlobalBufferPool
-		// => verified in verifyAllBuffersReturned()
-		for (int idx = 0; idx < outstanding.length; idx++) {
-			outstanding[idx].recycleBuffer();
-		}
-	}
-
-	/**
-	 * Checks the three registration outcomes of a buffer availability listener:
-	 * registered on an exhausted pool, rejected because a buffer is available, and
-	 * rejected because the pool has been destroyed.
-	 */
-	@Test
-	public void testSingleConsumerBufferAvailabilityListenerRegistration() throws Exception {
-		final LocalBufferPool pool = new LocalBufferPool(GLOBAL_BUFFER_POOL, NUM_BUFFERS);
-
-		// listener mock that recycles the buffer it is called back with
-		final BufferAvailabilityListener recyclingListener = mock(BufferAvailabilityListener.class);
-		doAnswer(RECYCLING_BUFFER_AVAILABLE_ANSWER).when(recyclingListener).bufferAvailable(Matchers.<Buffer>anyObject());
-
-		// exhaust the pool
-		final Buffer[] held = new Buffer[NUM_BUFFERS];
-		for (int idx = 0; idx < held.length; idx++) {
-			held[idx] = pool.requestBuffer(BUFFER_SIZE);
-		}
-
-		// --------------------------------------------------------------------
-		// 1. success
-		// --------------------------------------------------------------------
-		Assert.assertEquals(BufferAvailabilityRegistration.SUCCEEDED_REGISTERED,
-				pool.registerBufferAvailabilityListener(recyclingListener));
-
-		// recycling one buffer must trigger exactly one listener callback
-		held[0].recycleBuffer();
-		verify(recyclingListener, times(1)).bufferAvailable(Matchers.<Buffer>anyObject());
-
-		Assert.assertEquals("Expected single available buffer after recycle call in mock listener.",
-				1, pool.numAvailableBuffers());
-
-		// --------------------------------------------------------------------
-		// 2. failure: buffer is available
-		// --------------------------------------------------------------------
-		Assert.assertEquals(BufferAvailabilityRegistration.FAILED_BUFFER_AVAILABLE,
-				pool.registerBufferAvailabilityListener(recyclingListener));
-
-		final Buffer availableBuffer = pool.requestBuffer(BUFFER_SIZE);
-		Assert.assertNotNull(availableBuffer);
-
-		availableBuffer.recycleBuffer();
-
-		// --------------------------------------------------------------------
-		// 3. failure: buffer pool destroyed
-		// --------------------------------------------------------------------
-		pool.destroy();
-
-		Assert.assertEquals(BufferAvailabilityRegistration.FAILED_BUFFER_POOL_DESTROYED,
-				pool.registerBufferAvailabilityListener(recyclingListener));
-
-		// hand back everything that is still outstanding (index 0 was recycled above)
-		int idx = 1;
-		while (idx < held.length) {
-			held[idx].recycleBuffer();
-			idx++;
-		}
-	}
-
-	/**
-	 * Checks the pool's requested/available buffer counters while the designated
-	 * number of buffers is reduced, both with buffers sitting available in the pool
-	 * and with buffers still handed out to the test.
-	 */
-	@Test
-	public void testSingleConsumerReturnExcessBuffers() throws Exception {
-		final LocalBufferPool bufferPool = new LocalBufferPool(GLOBAL_BUFFER_POOL, NUM_BUFFERS);
-
-		// request all buffers of the pool
-		Buffer[] requestedBuffers = new Buffer[NUM_BUFFERS];
-		for (int i = 0; i < NUM_BUFFERS; i++) {
-			requestedBuffers[i] = bufferPool.requestBuffer(BUFFER_SIZE);
-		}
-
-		Assert.assertEquals(NUM_BUFFERS, bufferPool.numRequestedBuffers());
-		Assert.assertEquals(0, bufferPool.numAvailableBuffers());
-
-		// recycle first half of the buffers
-		// => leave requested number of buffers unchanged
-		// => increase available number of buffers
-		for (int i = 0; i < NUM_BUFFERS / 2; i++) {
-			requestedBuffers[i].recycleBuffer();
-		}
-
-		Assert.assertEquals(NUM_BUFFERS, bufferPool.numRequestedBuffers());
-		Assert.assertEquals(NUM_BUFFERS / 2, bufferPool.numAvailableBuffers());
-
-		// reduce designated number of buffers to one quarter of NUM_BUFFERS
-		// => available buffers (one half) should be returned immediately
-		// => non-available buffers (one quarter) should be returned later
-		bufferPool.setNumDesignatedBuffers((NUM_BUFFERS / 2) - (NUM_BUFFERS / 4));
-
-		Assert.assertEquals(NUM_BUFFERS / 2, bufferPool.numRequestedBuffers());
-		Assert.assertEquals(0, bufferPool.numAvailableBuffers());
-
-		// recycle second half of the buffers
-		// => previously non-available buffers (one quarter) should be returned immediately
-		// => remaining buffers are the available ones (one quarter)
-		for (int i = NUM_BUFFERS / 2; i < NUM_BUFFERS; i++) {
-			requestedBuffers[i].recycleBuffer();
-		}
-
-		Assert.assertEquals("Expected current number of requested buffers to be equal to the number of designated buffers.",
-				bufferPool.numDesignatedBuffers(), bufferPool.numRequestedBuffers());
-
-		Assert.assertEquals("Expected current number of requested and available buffers to be equal, " +
-				"because all requested buffers have been recycled and become available again.",
-				bufferPool.numRequestedBuffers(), bufferPool.numAvailableBuffers());
-
-		// re-request remaining buffers and register buffer availability listener
-		// (the front of requestedBuffers is reused to hold the re-requested buffers)
-		int remaining = bufferPool.numRequestedBuffers();
-		for (int i = 0; i < remaining; i++) {
-			requestedBuffers[i] = bufferPool.requestBuffer(BUFFER_SIZE);
-		}
-
-		// listener mock answers with RECYCLING_BUFFER_AVAILABLE_ANSWER, presumably an
-		// instance of RecyclingBufferAvailableAnswer (recycles the buffer it is handed)
-		// -- TODO confirm against the constant's declaration
-		BufferAvailabilityListener listener = mock(BufferAvailabilityListener.class);
-		doAnswer(RECYCLING_BUFFER_AVAILABLE_ANSWER).when(listener).bufferAvailable(Matchers.<Buffer>anyObject());
-
-		// no buffer is available at this point, so the registration is expected to succeed
-		BufferAvailabilityRegistration registration = bufferPool.registerBufferAvailabilityListener(listener);
-		Assert.assertEquals(BufferAvailabilityRegistration.SUCCEEDED_REGISTERED, registration);
-
-		// reduce number of designated buffers and recycle all buffers
-		bufferPool.setNumDesignatedBuffers(bufferPool.numDesignatedBuffers() - 1);
-
-		for (int i = 0; i < remaining; i++) {
-			requestedBuffers[i].recycleBuffer();
-		}
-
-		// designated size shrank by one, so one buffer fewer is expected to stay with the pool
-		Assert.assertEquals(remaining - 1, bufferPool.numRequestedBuffers());
-		Assert.assertEquals(remaining - 1, bufferPool.numAvailableBuffers());
-
-		bufferPool.destroy();
-	}
-
-	// --------------------------------------------------------------------
-
-	/**
-	 * Mockito answer that treats the first argument of the stubbed call as a
-	 * {@link Buffer} and recycles it right away.
-	 */
-	private static class RecyclingBufferAvailableAnswer implements Answer<Void> {
-
-		@Override
-		public Void answer(InvocationOnMock invocation) throws Throwable {
-			final Object[] callArguments = invocation.getArguments();
-			((Buffer) callArguments[0]).recycleBuffer();
-			return null;
-		}
-	}
-
-}


Mime
View raw message