flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srich...@apache.org
Subject [11/15] flink git commit: [FLINK-8208][network-tests] Reduce mockito usage in RecordWriterTest
Date Mon, 08 Jan 2018 13:04:05 GMT
[FLINK-8208][network-tests] Reduce mockito usage in RecordWriterTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/97db0bf9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/97db0bf9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/97db0bf9

Branch: refs/heads/master
Commit: 97db0bf9c1448a7e672f5d0235e301d03e1cf7d2
Parents: 409ea23
Author: Piotr Nowojski <piotr.nowojski@gmail.com>
Authored: Fri Dec 1 11:14:06 2017 +0100
Committer: Stefan Richter <s.richter@data-artisans.com>
Committed: Mon Jan 8 11:46:00 2018 +0100

----------------------------------------------------------------------
 .../io/network/api/writer/RecordWriterTest.java | 60 +++++++++++---------
 .../network/util/TestPooledBufferProvider.java  |  6 +-
 .../runtime/io/StreamRecordWriterTest.java      | 17 +-----
 3 files changed, 41 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/97db0bf9/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index 63540c3..59b98a2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
 import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
 import org.apache.flink.runtime.io.network.util.TestTaskEvent;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
 import org.apache.flink.runtime.testutils.DiscardingRecycler;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.util.XORShiftRandom;
@@ -172,12 +173,11 @@ public class RecordWriterTest {
 
 	@Test
 	public void testClearBuffersAfterExceptionInPartitionWriter() throws Exception {
-		NetworkBufferPool buffers = null;
+		NetworkBufferPool buffers = new NetworkBufferPool(1, 1024);
 		BufferPool bufferPool = null;
 
 		try {
-			buffers = new NetworkBufferPool(1, 1024);
-			bufferPool = spy(buffers.createBufferPool(1, Integer.MAX_VALUE));
+			bufferPool = buffers.createBufferPool(1, Integer.MAX_VALUE);
 
 			ResultPartitionWriter partitionWriter = mock(ResultPartitionWriter.class);
 			when(partitionWriter.getBufferProvider()).thenReturn(checkNotNull(bufferPool));
@@ -190,12 +190,19 @@ public class RecordWriterTest {
 					Buffer buffer = (Buffer) invocation.getArguments()[0];
 					buffer.recycle();
 
-					throw new RuntimeException("Expected test Exception");
+					throw new ExpectedTestException();
 				}
 			}).when(partitionWriter).writeBuffer(any(Buffer.class), anyInt());
 
 			RecordWriter<IntValue> recordWriter = new RecordWriter<>(partitionWriter);
 
+			// Validate that memory segment was assigned to recordWriter
+			assertEquals(1, buffers.getNumberOfAvailableMemorySegments());
+			assertEquals(0, bufferPool.getNumberOfAvailableMemorySegments());
+			recordWriter.emit(new IntValue(0));
+			assertEquals(0, buffers.getNumberOfAvailableMemorySegments());
+			assertEquals(0, bufferPool.getNumberOfAvailableMemorySegments());
+
 			try {
 				// Verify that emit correctly clears the buffer. The infinite loop looks
 				// dangerous indeed, but the buffer will only be flushed after its full. Adding a
@@ -204,7 +211,7 @@ public class RecordWriterTest {
 					recordWriter.emit(new IntValue(0));
 				}
 			}
-			catch (Exception e) {
+			catch (ExpectedTestException e) {
 				// Verify that the buffer is not part of the record writer state after a failure
 				// to flush it out. If the buffer is still part of the record writer state, this
 				// will fail, because the buffer has already been recycled. NOTE: The mock
@@ -214,66 +221,72 @@ public class RecordWriterTest {
 
 			// Verify expected methods have been called
 			verify(partitionWriter, times(1)).writeBuffer(any(Buffer.class), anyInt());
-			verify(bufferPool, times(1)).requestBufferBlocking();
+			assertEquals(1, bufferPool.getNumberOfAvailableMemorySegments());
 
 			try {
 				// Verify that manual flushing correctly clears the buffer.
 				recordWriter.emit(new IntValue(0));
+				assertEquals(0, bufferPool.getNumberOfAvailableMemorySegments());
 				recordWriter.flush();
 
 				Assert.fail("Did not throw expected test Exception");
 			}
-			catch (Exception e) {
+			catch (ExpectedTestException e) {
 				recordWriter.clearBuffers();
 			}
 
 			// Verify expected methods have been called
 			verify(partitionWriter, times(2)).writeBuffer(any(Buffer.class), anyInt());
-			verify(bufferPool, times(2)).requestBufferBlocking();
+			assertEquals(1, bufferPool.getNumberOfAvailableMemorySegments());
 
 			try {
 				// Verify that broadcast emit correctly clears the buffer.
+				recordWriter.broadcastEmit(new IntValue(0));
+				assertEquals(0, bufferPool.getNumberOfAvailableMemorySegments());
+
 				for (;;) {
 					recordWriter.broadcastEmit(new IntValue(0));
 				}
 			}
-			catch (Exception e) {
+			catch (ExpectedTestException e) {
 				recordWriter.clearBuffers();
 			}
 
 			// Verify expected methods have been called
 			verify(partitionWriter, times(3)).writeBuffer(any(Buffer.class), anyInt());
-			verify(bufferPool, times(3)).requestBufferBlocking();
+			assertEquals(1, bufferPool.getNumberOfAvailableMemorySegments());
 
 			try {
 				// Verify that end of super step correctly clears the buffer.
 				recordWriter.emit(new IntValue(0));
+				assertEquals(0, bufferPool.getNumberOfAvailableMemorySegments());
 				recordWriter.broadcastEvent(EndOfSuperstepEvent.INSTANCE);
 
 				Assert.fail("Did not throw expected test Exception");
 			}
-			catch (Exception e) {
+			catch (ExpectedTestException e) {
 				recordWriter.clearBuffers();
 			}
 
 			// Verify expected methods have been called
 			verify(partitionWriter, times(4)).writeBuffer(any(Buffer.class), anyInt());
-			verify(bufferPool, times(4)).requestBufferBlocking();
+			assertEquals(1, bufferPool.getNumberOfAvailableMemorySegments());
 
 			try {
 				// Verify that broadcasting and event correctly clears the buffer.
 				recordWriter.emit(new IntValue(0));
+				assertEquals(0, bufferPool.getNumberOfAvailableMemorySegments());
 				recordWriter.broadcastEvent(new TestTaskEvent());
 
 				Assert.fail("Did not throw expected test Exception");
 			}
-			catch (Exception e) {
+			catch (ExpectedTestException e) {
 				recordWriter.clearBuffers();
 			}
 
 			// Verify expected methods have been called
 			verify(partitionWriter, times(5)).writeBuffer(any(Buffer.class), anyInt());
-			verify(bufferPool, times(5)).requestBufferBlocking();
+			assertEquals(1, bufferPool.getNumberOfAvailableMemorySegments());
 		}
 		finally {
 			if (bufferPool != null) {
@@ -281,20 +294,15 @@ public class RecordWriterTest {
 				bufferPool.lazyDestroy();
 			}
 
-			if (buffers != null) {
-				assertEquals(1, buffers.getNumberOfAvailableMemorySegments());
-				buffers.destroy();
-			}
+			assertEquals(1, buffers.getNumberOfAvailableMemorySegments());
+			buffers.destroy();
 		}
 	}
 
 	@Test
 	public void testSerializerClearedAfterClearBuffers() throws Exception {
-
-		final Buffer buffer = TestBufferFactory.createBuffer(16);
-
 		ResultPartitionWriter partitionWriter = createResultPartitionWriter(
-				createBufferProvider(buffer));
+			new TestPooledBufferProvider(1, 16));
 
 		RecordWriter<IntValue> recordWriter = new RecordWriter<IntValue>(partitionWriter);
 
@@ -324,7 +332,7 @@ public class RecordWriterTest {
 			queues[i] = new ArrayDeque<>();
 		}
 
-		BufferProvider bufferProvider = createBufferProvider(bufferSize);
+		TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
 
 		ResultPartitionWriter partitionWriter = createCollectingPartitionWriter(queues, bufferProvider);
 		RecordWriter<ByteArrayIO> writer = new RecordWriter<>(partitionWriter, new RoundRobin<ByteArrayIO>());
@@ -333,7 +341,7 @@ public class RecordWriterTest {
 		// No records emitted yet, broadcast should not request a buffer
 		writer.broadcastEvent(barrier);
 
-		verify(bufferProvider, times(0)).requestBufferBlocking();
+		assertEquals(0, bufferProvider.getNumberOfCreatedBuffers());
 
 		for (Queue<BufferOrEvent> queue : queues) {
 			assertEquals(1, queue.size());
@@ -360,7 +368,7 @@ public class RecordWriterTest {
 			queues[i] = new ArrayDeque<>();
 		}
 
-		BufferProvider bufferProvider = createBufferProvider(bufferSize);
+		TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
 
 		ResultPartitionWriter partitionWriter = createCollectingPartitionWriter(queues, bufferProvider);
 		RecordWriter<ByteArrayIO> writer = new RecordWriter<>(partitionWriter, new RoundRobin<ByteArrayIO>());
@@ -393,7 +401,7 @@ public class RecordWriterTest {
 		// (v) Broadcast the event
 		writer.broadcastEvent(barrier);
 
-		verify(bufferProvider, times(4)).requestBufferBlocking();
+		assertEquals(4, bufferProvider.getNumberOfCreatedBuffers());
 
 		assertEquals(2, queues[0].size()); // 1 buffer + 1 event
 		assertEquals(3, queues[1].size()); // 2 buffers + 1 event

http://git-wip-us.apache.org/repos/asf/flink/blob/97db0bf9/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
index a88f4ba..cc52549 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
@@ -43,10 +43,14 @@ public class TestPooledBufferProvider implements BufferProvider {
 	private final PooledBufferProviderRecycler bufferRecycler;
 
 	public TestPooledBufferProvider(int poolSize) {
+		this(poolSize, 32 * 1024);
+	}
+
+	public TestPooledBufferProvider(int poolSize, int bufferSize) {
 		checkArgument(poolSize > 0);
 
 		this.bufferRecycler = new PooledBufferProviderRecycler(buffers);
-		this.bufferFactory = new TestBufferFactory(poolSize, 32 * 1024, bufferRecycler);
+		this.bufferFactory = new TestBufferFactory(poolSize, bufferSize, bufferRecycler);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/97db0bf9/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
index 78d4303..480cfd9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
@@ -19,18 +19,14 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
 import org.apache.flink.types.LongValue;
 
 import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
 
@@ -86,16 +82,7 @@ public class StreamRecordWriterTest {
 	}
 
 	private static ResultPartitionWriter getMockWriter(int numPartitions) throws Exception {
-		BufferProvider mockProvider = mock(BufferProvider.class);
-		when(mockProvider.requestBufferBlocking()).thenAnswer(new Answer<Buffer>() {
-			@Override
-			public Buffer answer(InvocationOnMock invocation) {
-				return new Buffer(
-						MemorySegmentFactory.allocateUnpooledSegment(4096),
-						FreeingBufferRecycler.INSTANCE);
-			}
-		});
-
+		BufferProvider mockProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, 4096);
 		ResultPartitionWriter mockWriter = mock(ResultPartitionWriter.class);
 		when(mockWriter.getBufferProvider()).thenReturn(mockProvider);
 		when(mockWriter.getNumberOfSubpartitions()).thenReturn(numPartitions);


Mime
View raw message