flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pnowoj...@apache.org
Subject [flink] branch master updated: [FLINK-24035][network][refactor] Move the blocking allocation of one floating buffer logic from the constructor of LocalBufferPool to SingleInputGate#setupChannels()
Date Wed, 01 Sep 2021 16:08:42 GMT
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 75adb32  [FLINK-24035][network][refactor] Move the blocking allocation of one floating buffer logic from the constructor of LocalBufferPool to SingleInputGate#setupChannels()
75adb32 is described below

commit 75adb3214a4c4142e52e3af9d6f57da2e09b2849
Author: kevin.cyj <kevin.cyj@alibaba-inc.com>
AuthorDate: Tue Aug 31 20:26:56 2021 +0800

    [FLINK-24035][network][refactor] Move the blocking allocation of one floating buffer logic from the constructor of LocalBufferPool to SingleInputGate#setupChannels()
    
    This refactor makes the code cleaner and easier to understand. In addition, the blocking allocation of one floating buffer is not needed on the output side.
    
    This closes #17075.
---
 .../runtime/io/network/buffer/BufferPool.java      |  8 +++++
 .../runtime/io/network/buffer/LocalBufferPool.java | 30 ++++++++++++----
 .../io/network/buffer/NetworkBufferPool.java       |  5 +--
 .../partition/consumer/SingleInputGate.java        | 12 ++++++-
 .../io/network/buffer/LocalBufferPoolTest.java     | 42 ++++++++++++----------
 .../runtime/io/network/buffer/NoOpBufferPool.java  |  3 ++
 .../io/network/buffer/UnpooledBufferPool.java      |  3 ++
 ...editBasedPartitionRequestClientHandlerTest.java |  4 +--
 .../NettyMessageClientDecoderDelegateTest.java     |  2 +-
 .../NettyMessageClientSideSerializationTest.java   |  2 +-
 .../partition/consumer/SingleInputGateBuilder.java |  6 ++--
 11 files changed, 81 insertions(+), 36 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
index 80d6e31..8474bf8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
@@ -18,10 +18,18 @@
 
 package org.apache.flink.runtime.io.network.buffer;
 
+import java.io.IOException;
+
 /** A dynamically sized buffer pool. */
 public interface BufferPool extends BufferProvider, BufferRecycler {
 
     /**
+     * Reserves the target number of segments to this pool. Will throw an exception if it
can not
+     * allocate enough segments.
+     */
+    void reserveSegments(int numberOfSegmentsToReserve) throws IOException;
+
+    /**
      * Destroys this buffer pool.
      *
      * <p>If not all buffers are available, they are recycled lazily as soon as they
are recycled.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index 2e57e5c..600c30d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -219,13 +219,9 @@ class LocalBufferPool implements BufferPool {
         // Lock is only taken, because #checkAvailability asserts it. It's a small penalty
for
         // thread safety.
         synchronized (this.availableMemorySegments) {
-            // Make sure that this buffer pool always has one buffer on initialization. For
input
-            // side, it guarantees that the buffer listeners can get floating buffers properly
and
-            // no deadlock will occur (see FLINK-24035 for more information). For output
side, it
-            // means all result partitions will be available and ready for output on initialization.
-            availableMemorySegments.add(networkBufferPool.requestMemorySegmentBlocking());
-            ++numberOfRequestedMemorySegments;
-            availabilityHelper.resetAvailable();
+            if (checkAvailability()) {
+                availabilityHelper.resetAvailable();
+            }
 
             checkConsistentAvailability();
         }
@@ -236,6 +232,26 @@ class LocalBufferPool implements BufferPool {
     // ------------------------------------------------------------------------
 
     @Override
+    public void reserveSegments(int numberOfSegmentsToReserve) throws IOException {
+        checkArgument(
+                numberOfSegmentsToReserve <= numberOfRequiredMemorySegments,
+                "Can not reserve more segments than number of required segments.");
+
+        CompletableFuture<?> toNotify = null;
+        synchronized (availableMemorySegments) {
+            checkState(!isDestroyed, "Buffer pool has been destroyed.");
+
+            if (numberOfRequestedMemorySegments < numberOfSegmentsToReserve) {
+                availableMemorySegments.addAll(
+                        networkBufferPool.requestMemorySegmentsBlocking(
+                                numberOfSegmentsToReserve - numberOfRequestedMemorySegments));
+                toNotify = availabilityHelper.getUnavailableToResetAvailable();
+            }
+        }
+        mayNotifyAvailable(toNotify);
+    }
+
+    @Override
     public boolean isDestroyed() {
         synchronized (availableMemorySegments) {
             return isDestroyed;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index ac8b369..945e37f 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -156,8 +156,9 @@ public class NetworkBufferPool
         }
     }
 
-    public MemorySegment requestMemorySegmentBlocking() throws IOException {
-        return internalRequestMemorySegments(1).get(0);
+    public List<MemorySegment> requestMemorySegmentsBlocking(int numberOfSegmentsToRequest)
+            throws IOException {
+        return internalRequestMemorySegments(numberOfSegmentsToRequest);
     }
 
     public void recycle(MemorySegment segment) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 72afcca..8422e9e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -257,10 +257,11 @@ public class SingleInputGate extends IndexedInputGate {
         checkState(
                 this.bufferPool == null,
                 "Bug in input gate setup logic: Already registered buffer pool.");
-        setupChannels();
 
         BufferPool bufferPool = bufferPoolFactory.get();
         setBufferPool(bufferPool);
+
+        setupChannels();
     }
 
     @Override
@@ -457,6 +458,15 @@ public class SingleInputGate extends IndexedInputGate {
     /** Assign the exclusive buffers to all remote input channels directly for credit-based
mode. */
     @VisibleForTesting
     public void setupChannels() throws IOException {
+        // Allocate enough exclusive and floating buffers to guarantee that job can make
progress.
+        // Note: An exception will be thrown if there is no buffer available in the given
timeout.
+
+        // First allocate a single floating buffer to avoid potential deadlock when the exclusive
+        // buffer is 0. See FLINK-24035 for more information.
+        bufferPool.reserveSegments(1);
+
+        // Next allocate the exclusive buffers per channel when the number of exclusive buffer
is
+        // larger than 0.
         synchronized (requestLock) {
             for (InputChannel inputChannel : inputChannels.values()) {
                 inputChannel.setup();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
index 1987286..37e685f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
@@ -54,6 +54,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.times;
@@ -101,32 +102,37 @@ public class LocalBufferPoolTest extends TestLogger {
     }
 
     @Test
-    public void testLocalBufferPoolInitialization() throws Exception {
+    public void testReserveSegments() throws Exception {
         NetworkBufferPool networkBufferPool =
                 new NetworkBufferPool(2, memorySegmentSize, Duration.ofSeconds(2));
+        try {
+            BufferPool bufferPool1 = networkBufferPool.createBufferPool(1, 2);
+            assertThrows(IllegalArgumentException.class, () -> bufferPool1.reserveSegments(2));
 
-        BufferPool localBufferPool = networkBufferPool.createBufferPool(1, 2);
-        assertTrue(localBufferPool.isAvailable());
-        assertEquals(1, localBufferPool.getNumberOfAvailableMemorySegments());
+            // request all buffers
+            ArrayList<Buffer> buffers = new ArrayList<>(2);
+            buffers.add(bufferPool1.requestBuffer());
+            buffers.add(bufferPool1.requestBuffer());
+            assertEquals(2, buffers.size());
 
-        // request all buffers
-        ArrayList<Buffer> buffers = new ArrayList<>(2);
-        buffers.add(localBufferPool.requestBuffer());
-        buffers.add(localBufferPool.requestBuffer());
-        assertEquals(2, buffers.size());
+            BufferPool bufferPool2 = networkBufferPool.createBufferPool(1, 10);
+            assertThrows(IOException.class, () -> bufferPool2.reserveSegments(1));
+            assertFalse(bufferPool2.isAvailable());
 
-        try {
-            networkBufferPool.createBufferPool(1, 10);
-        } catch (IOException exception) {
-            // this is expected
-            return;
-        } finally {
             buffers.forEach(Buffer::recycleBuffer);
-            localBufferPool.lazyDestroy();
+            bufferPool1.lazyDestroy();
+            bufferPool2.lazyDestroy();
+
+            BufferPool bufferPool3 = networkBufferPool.createBufferPool(2, 10);
+            assertEquals(1, bufferPool3.getNumberOfAvailableMemorySegments());
+            bufferPool3.reserveSegments(2);
+            assertEquals(2, bufferPool3.getNumberOfAvailableMemorySegments());
+
+            bufferPool3.lazyDestroy();
+            assertThrows(IllegalStateException.class, () -> bufferPool3.reserveSegments(1));
+        } finally {
             networkBufferPool.destroy();
         }
-
-        fail("Should throw IOException");
     }
 
     @Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NoOpBufferPool.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NoOpBufferPool.java
index a7cc030..bdd6b4b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NoOpBufferPool.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NoOpBufferPool.java
@@ -28,6 +28,9 @@ import java.util.concurrent.CompletableFuture;
 public class NoOpBufferPool implements BufferPool {
 
     @Override
+    public void reserveSegments(int numberOfSegmentsToReserve) {}
+
+    @Override
     public void lazyDestroy() {}
 
     @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/UnpooledBufferPool.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/UnpooledBufferPool.java
index 73b0804..eb4d019 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/UnpooledBufferPool.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/UnpooledBufferPool.java
@@ -29,6 +29,9 @@ public class UnpooledBufferPool implements BufferPool {
     private static final int SEGMENT_SIZE = 1024;
 
     @Override
+    public void reserveSegments(int numberOfSegmentsToReserve) {}
+
+    @Override
     public void lazyDestroy() {}
 
     @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
index 6e8cab9..c53717e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
@@ -602,7 +602,7 @@ public class CreditBasedPartitionRequestClientHandlerTest {
 
         try {
             inputGate.setInputChannels(inputChannel);
-            inputGate.setupChannels();
+            inputGate.setup();
             inputGate.requestPartitions();
             handler.addInputChannel(inputChannel);
 
@@ -732,7 +732,7 @@ public class CreditBasedPartitionRequestClientHandlerTest {
         SingleInputGate inputGate = createSingleInputGate(1, networkBufferPool);
         RemoteInputChannel inputChannel = new InputChannelBuilder().buildRemoteChannel(inputGate);
         inputGate.setInputChannels(inputChannel);
-        inputGate.setupChannels();
+        inputGate.setup();
 
         CreditBasedPartitionRequestClientHandler handler =
                 new CreditBasedPartitionRequestClientHandler();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoderDelegateTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoderDelegateTest.java
index 67f9301..4931a2f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoderDelegateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoderDelegateTest.java
@@ -84,7 +84,7 @@ public class NettyMessageClientDecoderDelegateTest extends TestLogger {
                 createRemoteInputChannel(
                         inputGate, new TestingPartitionRequestClient(), NUMBER_OF_BUFFER_RESPONSES);
         inputGate.setInputChannels(inputChannel);
-        inputGate.setupChannels();
+        inputGate.setup();
         inputChannel.requestSubpartition(0);
         handler.addInputChannel(inputChannel);
         inputChannelId = inputChannel.getInputChannelId();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientSideSerializationTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientSideSerializationTest.java
index f3cc50f..cba3be2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientSideSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientSideSerializationTest.java
@@ -86,7 +86,7 @@ public class NettyMessageClientSideSerializationTest extends TestLogger
{
                 createRemoteInputChannel(inputGate, new TestingPartitionRequestClient());
         inputChannel.requestSubpartition(0);
         inputGate.setInputChannels(inputChannel);
-        inputGate.setupChannels();
+        inputGate.setup();
 
         CreditBasedPartitionRequestClientHandler handler =
                 new CreditBasedPartitionRequestClientHandler();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
index 1bf5f27..2fff0c6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
 import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NoOpBufferPool;
 import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
 import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -66,10 +67,7 @@ public class SingleInputGateBuilder {
     @Nullable
     private BiFunction<InputChannelBuilder, SingleInputGate, InputChannel> channelFactory
= null;
 
-    private SupplierWithException<BufferPool, IOException> bufferPoolFactory =
-            () -> {
-                throw new UnsupportedOperationException();
-            };
+    private SupplierWithException<BufferPool, IOException> bufferPoolFactory = NoOpBufferPool::new;
 
     public SingleInputGateBuilder setPartitionProducerStateProvider(
             PartitionProducerStateProvider partitionProducerStateProvider) {

Mime
View raw message