flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers
Date Thu, 14 Sep 2017 13:48:03 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16166324#comment-16166324 ]

ASF GitHub Bot commented on FLINK-7378:
---------------------------------------

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4485#discussion_r138891582
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java ---
    @@ -172,44 +178,117 @@ public void testDestroyAll() {
     		}
     	}
     
    +	/**
    +	 * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool}
    +	 * currently containing the number of required free segments.
    +	 */
     	@Test
    -	public void testRequestAndRecycleMemorySegments() throws Exception {
    +	public void testRequestMemorySegmentsLessThanTotalBuffers() throws Exception {
     		final int numBuffers = 10;
     
     		NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
     
    -		List<MemorySegment> segments = null;
    -		// request buffers from global pool with illegal argument
    +		List<MemorySegment> memorySegments = Collections.emptyList();
     		try {
    -			segments = globalPool.requestMemorySegments(0);
    -			fail("Should throw an IllegalArgumentException");
    -		} catch (IllegalArgumentException e) {
    -			assertNull(segments);
    +			memorySegments = globalPool.requestMemorySegments(numBuffers / 2);
    +
    +			assertEquals(memorySegments.size(), numBuffers / 2);
    +		} finally {
    +			globalPool.recycleMemorySegments(memorySegments);
     			assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
     		}
    +	}
     
    -		// common case to request buffers less than the total capacity of global pool
    -		final int numRequiredBuffers = 8;
    -		segments = globalPool.requestMemorySegments(numRequiredBuffers);
    -
    -		assertNotNull(segments);
    -		assertEquals(segments.size(), numRequiredBuffers);
    -
    -		// recycle all the requested buffers to global pool
    -		globalPool.recycleMemorySegments(segments);
    +	/**
    +	 * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the number of required
    +	 * buffers exceeding the capacity of {@link NetworkBufferPool}.
    +	 */
    +	@Test
    +	public void testRequestMemorySegmentsMoreThanTotalBuffers() throws Exception {
    +		final int numBuffers = 10;
     
    -		assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
    +		NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
     
    -		// uncommon case to request buffers exceeding the total capacity of global pool
    +		List<MemorySegment> memorySegments = Collections.emptyList();
     		try {
    -			segments = null;
    -			segments = globalPool.requestMemorySegments(11);
    +			memorySegments = globalPool.requestMemorySegments(numBuffers + 1);
     			fail("Should throw an IOException");
     		} catch (IOException e) {
    -			assertNull(segments);
    -			// recycle all the requested buffers to global pool after exception
    +			assertEquals(memorySegments.size(), 0);
     			assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
     		}
    +	}
     
    +	/**
    +	 * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the invalid argument to
    +	 * cause exception.
    +	 */
    +	@Test
    +	public void testRequestMemorySegmentsWithInvalidArgument() throws Exception {
    +		final int numBuffers = 10;
    +
    +		NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
    +
    +		List<MemorySegment> memorySegments = Collections.emptyList();
    +		try {
    +			// the number of requested buffers should be larger than zero
    +			memorySegments = globalPool.requestMemorySegments(0);
    +			fail("Should throw an IllegalArgumentException");
    +		} catch (IllegalArgumentException e) {
    +			assertEquals(memorySegments.size(), 0);
    +			assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
    +		}
    +	}
    +
    +	/**
    +	 * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool}
    +	 * currently not containing the number of required free segments (currently occupied by a buffer pool).
    +	 */
    +	@Test
    +	public void testRequestMemorySegmentsWithBuffersTaken() throws IOException, InterruptedException {
    +		final int numBuffers = 10;
    +
    +		NetworkBufferPool networkBufferPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
    +
    +		final List<Buffer> buffers = new ArrayList<>(numBuffers);
    +		List<MemorySegment> memorySegments = Collections.emptyList();
    +		Thread bufferRecycler = null;
    +		BufferPool lbp1 = null;
    +		try {
    +			lbp1 = networkBufferPool.createBufferPool(numBuffers / 2, numBuffers);
    +
    +			// take all buffers (more than the minimum required)
    +			for (int i = 0; i < numBuffers; ++i) {
    +				Buffer buffer = lbp1.requestBuffer();
    +				buffers.add(buffer);
    +				assertNotNull(buffer);
    +			}
    +
    +			// if requestMemorySegments() blocks, this will make sure that enough buffers are freed
    +			// eventually for it to continue
    +			bufferRecycler = new Thread(() -> {
    +				try {
    +					Thread.sleep(10000);
    --- End diff --
    
    Waiting 10s here increases the probability of reaching the desired (blocking) code path,
    but it also makes the test run for quite a long time. How about the following instead?
    
    ```
    			// requestMemorySegments() below will block and wait for buffers
    			// this will make sure that enough buffers are freed eventually for it to continue
    			final OneShotLatch isRunning = new OneShotLatch();
    			bufferRecycler = new Thread(() -> {
    				try {
    					isRunning.trigger();
    					Thread.sleep(100);
    				} catch (InterruptedException ignored) {
    				}
    
    				for (Buffer buffer : buffers) {
    					buffer.recycle();
    				}
    			});
    			bufferRecycler.start();
    
    			// take more buffers than are freely available at the moment via requestMemorySegments()
    			isRunning.await();
    			memorySegments = networkBufferPool.requestMemorySegments(numBuffers / 2);
    ```
    
    That makes reaching the blocking code more likely than with my original variant, which
    only waited 100ms, but does not increase the test time too much.
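
For readers without the Flink test utilities at hand, the latch idea can be tried in isolation. The following is only a rough sketch under stated assumptions: `java.util.concurrent.CountDownLatch` stands in for Flink's `OneShotLatch`, a plain `BlockingQueue<byte[]>` stands in for the buffer pool, and the names `LatchedRecyclerSketch`, `requestBlocking` and `demo` are hypothetical and not part of the pull request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;

public class LatchedRecyclerSketch {

    /** Hypothetical stand-in for a blocking request: waits until `count` buffers are free. */
    static List<byte[]> requestBlocking(BlockingQueue<byte[]> pool, int count)
            throws InterruptedException {
        List<byte[]> result = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            result.add(pool.take()); // blocks while the pool is empty
        }
        return result;
    }

    /** Returns the number of buffers obtained after the recycler thread frees them. */
    static int demo(int numBuffers) {
        // The pool starts empty: all buffers are "taken" by someone else.
        BlockingQueue<byte[]> pool = new ArrayBlockingQueue<>(numBuffers);
        List<byte[]> taken = new ArrayList<>(numBuffers);
        for (int i = 0; i < numBuffers; i++) {
            taken.add(new byte[128]);
        }

        // CountDownLatch(1) plays the role of OneShotLatch in this sketch.
        CountDownLatch isRunning = new CountDownLatch(1);
        Thread recycler = new Thread(() -> {
            isRunning.countDown(); // signal that the recycler thread has started
            try {
                Thread.sleep(100); // short delay so the request below likely blocks first
            } catch (InterruptedException ignored) {
            }
            pool.addAll(taken); // "recycle" all buffers back into the pool
        });
        recycler.start();

        try {
            isRunning.await(); // do not request before the recycler is running
            List<byte[]> segments = requestBlocking(pool, numBuffers / 2);
            recycler.join();
            return segments.size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(10));
    }
}
```

As in the suggestion above, the latch only guarantees that the recycler thread is running before the request is issued; whether the request actually blocks still depends on scheduling, so this raises the likelihood rather than making it certain.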


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-7378
>                 URL: https://issues.apache.org/jira/browse/FLINK-7378
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Core
>            Reporter: zhijiang
>            Assignee: zhijiang
>             Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for {{SingleInputGate}}
> is limited by {{a * <number of channels> + b}}, where a is the number of exclusive buffers
> for each channel and b is the number of floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fixed-size buffer
> pool to manage the floating buffers for {{SingleInputGate}}, while the exclusive buffers
> are assigned to {{InputChannel}}s directly.
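
The limit quoted in the issue description can be checked with a small worked example. This is only an illustration: the class and method names are hypothetical, and the values (2 exclusive buffers per channel, 4 channels, 8 floating buffers) are made up for the arithmetic and are not taken from Flink's configuration.

```java
public class BufferLimitSketch {

    /** Computes a * numChannels + b, the limit described in the issue text. */
    static int maxBuffers(int exclusivePerChannel, int numChannels, int floatingBuffers) {
        return exclusivePerChannel * numChannels + floatingBuffers;
    }

    public static void main(String[] args) {
        // Hypothetical values: a = 2, 4 channels, b = 8
        System.out.println(maxBuffers(2, 4, 8)); // prints 16
    }
}
```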



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
