flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8581) Improve performance for low latency network
Date Thu, 15 Feb 2018 13:38:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16365534#comment-16365534
] 

ASF GitHub Bot commented on FLINK-8581:
---------------------------------------

Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5423#discussion_r168476375
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
---
    @@ -111,93 +117,49 @@ public void randomEmit(T record) throws IOException, InterruptedException
{
     	private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException
{
     		RecordSerializer<T> serializer = serializers[targetChannel];
     
    -		synchronized (serializer) {
    -			SerializationResult result = serializer.addRecord(record);
    -
    -			while (result.isFullBuffer()) {
    -				Buffer buffer = serializer.getCurrentBuffer();
    -
    -				if (buffer != null) {
    -					numBytesOut.inc(buffer.getSizeUnsafe());
    -					writeAndClearBuffer(buffer, targetChannel, serializer);
    -
    -					// If this was a full record, we are done. Not breaking
    -					// out of the loop at this point will lead to another
    -					// buffer request before breaking out (that would not be
    -					// a problem per se, but it can lead to stalls in the
    -					// pipeline).
    -					if (result.isFullRecord()) {
    -						break;
    -					}
    -				} else {
    -					BufferBuilder bufferBuilder =
    -						targetPartition.getBufferProvider().requestBufferBuilderBlocking();
    -					result = serializer.setNextBufferBuilder(bufferBuilder);
    +		SerializationResult result = serializer.addRecord(record);
    +
    +		while (result.isFullBuffer()) {
    +			if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
    +				// If this was a full record, we are done. Not breaking
    +				// out of the loop at this point will lead to another
    +				// buffer request before breaking out (that would not be
    +				// a problem per se, but it can lead to stalls in the
    +				// pipeline).
    +				if (result.isFullRecord()) {
    +					break;
     				}
     			}
    +			BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);
    +
    +			result = serializer.setNextBufferBuilder(bufferBuilder);
     		}
    +		checkState(!serializer.hasSerializedData(), "All data should be written at once");
     	}
     
    -	public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException
{
    -		final Buffer eventBuffer = EventSerializer.toBuffer(event);
    -		try {
    +	public BufferConsumer broadcastEvent(AbstractEvent event) throws IOException, InterruptedException
{
    --- End diff --
    
    Regarding returned value: without it I just didn't have a simple idea how to test for
reference counting/recycling :/


> Improve performance for low latency network
> -------------------------------------------
>
>                 Key: FLINK-8581
>                 URL: https://issues.apache.org/jira/browse/FLINK-8581
>             Project: Flink
>          Issue Type: Improvement
>          Components: Network
>    Affects Versions: 1.5.0
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>            Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message