flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based
Date Fri, 01 Dec 2017 13:14:01 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274371#comment-16274371 ]

ASF GitHub Bot commented on FLINK-7416:
---------------------------------------

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4533#discussion_r154338695
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java ---
    @@ -244,54 +237,89 @@ public void testCancelBeforeActive() throws Exception {
     	}
     
     	/**
    -	 * Verifies that {@link RemoteInputChannel} is enqueued in the pipeline, and
    -	 * {@link AddCredit} message is sent to the producer.
    +	 * Verifies that {@link RemoteInputChannel} is enqueued in the pipeline for notifying credits,
    +	 * and verifies the behaviour of credit notification by triggering channel's writability changed.
     	 */
     	@Test
     	public void testNotifyCreditAvailable() throws Exception {
    +		final SingleInputGate inputGate = createSingleInputGate();
    +		final RemoteInputChannel inputChannel1 = spy(createRemoteInputChannel(inputGate));
    +		final RemoteInputChannel inputChannel2 = spy(createRemoteInputChannel(inputGate));
     		final CreditBasedClientHandler handler = new CreditBasedClientHandler();
    -		final EmbeddedChannel channel = new EmbeddedChannel(handler);
    +		final EmbeddedChannel channel = spy(new EmbeddedChannel(handler));
     
    -		final RemoteInputChannel inputChannel = createRemoteInputChannel(mock(SingleInputGate.class));
    +		// Increase the credits to enqueue the input channels
    +		inputChannel1.increaseCredit(1);
    +		inputChannel2.increaseCredit(1);
    +		handler.notifyCreditAvailable(inputChannel1);
    +		handler.notifyCreditAvailable(inputChannel2);
     
    -		// Enqueue the input channel
    -		handler.notifyCreditAvailable(inputChannel);
    +		channel.runPendingTasks();
    +
    +		// The two input channels should notify credits via writable channel
    +		assertTrue(channel.isWritable());
    +		assertEquals(channel.readOutbound().getClass(), AddCredit.class);
    +		verify(inputChannel1, times(1)).getAndResetCredit();
    +		verify(inputChannel2, times(1)).getAndResetCredit();
    +
    +		final int highWaterMark = channel.config().getWriteBufferHighWaterMark();
    +		// Set the writer index to the high water mark to ensure that all bytes are written
    +		// to the wire although the buffer is "empty".
    +		channel.write(Unpooled.buffer(highWaterMark).writerIndex(highWaterMark));
    +
    +		// Enqueue the input channel on the condition of un-writable channel
    +		inputChannel1.increaseCredit(1);
    +		handler.notifyCreditAvailable(inputChannel1);
     
     		channel.runPendingTasks();
     
    -		// Read the enqueued msg
    -		Object msg1 = channel.readOutbound();
    +		// The input channel will not notify credits via un-writable channel
    +		assertFalse(channel.isWritable());
    +		verify(inputChannel1, times(1)).getAndResetCredit();
    --- End diff --
    
    Also add `assertNull(channel.readOutbound());` here, so the test checks that nothing was sent while the channel is un-writable.


> Implement Netty receiver outgoing pipeline for credit-based
> -----------------------------------------------------------
>
>                 Key: FLINK-7416
>                 URL: https://issues.apache.org/jira/browse/FLINK-7416
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Network
>            Reporter: zhijiang
>            Assignee: zhijiang
>             Fix For: 1.5.0
>
>
> This is part of the work on credit-based network flow control.
> The related work items are:
> * We define a new message called {{AddCredit}} to announce incremental credit during data shuffle.
> * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the input channel is enqueued in the pipeline.
> * Whenever the Netty channel becomes writable, the next {{InputChannel}} is taken from the queue and its unannounced credit is sent. The credit is reset to zero after each send.
> * That way, messages are sent as often as the network has capacity and contain as much credit as is available for the channel at that point in time. Any other scheme would only add latency to the announcements without increasing throughput.
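
The enqueue-and-drain behaviour described above can be sketched in plain Java without Netty. This is only a minimal, single-threaded model under stated assumptions: `CreditAnnouncerSketch`, `Announcer`, `setWritable` and `sent` are hypothetical names made up for illustration, the `InputChannel` and `AddCredit` classes are simplified stand-ins rather than the Flink classes, and the `writable` flag replaces Netty's real writability tracking.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class CreditAnnouncerSketch {

    /** Simplified stand-in for an input channel; tracks credit not yet announced. */
    static final class InputChannel {
        final String name;
        private int unannouncedCredit;

        InputChannel(String name) {
            this.name = name;
        }

        /** Adds credit and returns true only if the credit went up from zero. */
        boolean increaseCredit(int delta) {
            boolean wasZero = unannouncedCredit == 0;
            unannouncedCredit += delta;
            return wasZero && delta > 0;
        }

        /** Returns the accumulated credit and resets it to zero. */
        int getAndResetCredit() {
            int credit = unannouncedCredit;
            unannouncedCredit = 0;
            return credit;
        }
    }

    /** Simplified stand-in for the AddCredit message. */
    static final class AddCredit {
        final String channel;
        final int credit;

        AddCredit(String channel, int credit) {
            this.channel = channel;
            this.credit = credit;
        }
    }

    /** Hypothetical handler model: queues channels and drains them while writable. */
    static final class Announcer {
        private final Queue<InputChannel> pending = new ArrayDeque<>();
        private final List<AddCredit> wire = new ArrayList<>();
        private boolean writable = true; // assumption: replaces Netty's writability state

        void notifyCreditAvailable(InputChannel channel) {
            pending.add(channel);
            drain();
        }

        void setWritable(boolean writable) {
            this.writable = writable;
            drain();
        }

        /** Sends one message per queued channel, carrying all credit accumulated so far. */
        private void drain() {
            while (writable && !pending.isEmpty()) {
                InputChannel channel = pending.poll();
                int credit = channel.getAndResetCredit();
                if (credit > 0) {
                    wire.add(new AddCredit(channel.name, credit));
                }
            }
        }

        List<AddCredit> sent() {
            return wire;
        }
    }

    public static void main(String[] args) {
        Announcer announcer = new Announcer();
        InputChannel a = new InputChannel("a");

        // Writable: the credit is announced immediately.
        if (a.increaseCredit(1)) announcer.notifyCreditAvailable(a);

        // Not writable: credit accumulates, and the channel is enqueued only once.
        announcer.setWritable(false);
        if (a.increaseCredit(2)) announcer.notifyCreditAvailable(a);
        if (a.increaseCredit(3)) announcer.notifyCreditAvailable(a);

        // Writable again: one message carries everything accumulated in the meantime.
        announcer.setWritable(true);
        for (AddCredit msg : announcer.sent()) {
            System.out.println(msg.channel + " +" + msg.credit);
        }
    }
}
```

In this model a channel is enqueued only on the zero-to-positive transition, so it appears in the queue at most once, and whatever credit arrives while the connection is not writable is folded into the next message instead of producing extra messages.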



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
