flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based
Date Mon, 04 Dec 2017 05:10:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16276291#comment-16276291 ]

ASF GitHub Bot commented on FLINK-7416:
---------------------------------------

Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4533#discussion_r154560542
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java ---
    @@ -108,17 +117,75 @@ public void testReceiveEmptyBuffer() throws Exception {
     		final Buffer emptyBuffer = TestBufferFactory.createBuffer();
     		emptyBuffer.setSize(0);
     
    +		final int backlog = 2;
     		final BufferResponse receivedBuffer = createBufferResponse(
    -				emptyBuffer, 0, inputChannel.getInputChannelId());
    +			emptyBuffer, 0, inputChannel.getInputChannelId(), backlog);
     
    -		final PartitionRequestClientHandler client = new PartitionRequestClientHandler();
    +		final CreditBasedClientHandler client = new CreditBasedClientHandler();
     		client.addInputChannel(inputChannel);
     
     		// Read the empty buffer
     		client.channelRead(mock(ChannelHandlerContext.class), receivedBuffer);
     
     		// This should not throw an exception
     		verify(inputChannel, never()).onError(any(Throwable.class));
    +		verify(inputChannel, times(1)).onEmptyBuffer(0, backlog);
    +	}
    +
    +	/**
    +	 * Verifies that {@link RemoteInputChannel#onBuffer(Buffer, int, int)} is called when a
    +	 * {@link BufferResponse} is received.
    +	 */
    +	@Test
    +	public void testReceiveBuffer() throws Exception {
    +		final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
    +		final SingleInputGate inputGate = createSingleInputGate();
    +		final RemoteInputChannel inputChannel = spy(createRemoteInputChannel(inputGate));
    +		inputGate.setInputChannel(inputChannel.getPartitionId().getPartitionId(), inputChannel);
    +		try {
    +			final BufferPool bufferPool = networkBufferPool.createBufferPool(8, 8);
    +			inputGate.setBufferPool(bufferPool);
    +			inputGate.assignExclusiveSegments(networkBufferPool, 2);
    +
    +			final CreditBasedClientHandler handler = new CreditBasedClientHandler();
    +			handler.addInputChannel(inputChannel);
    +
    +			final int backlog = 2;
    +			final BufferResponse bufferResponse = createBufferResponse(
    +				TestBufferFactory.createBuffer(32), 0, inputChannel.getInputChannelId(), backlog);
    +			handler.channelRead(mock(ChannelHandlerContext.class), bufferResponse);
    +
    +			verify(inputChannel, times(1)).onBuffer(any(Buffer.class), anyInt(), anyInt());
    +			verify(inputChannel, times(1)).onSenderBacklog(backlog);
    --- End diff --
    
    That also makes sense, but it would require making `getNumberOfRequiredBuffers` public and annotating it with `@VisibleForTesting` again.


> Implement Netty receiver outgoing pipeline for credit-based
> -----------------------------------------------------------
>
>                 Key: FLINK-7416
>                 URL: https://issues.apache.org/jira/browse/FLINK-7416
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Network
>            Reporter: zhijiang
>            Assignee: zhijiang
>             Fix For: 1.5.0
>
>
> This is part of the work on credit-based network flow control.
> The related work items are:
> * We define a new message called {{AddCredit}} to announce incremental credit during data shuffle.
> * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the channel is enqueued in the pipeline.
> * Whenever the channel becomes writable, it takes the next {{InputChannel}} and sends its unannounced credit. The credit is reset to zero after each send.
> * That way, messages are sent as often as the network has capacity and contain as much credit as is available for the channel at that point in time. Otherwise, it would only add latency to the announcements and not increase throughput.
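
The announcement steps in the description above can be sketched in plain Java as follows. This is only an illustration under stated assumptions, not the code from the pull request: the class `CreditAnnouncementSketch`, its nested `Channel` and `AddCredit` types, and the `writableSlots` parameter are hypothetical stand-ins, and no Netty or Flink API is used.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Illustrative sketch only; all names here are hypothetical, not Flink classes. */
class CreditAnnouncementSketch {

    /** Stand-in for an input channel that tracks its unannounced credit. */
    static final class Channel {
        final String id;
        private int unannouncedCredit;

        Channel(String id) {
            this.id = id;
        }

        /** Adds credit and reports whether the credit went up from zero. */
        boolean addCredit(int delta) {
            boolean wasZero = unannouncedCredit == 0;
            unannouncedCredit += delta;
            return wasZero && delta > 0;
        }

        /** Returns the unannounced credit and resets it to zero. */
        int getAndResetCredit() {
            int credit = unannouncedCredit;
            unannouncedCredit = 0;
            return credit;
        }
    }

    /** Stand-in for the AddCredit message sent to the producer. */
    static final class AddCredit {
        final String channelId;
        final int credit;

        AddCredit(String channelId, int credit) {
            this.channelId = channelId;
            this.credit = credit;
        }
    }

    /** Channels waiting to announce credit, in arrival order. */
    private final Queue<Channel> pending = new ArrayDeque<>();

    /** Messages "written to the network"; a list keeps the sketch testable. */
    final List<AddCredit> sent = new ArrayList<>();

    /** Enqueues the channel only when its credit goes up from zero. */
    void notifyCreditAvailable(Channel channel, int delta) {
        if (channel.addCredit(delta)) {
            pending.add(channel);
        }
    }

    /**
     * Called when the connection becomes writable. Each message carries all
     * credit accumulated so far for that channel, then the credit is reset.
     * The writableSlots parameter is an assumption that models how many
     * messages fit before the connection stops being writable.
     */
    void onWritable(int writableSlots) {
        while (writableSlots-- > 0 && !pending.isEmpty()) {
            Channel next = pending.poll();
            sent.add(new AddCredit(next.id, next.getAndResetCredit()));
        }
    }
}
```

Because a channel is enqueued only on the zero-to-positive transition, credit that arrives while the channel is already waiting is folded into the same message instead of producing an extra announcement.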



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
