Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id E43C9200D41 for ; Wed, 22 Nov 2017 10:47:31 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id E2E07160BFD; Wed, 22 Nov 2017 09:47:31 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 353AE160BDA for ; Wed, 22 Nov 2017 10:47:31 +0100 (CET) Received: (qmail 84748 invoked by uid 500); 22 Nov 2017 09:47:30 -0000 Mailing-List: contact issues-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list issues@flink.apache.org Received: (qmail 84739 invoked by uid 99); 22 Nov 2017 09:47:30 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 22 Nov 2017 09:47:30 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 9C8C0C3E85 for ; Wed, 22 Nov 2017 09:47:29 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -3.83 X-Spam-Level: X-Spam-Status: No, score=-3.83 tagged_above=-999 required=6.31 tests=[KAM_LAZY_DOMAIN_SECURITY=1, KB_WAM_FROM_NAME_SINGLEWORD=0.2, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, T_RP_MATCHES_RCVD=-0.01] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id 27J5gmtXsfQD for ; Wed, 22 Nov 2017 09:47:28 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 8155A5FCDA for ; Wed, 22 Nov 2017 09:47:27 +0000 (UTC) Received: (qmail 84723 invoked by uid 99); 22 Nov 2017 09:47:26 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 22 Nov 2017 09:47:26 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3734AE04AA; Wed, 22 Nov 2017 09:47:26 +0000 (UTC) From: zhijiangW To: issues@flink.incubator.apache.org Reply-To: issues@flink.incubator.apache.org References: In-Reply-To: Subject: [GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc... Content-Type: text/plain Message-Id: <20171122094726.3734AE04AA@git1-us-west.apache.org> Date: Wed, 22 Nov 2017 09:47:26 +0000 (UTC) archived-at: Wed, 22 Nov 2017 09:47:32 -0000 Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152512614 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java --- @@ -301,81 +306,388 @@ public void testProducerFailedException() throws Exception { } /** - * Tests {@link RemoteInputChannel#recycle(MemorySegment)}, verifying the exclusive segment is - * recycled to available buffers directly and it triggers notify of announced credit. + * Tests to verify that the input channel requests floating buffers from buffer pool + * in order to maintain backlog + initialCredit buffers available once receiving the + * sender's backlog, and registers as listener if no floating buffers available. */ @Test - public void testRecycleExclusiveBufferBeforeReleased() throws Exception { - final SingleInputGate inputGate = mock(SingleInputGate.class); - final RemoteInputChannel inputChannel = spy(createRemoteInputChannel(inputGate)); + public void testRequestFloatingBufferOnSenderBacklog() throws Exception { + // Setup + final NetworkBufferPool networkBufferPool = new NetworkBufferPool(12, 32, MemoryType.HEAP); + final SingleInputGate inputGate = createSingleInputGate(); + final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate); + try { + final int numFloatingBuffers = 10; + final BufferPool bufferPool = spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers)); + inputGate.setBufferPool(bufferPool); + + // Assign exclusive segments to the channel + final int numExclusiveBuffers = 2; + inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel); + inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers); + + assertEquals("There should be " + numExclusiveBuffers + " buffers available in the channel", + numExclusiveBuffers, inputChannel.getNumberOfAvailableBuffers()); - // Recycle exclusive segment - inputChannel.recycle(MemorySegmentFactory.allocateUnpooledSegment(1024, inputChannel)); + // Receive the producer's backlog + inputChannel.onSenderBacklog(8); - assertEquals("There should be one buffer available after recycle.", - 1, inputChannel.getNumberOfAvailableBuffers()); - verify(inputChannel, times(1)).notifyCreditAvailable(); + // Request the number of floating buffers by the formula of backlog + initialCredit - availableBuffers + verify(bufferPool, times(8)).requestBuffer(); + verify(bufferPool, times(0)).addBufferListener(inputChannel); + assertEquals("There should be 10 buffers available in the channel", + 10, inputChannel.getNumberOfAvailableBuffers()); - inputChannel.recycle(MemorySegmentFactory.allocateUnpooledSegment(1024, inputChannel)); + inputChannel.onSenderBacklog(11); - assertEquals("There should be two buffers available after recycle.", - 2, inputChannel.getNumberOfAvailableBuffers()); - // It should be called only once when increased from zero. - verify(inputChannel, times(1)).notifyCreditAvailable(); + // Need extra three floating buffers, but only two buffers available in buffer pool, register as listener as a result + verify(bufferPool, times(11)).requestBuffer(); + verify(bufferPool, times(1)).addBufferListener(inputChannel); + assertEquals("There should be 12 buffers available in the channel", + 12, inputChannel.getNumberOfAvailableBuffers()); + + inputChannel.onSenderBacklog(12); + + // Already in the status of waiting for buffers and will not request any more + verify(bufferPool, times(11)).requestBuffer(); + verify(bufferPool, times(1)).addBufferListener(inputChannel); + --- End diff -- Actually I tried to test the two logics in two separate tests `testRequestFloatingBufferOnSenderBacklog` and `testFairDistributionFloatingBuffers`. For `testRequestFloatingBufferOnSenderBacklog`, it only wants to verify the request logic on input channel side. The key point is that the input channel will not request repeated if it is already as listener in pool. For 'testFairDistributionFloatingBuffers`, it only wants to verify that the input channel listener is getting buffer fairly during buffer `recycle()` on `bufferPool` side. I think it can cover the comment you mentioned "Can you also verify the behaviour when the buffers become available?" I will further check that later. I agree with that we missed the tests to verify that we stick to `senderBacklog + initialCredit` in different processes. And I will add them later. ---