[FLINK-8208][network-tests] Reduce mockito usage in RecordWriterTest
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/97db0bf9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/97db0bf9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/97db0bf9
Branch: refs/heads/master
Commit: 97db0bf9c1448a7e672f5d0235e301d03e1cf7d2
Parents: 409ea23
Author: Piotr Nowojski <piotr.nowojski@gmail.com>
Authored: Fri Dec 1 11:14:06 2017 +0100
Committer: Stefan Richter <s.richter@data-artisans.com>
Committed: Mon Jan 8 11:46:00 2018 +0100
----------------------------------------------------------------------
.../io/network/api/writer/RecordWriterTest.java | 60 +++++++++++---------
.../network/util/TestPooledBufferProvider.java | 6 +-
.../runtime/io/StreamRecordWriterTest.java | 17 +-----
3 files changed, 41 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/97db0bf9/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index 63540c3..59b98a2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
import org.apache.flink.runtime.io.network.util.TestTaskEvent;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.testutils.DiscardingRecycler;
import org.apache.flink.types.IntValue;
import org.apache.flink.util.XORShiftRandom;
@@ -172,12 +173,11 @@ public class RecordWriterTest {
@Test
public void testClearBuffersAfterExceptionInPartitionWriter() throws Exception {
- NetworkBufferPool buffers = null;
+ NetworkBufferPool buffers = new NetworkBufferPool(1, 1024);
BufferPool bufferPool = null;
try {
- buffers = new NetworkBufferPool(1, 1024);
- bufferPool = spy(buffers.createBufferPool(1, Integer.MAX_VALUE));
+ bufferPool = buffers.createBufferPool(1, Integer.MAX_VALUE);
ResultPartitionWriter partitionWriter = mock(ResultPartitionWriter.class);
when(partitionWriter.getBufferProvider()).thenReturn(checkNotNull(bufferPool));
@@ -190,12 +190,19 @@ public class RecordWriterTest {
Buffer buffer = (Buffer) invocation.getArguments()[0];
buffer.recycle();
- throw new RuntimeException("Expected test Exception");
+ throw new ExpectedTestException();
}
}).when(partitionWriter).writeBuffer(any(Buffer.class), anyInt());
RecordWriter<IntValue> recordWriter = new RecordWriter<>(partitionWriter);
+ // Validate that memory segment was assigned to recordWriter
+ assertEquals(1, buffers.getNumberOfAvailableMemorySegments());
+ assertEquals(0, bufferPool.getNumberOfAvailableMemorySegments());
+ recordWriter.emit(new IntValue(0));
+ assertEquals(0, buffers.getNumberOfAvailableMemorySegments());
+ assertEquals(0, bufferPool.getNumberOfAvailableMemorySegments());
+
try {
// Verify that emit correctly clears the buffer. The infinite loop looks
// dangerous indeed, but the buffer will only be flushed after it's full. Adding a
@@ -204,7 +211,7 @@ public class RecordWriterTest {
recordWriter.emit(new IntValue(0));
}
}
- catch (Exception e) {
+ catch (ExpectedTestException e) {
// Verify that the buffer is not part of the record writer state after a failure
// to flush it out. If the buffer is still part of the record writer state, this
// will fail, because the buffer has already been recycled. NOTE: The mock
@@ -214,66 +221,72 @@ public class RecordWriterTest {
// Verify expected methods have been called
verify(partitionWriter, times(1)).writeBuffer(any(Buffer.class), anyInt());
- verify(bufferPool, times(1)).requestBufferBlocking();
+ assertEquals(1, bufferPool.getNumberOfAvailableMemorySegments());
try {
// Verify that manual flushing correctly clears the buffer.
recordWriter.emit(new IntValue(0));
+ assertEquals(0, bufferPool.getNumberOfAvailableMemorySegments());
recordWriter.flush();
Assert.fail("Did not throw expected test Exception");
}
- catch (Exception e) {
+ catch (ExpectedTestException e) {
recordWriter.clearBuffers();
}
// Verify expected methods have been called
verify(partitionWriter, times(2)).writeBuffer(any(Buffer.class), anyInt());
- verify(bufferPool, times(2)).requestBufferBlocking();
+ assertEquals(1, bufferPool.getNumberOfAvailableMemorySegments());
try {
// Verify that broadcast emit correctly clears the buffer.
+ recordWriter.broadcastEmit(new IntValue(0));
+ assertEquals(0, bufferPool.getNumberOfAvailableMemorySegments());
+
for (;;) {
recordWriter.broadcastEmit(new IntValue(0));
}
}
- catch (Exception e) {
+ catch (ExpectedTestException e) {
recordWriter.clearBuffers();
}
// Verify expected methods have been called
verify(partitionWriter, times(3)).writeBuffer(any(Buffer.class), anyInt());
- verify(bufferPool, times(3)).requestBufferBlocking();
+ assertEquals(1, bufferPool.getNumberOfAvailableMemorySegments());
try {
// Verify that end of super step correctly clears the buffer.
recordWriter.emit(new IntValue(0));
+ assertEquals(0, bufferPool.getNumberOfAvailableMemorySegments());
recordWriter.broadcastEvent(EndOfSuperstepEvent.INSTANCE);
Assert.fail("Did not throw expected test Exception");
}
- catch (Exception e) {
+ catch (ExpectedTestException e) {
recordWriter.clearBuffers();
}
// Verify expected methods have been called
verify(partitionWriter, times(4)).writeBuffer(any(Buffer.class), anyInt());
- verify(bufferPool, times(4)).requestBufferBlocking();
+ assertEquals(1, bufferPool.getNumberOfAvailableMemorySegments());
try {
// Verify that broadcasting an event correctly clears the buffer.
recordWriter.emit(new IntValue(0));
+ assertEquals(0, bufferPool.getNumberOfAvailableMemorySegments());
recordWriter.broadcastEvent(new TestTaskEvent());
Assert.fail("Did not throw expected test Exception");
}
- catch (Exception e) {
+ catch (ExpectedTestException e) {
recordWriter.clearBuffers();
}
// Verify expected methods have been called
verify(partitionWriter, times(5)).writeBuffer(any(Buffer.class), anyInt());
- verify(bufferPool, times(5)).requestBufferBlocking();
+ assertEquals(1, bufferPool.getNumberOfAvailableMemorySegments());
}
finally {
if (bufferPool != null) {
@@ -281,20 +294,15 @@ public class RecordWriterTest {
bufferPool.lazyDestroy();
}
- if (buffers != null) {
- assertEquals(1, buffers.getNumberOfAvailableMemorySegments());
- buffers.destroy();
- }
+ assertEquals(1, buffers.getNumberOfAvailableMemorySegments());
+ buffers.destroy();
}
}
@Test
public void testSerializerClearedAfterClearBuffers() throws Exception {
-
- final Buffer buffer = TestBufferFactory.createBuffer(16);
-
ResultPartitionWriter partitionWriter = createResultPartitionWriter(
- createBufferProvider(buffer));
+ new TestPooledBufferProvider(1, 16));
RecordWriter<IntValue> recordWriter = new RecordWriter<IntValue>(partitionWriter);
@@ -324,7 +332,7 @@ public class RecordWriterTest {
queues[i] = new ArrayDeque<>();
}
- BufferProvider bufferProvider = createBufferProvider(bufferSize);
+ TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
ResultPartitionWriter partitionWriter = createCollectingPartitionWriter(queues, bufferProvider);
RecordWriter<ByteArrayIO> writer = new RecordWriter<>(partitionWriter, new RoundRobin<ByteArrayIO>());
@@ -333,7 +341,7 @@ public class RecordWriterTest {
// No records emitted yet, broadcast should not request a buffer
writer.broadcastEvent(barrier);
- verify(bufferProvider, times(0)).requestBufferBlocking();
+ assertEquals(0, bufferProvider.getNumberOfCreatedBuffers());
for (Queue<BufferOrEvent> queue : queues) {
assertEquals(1, queue.size());
@@ -360,7 +368,7 @@ public class RecordWriterTest {
queues[i] = new ArrayDeque<>();
}
- BufferProvider bufferProvider = createBufferProvider(bufferSize);
+ TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
ResultPartitionWriter partitionWriter = createCollectingPartitionWriter(queues, bufferProvider);
RecordWriter<ByteArrayIO> writer = new RecordWriter<>(partitionWriter, new RoundRobin<ByteArrayIO>());
@@ -393,7 +401,7 @@ public class RecordWriterTest {
// (v) Broadcast the event
writer.broadcastEvent(barrier);
- verify(bufferProvider, times(4)).requestBufferBlocking();
+ assertEquals(4, bufferProvider.getNumberOfCreatedBuffers());
assertEquals(2, queues[0].size()); // 1 buffer + 1 event
assertEquals(3, queues[1].size()); // 2 buffers + 1 event
http://git-wip-us.apache.org/repos/asf/flink/blob/97db0bf9/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
index a88f4ba..cc52549 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
@@ -43,10 +43,14 @@ public class TestPooledBufferProvider implements BufferProvider {
private final PooledBufferProviderRecycler bufferRecycler;
public TestPooledBufferProvider(int poolSize) {
+ this(poolSize, 32 * 1024);
+ }
+
+ public TestPooledBufferProvider(int poolSize, int bufferSize) {
checkArgument(poolSize > 0);
this.bufferRecycler = new PooledBufferProviderRecycler(buffers);
- this.bufferFactory = new TestBufferFactory(poolSize, 32 * 1024, bufferRecycler);
+ this.bufferFactory = new TestBufferFactory(poolSize, bufferSize, bufferRecycler);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/97db0bf9/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
index 78d4303..480cfd9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
@@ -19,18 +19,14 @@
package org.apache.flink.streaming.runtime.io;
import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
import org.apache.flink.types.LongValue;
import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
import java.io.IOException;
@@ -86,16 +82,7 @@ public class StreamRecordWriterTest {
}
private static ResultPartitionWriter getMockWriter(int numPartitions) throws Exception {
- BufferProvider mockProvider = mock(BufferProvider.class);
- when(mockProvider.requestBufferBlocking()).thenAnswer(new Answer<Buffer>() {
- @Override
- public Buffer answer(InvocationOnMock invocation) {
- return new Buffer(
- MemorySegmentFactory.allocateUnpooledSegment(4096),
- FreeingBufferRecycler.INSTANCE);
- }
- });
-
+ BufferProvider mockProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, 4096);
ResultPartitionWriter mockWriter = mock(ResultPartitionWriter.class);
when(mockWriter.getBufferProvider()).thenReturn(mockProvider);
when(mockWriter.getNumberOfSubpartitions()).thenReturn(numPartitions);
|