flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based
Date Mon, 04 Dec 2017 15:12:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16276918#comment-16276918 ]

ASF GitHub Bot commented on FLINK-7416:
---------------------------------------

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4533#discussion_r154675125
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---
    @@ -280,94 +280,120 @@ public String toString() {
     	// ------------------------------------------------------------------------
     
     	/**
    -	 * Enqueue this input channel in the pipeline for sending unannounced credits to producer.
    +	 * Enqueue this input channel in the pipeline for notifying the producer of unannounced credit.
     	 */
     	void notifyCreditAvailable() {
    -		//TODO in next PR
    +		checkState(partitionRequestClient != null, "Tried to send task event to producer before requesting a queue.");
    +
    +		// We should skip the notification if this channel is already released.
    +		if (!isReleased.get()) {
    +			partitionRequestClient.notifyCreditAvailable(this);
    +		}
     	}
     
     	/**
    -	 * Exclusive buffer is recycled to this input channel directly and it may trigger notify
    -	 * credit to producer.
    +	 * Exclusive buffer is recycled to this input channel directly and it may trigger return extra
    +	 * floating buffer and notify increased credit to the producer.
     	 *
     	 * @param segment The exclusive segment of this channel.
     	 */
     	@Override
     	public void recycle(MemorySegment segment) {
    -		synchronized (availableBuffers) {
    -			// Important: the isReleased check should be inside the synchronized block.
    -			// that way the segment can also be returned to global pool after added into
    -			// the available queue during releasing all resources.
    +		int numAddedBuffers;
    +
    +		synchronized (bufferQueue) {
    +			// Important: check the isReleased state inside synchronized block, so there is no
    +			// race condition when recycle and releaseAllResources running in parallel.
     			if (isReleased.get()) {
     				try {
    -					inputGate.returnExclusiveSegments(Arrays.asList(segment));
    +					inputGate.returnExclusiveSegments(Collections.singletonList(segment));
     					return;
     				} catch (Throwable t) {
     					ExceptionUtils.rethrow(t);
     				}
     			}
    -			availableBuffers.add(new Buffer(segment, this));
    +			numAddedBuffers = bufferQueue.addExclusiveBuffer(new Buffer(segment, this), numRequiredBuffers);
     		}
     
    -		if (unannouncedCredit.getAndAdd(1) == 0) {
    +		if (numAddedBuffers > 0 && unannouncedCredit.getAndAdd(1) == 0) {
     			notifyCreditAvailable();
     		}
     	}
     
     	public int getNumberOfAvailableBuffers() {
    -		synchronized (availableBuffers) {
    -			return availableBuffers.size();
    +		synchronized (bufferQueue) {
    +			return bufferQueue.getAvailableBufferSize();
     		}
     	}
     
    +	@VisibleForTesting
    +	public int getNumberOfRequiredBuffers() {
    --- End diff --
    
    OK, let's keep this public, just like `getNumberOfAvailableBuffers()`. I guess you could
    even remove the `@VisibleForTesting`, since it's just a getter.


> Implement Netty receiver outgoing pipeline for credit-based
> -----------------------------------------------------------
>
>                 Key: FLINK-7416
>                 URL: https://issues.apache.org/jira/browse/FLINK-7416
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Network
>            Reporter: zhijiang
>            Assignee: zhijiang
>             Fix For: 1.5.0
>
>
> This is part of the work on credit-based network flow control.
> The related work items are:
> * We define a new message called {{AddCredit}} to announce incremental credit during data shuffle.
> * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the channel is enqueued in the pipeline.
> * Whenever the channel becomes writable, it takes the next {{InputChannel}} and sends its unannounced credit. The credit is reset to zero after each send.
> * That way, messages are sent as often as the network has capacity and contain as much credit as is available for the channel at that point in time. Otherwise, it would only add latency to the announcements and not increase throughput.
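
The enqueue-on-first-credit and send-then-reset steps described above can be sketched roughly as follows. This is only an illustration of the idea, not the code from the pull request: `CreditPipelineSketch`, `Channel`, `Pipeline`, `addCredit()` and `onWritable()` are hypothetical names, and the "network" is simulated by a plain list of sent announcements.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

public class CreditPipelineSketch {

    /** Hypothetical stand-in for an input channel; it only tracks unannounced credit. */
    static final class Channel {
        final String name;
        final AtomicInteger unannouncedCredit = new AtomicInteger();
        private final Pipeline pipeline;

        Channel(String name, Pipeline pipeline) {
            this.name = name;
            this.pipeline = pipeline;
        }

        /** Called whenever one more buffer becomes available to this channel. */
        void addCredit() {
            // Enqueue only when the credit goes up from zero, so the channel
            // sits in the pipeline at most once between two announcements.
            if (unannouncedCredit.getAndAdd(1) == 0) {
                pipeline.enqueue(this);
            }
        }
    }

    /** Hypothetical outgoing pipeline; real code would write to the network instead. */
    static final class Pipeline {
        private final Queue<Channel> pending = new ArrayDeque<>();
        final List<String> sent = new ArrayList<>();

        synchronized void enqueue(Channel channel) {
            pending.add(channel);
        }

        /** Simulates one "connection became writable" event. Returns false if idle. */
        synchronized boolean onWritable() {
            Channel next = pending.poll();
            if (next == null) {
                return false;
            }
            // Announce everything accumulated so far and reset the counter to zero.
            int credit = next.unannouncedCredit.getAndSet(0);
            sent.add(next.name + ":" + credit);
            return true;
        }
    }

    public static void main(String[] args) {
        Pipeline pipeline = new Pipeline();
        Channel a = new Channel("a", pipeline);
        Channel b = new Channel("b", pipeline);

        a.addCredit();
        a.addCredit();
        a.addCredit(); // only the first call enqueued "a"
        b.addCredit();

        while (pipeline.onWritable()) {
            // drain: one announcement per writable event
        }
        System.out.println(pipeline.sent); // prints [a:3, b:1]
    }
}
```

Because credit keeps accumulating while a channel waits in the queue, a single announcement carries everything that became available in the meantime, which is the batching effect the last bullet describes.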



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
