flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8581) Improve performance for low latency network
Date Tue, 13 Feb 2018 15:54:02 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16362536#comment-16362536
] 

ASF GitHub Bot commented on FLINK-8581:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5423#discussion_r167857194
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
---
    @@ -55,69 +51,56 @@
     	/** Flag indicating whether the subpartition has been released. */
     	private volatile boolean isReleased;
     
    -	/** The number of non-event buffers currently in this subpartition. */
    -	@GuardedBy("buffers")
    -	private int buffersInBacklog;
    -
     	// ------------------------------------------------------------------------
     
     	PipelinedSubpartition(int index, ResultPartition parent) {
     		super(index, parent);
     	}
     
     	@Override
    -	public boolean add(Buffer buffer) throws IOException {
    -		checkNotNull(buffer);
    -
    -		// view reference accessible outside the lock, but assigned inside the locked scope
    -		final PipelinedSubpartitionView reader;
    +	public boolean add(BufferConsumer bufferConsumer) throws IOException {
    +		return add(bufferConsumer, false);
    +	}
     
    +	@Override
    +	public void flush() {
     		synchronized (buffers) {
    -			if (isFinished || isReleased) {
    -				buffer.recycleBuffer();
    -				return false;
    +			if (readView != null) {
    +				readView.notifyDataAvailable();
     			}
    -
    -			// Add the buffer and update the stats
    -			buffers.add(buffer);
    -			reader = readView;
    -			updateStatistics(buffer);
    -			increaseBuffersInBacklog(buffer);
    -		}
    -
    -		// Notify the listener outside of the synchronized block
    -		if (reader != null) {
    -			reader.notifyBuffersAvailable(1);
     		}
    -
    -		return true;
     	}
     
     	@Override
     	public void finish() throws IOException {
    -		final Buffer buffer = EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
    +		add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE), true);
    +		LOG.debug("Finished {}.", this);
    +	}
     
    -		// view reference accessible outside the lock, but assigned inside the locked scope
    -		final PipelinedSubpartitionView reader;
    +	private boolean add(BufferConsumer bufferConsumer, boolean finish) throws IOException
{
    --- End diff --
    
    I think you can remove the `throws IOException`, and after that also on the public `add(...)`.


> Improve performance for low latency network
> -------------------------------------------
>
>                 Key: FLINK-8581
>                 URL: https://issues.apache.org/jira/browse/FLINK-8581
>             Project: Flink
>          Issue Type: Improvement
>          Components: Network
>    Affects Versions: 1.5.0
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>            Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message