From commits-return-16126-archive-asf-public=cust-asf.ponee.io@flink.apache.org Wed Feb 28 17:17:50 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 9BC4718077B for ; Wed, 28 Feb 2018 17:17:48 +0100 (CET) Received: (qmail 62158 invoked by uid 500); 28 Feb 2018 16:17:47 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 61973 invoked by uid 99); 28 Feb 2018 16:17:47 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 28 Feb 2018 16:17:47 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5B734F4E3B; Wed, 28 Feb 2018 16:17:47 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: srichter@apache.org To: commits@flink.apache.org Date: Wed, 28 Feb 2018 16:17:51 -0000 Message-Id: <40224d7216734bbea2c0dbcc0c0530d8@git.apache.org> In-Reply-To: <4b407c586e0243aaa73278b1b98a9c58@git.apache.org> References: <4b407c586e0243aaa73278b1b98a9c58@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [5/7] flink git commit: [FLINK-8694][runtime] Fix notifyDataAvailable race condition [FLINK-8694][runtime] Fix notifyDataAvailable race condition Before there was a race condition that might resulted in igonoring some notifyDataAvailable calls. This fixes the problem by moving buffersAvailable handling to Supartitions and adds stress test for flushAlways (without this fix this test is dead locking). (cherry picked from commit ebd39f3) Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8eb6a307 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8eb6a307 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8eb6a307 Branch: refs/heads/release-1.5 Commit: 8eb6a30798c09d171e3eb8019b53e677252bd5ba Parents: 8e62f90 Author: Piotr Nowojski Authored: Fri Feb 23 11:28:20 2018 +0100 Committer: Stefan Richter Committed: Wed Feb 28 17:17:02 2018 +0100 ---------------------------------------------------------------------- .../CreditBasedSequenceNumberingViewReader.java | 10 +--- .../netty/SequenceNumberingViewReader.java | 7 +-- .../partition/PipelinedSubpartition.java | 37 ++++++++++++-- .../partition/PipelinedSubpartitionView.java | 5 ++ .../partition/ResultSubpartitionView.java | 2 + .../partition/SpillableSubpartition.java | 1 - .../partition/SpillableSubpartitionView.java | 28 ++++++++--- .../partition/SpilledSubpartitionView.java | 8 +++ .../network/buffer/BufferBuilderTestUtils.java | 4 ++ .../netty/CancelPartitionRequestTest.java | 5 ++ .../netty/PartitionRequestQueueTest.java | 26 ++++++++-- .../partition/PipelinedSubpartitionTest.java | 53 ++++++++++++++++++++ .../partition/SpillableSubpartitionTest.java | 9 ++-- .../network/partition/SubpartitionTestBase.java | 5 ++ .../StreamNetworkThroughputBenchmarkTests.java | 8 +++ 15 files changed, 173 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/8eb6a307/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java index d02b2bf..9acbbac 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java @@ -30,7 +30,6 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel; import java.io.IOException; -import java.util.concurrent.atomic.AtomicBoolean; /** * Simple wrapper for the subpartition view used in the new network credit-based mode. @@ -44,8 +43,6 @@ class CreditBasedSequenceNumberingViewReader implements BufferAvailabilityListen private final InputChannelID receiverId; - private final AtomicBoolean buffersAvailable = new AtomicBoolean(); - private final PartitionRequestQueue requestQueue; private volatile ResultSubpartitionView subpartitionView; @@ -118,7 +115,7 @@ class CreditBasedSequenceNumberingViewReader implements BufferAvailabilityListen @Override public boolean isAvailable() { // BEWARE: this must be in sync with #isAvailable()! - return buffersAvailable.get() && + return hasBuffersAvailable() && (numCreditsAvailable > 0 || subpartitionView.nextBufferIsEvent()); } @@ -154,14 +151,13 @@ class CreditBasedSequenceNumberingViewReader implements BufferAvailabilityListen @VisibleForTesting boolean hasBuffersAvailable() { - return buffersAvailable.get(); + return subpartitionView.isAvailable(); } @Override public BufferAndAvailability getNextBuffer() throws IOException, InterruptedException { BufferAndBacklog next = subpartitionView.getNextBuffer(); if (next != null) { - buffersAvailable.set(next.isMoreAvailable()); sequenceNumber++; if (next.buffer().isBuffer() && --numCreditsAvailable < 0) { @@ -197,7 +193,6 @@ class CreditBasedSequenceNumberingViewReader implements BufferAvailabilityListen @Override public void notifyDataAvailable() { - buffersAvailable.set(true); requestQueue.notifyReaderNonEmpty(this); } @@ -206,7 +201,6 @@ class CreditBasedSequenceNumberingViewReader implements BufferAvailabilityListen return "CreditBasedSequenceNumberingViewReader{" + "requestLock=" + requestLock + ", receiverId=" + receiverId + - ", buffersAvailable=" + buffersAvailable.get() + ", sequenceNumber=" + sequenceNumber + ", numCreditsAvailable=" + numCreditsAvailable + ", isRegisteredAsAvailable=" + isRegisteredAsAvailable + http://git-wip-us.apache.org/repos/asf/flink/blob/8eb6a307/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java index 2d9635c..6a83af1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java @@ -29,7 +29,6 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel; import java.io.IOException; -import java.util.concurrent.atomic.AtomicBoolean; /** * Simple wrapper for the subpartition view used in the old network mode. @@ -43,8 +42,6 @@ class SequenceNumberingViewReader implements BufferAvailabilityListener, Network private final InputChannelID receiverId; - private final AtomicBoolean buffersAvailable = new AtomicBoolean(); - private final PartitionRequestQueue requestQueue; private volatile ResultSubpartitionView subpartitionView; @@ -96,7 +93,7 @@ class SequenceNumberingViewReader implements BufferAvailabilityListener, Network @Override public boolean isAvailable() { - return buffersAvailable.get(); + return subpartitionView.isAvailable(); } @Override @@ -113,7 +110,6 @@ class SequenceNumberingViewReader implements BufferAvailabilityListener, Network public BufferAndAvailability getNextBuffer() throws IOException, InterruptedException { BufferAndBacklog next = subpartitionView.getNextBuffer(); if (next != null) { - buffersAvailable.set(next.isMoreAvailable()); sequenceNumber++; return new BufferAndAvailability(next.buffer(), next.isMoreAvailable(), next.buffersInBacklog()); } else { @@ -143,7 +139,6 @@ class SequenceNumberingViewReader implements BufferAvailabilityListener, Network @Override public void notifyDataAvailable() { - buffersAvailable.set(true); requestQueue.notifyReaderNonEmpty(this); } http://git-wip-us.apache.org/repos/asf/flink/blob/8eb6a307/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java index a9c6e57..cc79363 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java @@ -27,6 +27,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; import java.io.IOException; @@ -48,6 +49,9 @@ class PipelinedSubpartition extends ResultSubpartition { /** Flag indicating whether the subpartition has been finished. */ private boolean isFinished; + @GuardedBy("buffers") + private boolean flushRequested; + /** Flag indicating whether the subpartition has been released. */ private volatile boolean isReleased; @@ -65,9 +69,11 @@ class PipelinedSubpartition extends ResultSubpartition { @Override public void flush() { synchronized (buffers) { - if (readView != null) { - readView.notifyDataAvailable(); + if (buffers.isEmpty()) { + return; } + flushRequested = !buffers.isEmpty(); + notifyDataAvailable(); } } @@ -93,7 +99,7 @@ class PipelinedSubpartition extends ResultSubpartition { if (finish) { isFinished = true; - notifyDataAvailable(); + flush(); } else { maybeNotifyDataAvailable(); @@ -138,17 +144,28 @@ class PipelinedSubpartition extends ResultSubpartition { synchronized (buffers) { Buffer buffer = null; + if (buffers.isEmpty()) { + flushRequested = false; + } + while (!buffers.isEmpty()) { BufferConsumer bufferConsumer = buffers.peek(); buffer = bufferConsumer.build(); + checkState(bufferConsumer.isFinished() || buffers.size() == 1, "When there are multiple buffers, an unfinished bufferConsumer can not be at the head of the buffers queue."); + if (buffers.size() == 1) { + // turn off flushRequested flag if we drained all of the available data + flushRequested = false; + } + if (bufferConsumer.isFinished()) { buffers.pop().close(); decreaseBuffersInBacklogUnsafe(bufferConsumer.isBuffer()); } + if (buffer.readableBytes() > 0) { break; } @@ -169,7 +186,7 @@ class PipelinedSubpartition extends ResultSubpartition { // will be 2 or more. return new BufferAndBacklog( buffer, - getNumberOfFinishedBuffers() > 0, + isAvailableUnsafe(), getBuffersInBacklog(), _nextBufferIsEvent()); } @@ -211,13 +228,23 @@ class PipelinedSubpartition extends ResultSubpartition { readView = new PipelinedSubpartitionView(this, availabilityListener); if (!buffers.isEmpty()) { - readView.notifyDataAvailable(); + notifyDataAvailable(); } } return readView; } + public boolean isAvailable() { + synchronized (buffers) { + return isAvailableUnsafe(); + } + } + + private boolean isAvailableUnsafe() { + return flushRequested || getNumberOfFinishedBuffers() > 0; + } + // ------------------------------------------------------------------------ int getCurrentNumberOfBuffers() { http://git-wip-us.apache.org/repos/asf/flink/blob/8eb6a307/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java index c60a604..9d08358 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java @@ -81,6 +81,11 @@ class PipelinedSubpartitionView implements ResultSubpartitionView { } @Override + public boolean isAvailable() { + return parent.isAvailable(); + } + + @Override public Throwable getFailureCause() { return parent.getFailureCause(); } http://git-wip-us.apache.org/repos/asf/flink/blob/8eb6a307/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java index 41fbb0a..b1ccd63 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java @@ -57,4 +57,6 @@ public interface ResultSubpartitionView { * Returns whether the next buffer is an event or not. */ boolean nextBufferIsEvent(); + + boolean isAvailable(); } http://git-wip-us.apache.org/repos/asf/flink/blob/8eb6a307/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java index 6ac493e..6b731d4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java @@ -209,7 +209,6 @@ class SpillableSubpartition extends ResultSubpartition { parent.getBufferProvider().getMemorySegmentSize(), availabilityListener); } - return readView; } } http://git-wip-us.apache.org/repos/asf/flink/blob/8eb6a307/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java index b821dcf..3c73e43 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java @@ -153,18 +153,16 @@ class SpillableSubpartitionView implements ResultSubpartitionView { return null; } else if (nextBuffer != null) { current = nextBuffer.build(); + checkState(nextBuffer.isFinished(), + "We can only read from SpillableSubpartition after it was finished"); - if (nextBuffer.isFinished()) { - newBacklog = parent.decreaseBuffersInBacklogUnsafe(nextBuffer.isBuffer()); - nextBuffer.close(); - nextBuffer = buffers.poll(); - } + newBacklog = parent.decreaseBuffersInBacklogUnsafe(nextBuffer.isBuffer()); + nextBuffer.close(); + nextBuffer = buffers.poll(); - isMoreAvailable = buffers.size() > 0; if (nextBuffer != null) { - isMoreAvailable = true; - listener.notifyDataAvailable(); nextBufferIsEvent = !nextBuffer.isBuffer(); + isMoreAvailable = true; } parent.updateStatistics(current); @@ -246,6 +244,20 @@ class SpillableSubpartitionView implements ResultSubpartitionView { } @Override + public boolean isAvailable() { + synchronized (buffers) { + if (nextBuffer != null) { + return true; + } + else if (spilledView == null) { + return false; + } + } // else: spilled + + return spilledView.isAvailable(); + } + + @Override public Throwable getFailureCause() { SpilledSubpartitionView spilled = spilledView; if (spilled != null) { http://git-wip-us.apache.org/repos/asf/flink/blob/8eb6a307/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java index 4c5cd2e..378b086 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java @@ -220,6 +220,14 @@ class SpilledSubpartitionView implements ResultSubpartitionView, NotificationLis } @Override + public synchronized boolean isAvailable() { + if (nextBuffer != null) { + return true; + } + return !fileReader.hasReachedEndOfFile(); + } + + @Override public Throwable getFailureCause() { return parent.getFailureCause(); } http://git-wip-us.apache.org/repos/asf/flink/blob/8eb6a307/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java index a6e9fdc..7beb18f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java @@ -38,6 +38,10 @@ public class BufferBuilderTestUtils { return createFilledBufferBuilder(size, 0); } + public static BufferBuilder createFilledBufferBuilder(int dataSize) { + return createFilledBufferBuilder(BUFFER_SIZE, dataSize); + } + public static BufferBuilder createFilledBufferBuilder(int size, int dataSize) { checkArgument(size >= dataSize); BufferBuilder bufferBuilder = new BufferBuilder( http://git-wip-us.apache.org/repos/asf/flink/blob/8eb6a307/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java index 56abff1..eca8263 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java @@ -220,6 +220,11 @@ public class CancelPartitionRequestTest { } @Override + public boolean isAvailable() { + return true; + } + + @Override public Throwable getFailureCause() { return null; } http://git-wip-us.apache.org/repos/asf/flink/blob/8eb6a307/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java index 16418ff..f614c18 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java @@ -66,7 +66,7 @@ public class PartitionRequestQueueTest { CreditBasedSequenceNumberingViewReader reader1 = new CreditBasedSequenceNumberingViewReader(new InputChannelID(0, 0), 10, queue); CreditBasedSequenceNumberingViewReader reader2 = new CreditBasedSequenceNumberingViewReader(new InputChannelID(1, 1), 10, queue); - reader1.requestSubpartitionView((partitionId, index, availabilityListener) -> new NotReleasedResultSubpartitionView(), new ResultPartitionID(), 0); + reader1.requestSubpartitionView((partitionId, index, availabilityListener) -> new EmptyAlwaysAvailableResultSubpartitionView(), new ResultPartitionID(), 0); reader1.notifyDataAvailable(); assertTrue(reader1.isAvailable()); assertFalse(reader1.isRegisteredAsAvailable()); @@ -178,6 +178,11 @@ public class PartitionRequestQueueTest { buffers, false); } + + @Override + public boolean isAvailable() { + return buffersInBacklog.get() > 0; + } } private static class ReadOnlyBufferResultSubpartitionView extends DefaultBufferResultSubpartitionView { @@ -197,14 +202,19 @@ public class PartitionRequestQueueTest { } } - private static class NotReleasedResultSubpartitionView extends NoOpResultSubpartitionView { + private static class EmptyAlwaysAvailableResultSubpartitionView extends NoOpResultSubpartitionView { @Override public boolean isReleased() { return false; } + + @Override + public boolean isAvailable() { + return true; + } } - private static class ReleasedResultSubpartitionView extends NoOpResultSubpartitionView { + private static class ReleasedResultSubpartitionView extends EmptyAlwaysAvailableResultSubpartitionView { @Override public boolean isReleased() { return true; @@ -263,6 +273,11 @@ public class PartitionRequestQueueTest { public boolean nextBufferIsEvent() { return true; } + + @Override + public boolean isAvailable() { + return true; + } } /** @@ -387,5 +402,10 @@ public class PartitionRequestQueueTest { public boolean nextBufferIsEvent() { return false; } + + @Override + public boolean isAvailable() { + return false; + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/8eb6a307/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java index 528f0e2..ee678ab 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java @@ -43,6 +43,7 @@ import java.util.concurrent.Future; import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder; import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createEventBufferConsumer; +import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferBuilder; import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer; import static org.apache.flink.runtime.io.network.util.TestBufferFactory.BUFFER_SIZE; import static org.apache.flink.util.FutureUtil.waitForAll; @@ -142,6 +143,48 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { } } + /** + * Normally moreAvailable flag from InputChannel should ignore non finished BufferConsumers, otherwise we would + * busy loop on the unfinished BufferConsumers. + */ + @Test + public void testUnfinishedBufferBehindFinished() throws Exception { + final ResultSubpartition subpartition = createSubpartition(); + AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener(); + ResultSubpartitionView readView = subpartition.createReadView(availablityListener); + + try { + subpartition.add(createFilledBufferConsumer(1025)); // finished + subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished + + assertNextBuffer(readView, 1025, false, 1); + } finally { + subpartition.release(); + } + } + + /** + * After flush call unfinished BufferConsumers should be reported as available, otherwise we might not flush some + * of the data. + */ + @Test + public void testFlushWithUnfinishedBufferBehindFinished() throws Exception { + final ResultSubpartition subpartition = createSubpartition(); + AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener(); + ResultSubpartitionView readView = subpartition.createReadView(availablityListener); + + try { + subpartition.add(createFilledBufferConsumer(1025)); // finished + subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished + subpartition.flush(); + + assertNextBuffer(readView, 1025, true, 1); + assertNextBuffer(readView, 1024, false, 1); + } finally { + subpartition.release(); + } + } + @Test public void testMultipleEmptyBuffers() throws Exception { final ResultSubpartition subpartition = createSubpartition(); @@ -188,6 +231,16 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { } @Test + public void testEmptyFlush() throws Exception { + final PipelinedSubpartition subpartition = createSubpartition(); + + AwaitableBufferAvailablityListener listener = new AwaitableBufferAvailablityListener(); + subpartition.createReadView(listener); + subpartition.flush(); + assertEquals(0, listener.getNumNotifications()); + } + + @Test public void testBasicPipelinedProduceConsumeLogic() throws Exception { final PipelinedSubpartition subpartition = createSubpartition(); http://git-wip-us.apache.org/repos/asf/flink/blob/8eb6a307/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java index a6be748..e41a85c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java @@ -53,7 +53,6 @@ import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils. import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; @@ -319,7 +318,8 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { assertEquals(2, partition.getBuffersInBacklog()); assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog()); read.buffer().recycleBuffer(); - assertEquals(2, listener.getNumNotifications()); + assertTrue(read.isMoreAvailable()); + assertEquals(1, listener.getNumNotifications()); // since isMoreAvailable is set to true, no need for notification assertFalse(bufferConsumer.isRecycled()); assertFalse(read.nextBufferIsEvent()); @@ -332,8 +332,9 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { // only updated when getting/spilling the buffers but without the nextBuffer (kept in memory) assertEquals(BUFFER_DATA_SIZE * 3 + 4, partition.getTotalNumberOfBytes()); - listener.awaitNotifications(3, 30_000); - assertEquals(3, listener.getNumNotifications()); + listener.awaitNotifications(2, 30_000); + // Spiller finished + assertEquals(2, listener.getNumNotifications()); assertFalse(reader.nextBufferIsEvent()); // second buffer (retained in SpillableSubpartition#nextBuffer) read = reader.getNextBuffer(); http://git-wip-us.apache.org/repos/asf/flink/blob/8eb6a307/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java index 1b861df..215726b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java @@ -30,6 +30,7 @@ import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils. import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; @@ -143,4 +144,8 @@ public abstract class SubpartitionTestBase extends TestLogger { assertEquals(expectedIsMoreAvailable, bufferAndBacklog.isMoreAvailable()); assertEquals(expectedBuffersInBacklog, bufferAndBacklog.buffersInBacklog()); } + + protected void assertNoNextBuffer(ResultSubpartitionView readView) throws IOException, InterruptedException { + assertNull(readView.getNextBuffer()); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8eb6a307/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java index a8251a8..a60fa3c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java @@ -53,6 +53,14 @@ public class StreamNetworkThroughputBenchmarkTests { } @Test + public void largeRemoteAlwaysFlush() throws Exception { + StreamNetworkThroughputBenchmark env = new StreamNetworkThroughputBenchmark(); + env.setUp(1, 1, 0, false); + env.executeBenchmark(1_000_000); + env.tearDown(); + } + + @Test public void pointToMultiPointBenchmark() throws Exception { StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark(); benchmark.setUp(1, 100, 100);