flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8581) Improve performance for low latency network
Date Thu, 15 Feb 2018 14:15:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16365570#comment-16365570
] 

ASF GitHub Bot commented on FLINK-8581:
---------------------------------------

Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5423#discussion_r168486202
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
---
    @@ -169,18 +170,60 @@ public BufferOrEvent getNextBufferOrEvent() throws IOException,
InterruptedExcep
     			&& bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class
     			&& inputGate.isFinished()) {
     
    +			checkState(!bufferOrEvent.moreAvailable());
     			if (!inputGatesWithRemainingData.remove(inputGate)) {
     				throw new IllegalStateException("Couldn't find input gate in set of remaining " +
     					"input gates.");
     			}
     		}
     
    +		if (bufferOrEvent.moreAvailable()) {
    +			// this buffer or event was now removed from the non-empty gates queue
    +			// we re-add it in case it has more data, because in that case no "non-empty" notification
    +			// will come for that gate
    +			queueInputGate(inputGate);
    +		}
    +
     		// Set the channel index to identify the input channel (across all unioned input gates)
     		final int channelIndexOffset = inputGateToIndexOffsetMap.get(inputGate);
     
     		bufferOrEvent.setChannelIndex(channelIndexOffset + bufferOrEvent.getChannelIndex());
     
    -		return bufferOrEvent;
    +		return Optional.ofNullable(bufferOrEvent);
    +	}
    +
    +	@Override
    +	public Optional<BufferOrEvent> pollNextBufferOrEvent() throws IOException, InterruptedException
{
    +		throw new UnsupportedOperationException();
    +	}
    +
    +	private InputGateWithData waitAndGetNextInputGate() throws IOException, InterruptedException
{
    +		while (true) {
    +			InputGate inputGate;
    +			synchronized (inputGatesWithData) {
    +				while (inputGatesWithData.size() == 0) {
    +					inputGatesWithData.wait();
    +				}
    +				inputGate = inputGatesWithData.remove();
    +				enqueuedInputGatesWithData.remove(inputGate);
    +			}
    +
    +			// In case of inputGatesWithData being inaccurate do not block on an empty inputGate,
but just poll the data.
    --- End diff --
    
    It's kind of bad place for such comment - it can outdate without any control :/ What `UnionInputGate`
know about `OutputFlusher` from the sender. This code should just assume that there is no
guarantees about data notifications being accurate.
    
    It should be place in some high level network stack documentation.


> Improve performance for low latency network
> -------------------------------------------
>
>                 Key: FLINK-8581
>                 URL: https://issues.apache.org/jira/browse/FLINK-8581
>             Project: Flink
>          Issue Type: Improvement
>          Components: Network
>    Affects Versions: 1.5.0
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>            Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message