Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 56518178F2 for ; Thu, 26 Mar 2015 10:52:01 +0000 (UTC) Received: (qmail 60361 invoked by uid 500); 26 Mar 2015 10:51:54 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 60284 invoked by uid 500); 26 Mar 2015 10:51:54 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 60263 invoked by uid 99); 26 Mar 2015 10:51:54 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 26 Mar 2015 10:51:54 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A1B0AE2C0A; Thu, 26 Mar 2015 10:51:54 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: uce@apache.org To: commits@flink.apache.org Date: Thu, 26 Mar 2015 10:51:55 -0000 Message-Id: <19998592ba224b7a87b4c495fb7febe2@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/3] flink git commit: [runtime] Fix possible resource leak in RemoteInputChannel [runtime] Fix possible resource leak in RemoteInputChannel It was possible for a concurrent release and an enqueue operation from the network I/O thread to leave a buffer queued at a released channel, resulting in a resource leak. This commit fixes this and adds a test, which successfully provoked the race condition before the fix. The visibility of input channel operations is reduced to package-private. 
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d72a3f7f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d72a3f7f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d72a3f7f Branch: refs/heads/master Commit: d72a3f7f99a7e242b191e993ced123d98a083d14 Parents: 95bdaad Author: Ufuk Celebi Authored: Tue Mar 24 10:25:52 2015 +0100 Committer: Ufuk Celebi Committed: Thu Mar 26 11:51:26 2015 +0100 ---------------------------------------------------------------------- .../network/netty/PartitionRequestClient.java | 2 +- .../partition/consumer/InputChannel.java | 14 +- .../partition/consumer/LocalInputChannel.java | 14 +- .../partition/consumer/RemoteInputChannel.java | 123 +++++++++------ .../io/network/api/reader/BufferReaderTest.java | 2 +- .../IteratorWrappingTestSingleInputGate.java | 103 ------------- .../IteratorWrappingTestSingleInputGate.java | 101 ++++++++++++ .../consumer/RemoteInputChannelTest.java | 152 +++++++++++++++++++ .../partition/consumer/SingleInputGateTest.java | 1 - .../partition/consumer/TestInputChannel.java | 134 ++++++++++++++++ .../partition/consumer/TestSingleInputGate.java | 139 +++++++++++++++++ .../partition/consumer/UnionInputGateTest.java | 1 - .../io/network/util/TestBufferFactory.java | 6 + .../io/network/util/TestInputChannel.java | 135 ---------------- .../io/network/util/TestSingleInputGate.java | 140 ----------------- .../runtime/operators/DataSinkTaskTest.java | 2 +- .../operators/testutils/MockEnvironment.java | 2 +- .../operators/testutils/TaskTestBase.java | 2 +- 18 files changed, 624 insertions(+), 449 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java index 32e3951..b5f89e0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java @@ -79,7 +79,7 @@ public class PartitionRequestClient { * The request goes to the remote producer, for which this partition * request client instance has been created. */ - public void requestIntermediateResultPartition(final ResultPartitionID partitionId, int requestedQueueIndex, final RemoteInputChannel inputChannel) throws IOException { + public void requestSubpartition(final ResultPartitionID partitionId, int requestedQueueIndex, final RemoteInputChannel inputChannel) throws IOException { partitionRequestHandler.addInputChannel(inputChannel); tcpChannel.writeAndFlush(new PartitionRequest(partitionId, requestedQueueIndex, inputChannel.getInputChannelId())) http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java index 855509c..0805066 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java @@ -53,7 +53,7 @@ public abstract class InputChannel { // Properties // ------------------------------------------------------------------------ - public int getChannelIndex() { + int getChannelIndex() 
{ return channelIndex; } @@ -75,12 +75,12 @@ public abstract class InputChannel { * The queue index to request depends on which sub task the channel belongs * to and is specified by the consumer of this channel. */ - public abstract void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException; + abstract void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException; /** * Returns the next buffer from the consumed subpartition. */ - public abstract Buffer getNextBuffer() throws IOException, InterruptedException; + abstract Buffer getNextBuffer() throws IOException, InterruptedException; // ------------------------------------------------------------------------ // Task events @@ -94,19 +94,19 @@ public abstract class InputChannel { * the producer will wait for all backwards events. Otherwise, this will lead to an Exception * at runtime. */ - public abstract void sendTaskEvent(TaskEvent event) throws IOException; + abstract void sendTaskEvent(TaskEvent event) throws IOException; // ------------------------------------------------------------------------ // Life cycle // ------------------------------------------------------------------------ - public abstract boolean isReleased(); + abstract boolean isReleased(); - public abstract void notifySubpartitionConsumed() throws IOException; + abstract void notifySubpartitionConsumed() throws IOException; /** * Releases all resources of the channel. 
*/ - public abstract void releaseAllResources() throws IOException; + abstract void releaseAllResources() throws IOException; } http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java index 65f2627..8ea6407 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java @@ -53,7 +53,7 @@ public class LocalInputChannel extends InputChannel implements NotificationListe private volatile Buffer lookAhead; - public LocalInputChannel( + LocalInputChannel( SingleInputGate gate, int channelIndex, ResultPartitionID partitionId, @@ -71,7 +71,7 @@ public class LocalInputChannel extends InputChannel implements NotificationListe // ------------------------------------------------------------------------ @Override - public void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException { + void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException { if (queueIterator == null) { LOG.debug("Requesting LOCAL queue {} of partition {}.", subpartitionIndex, partitionId); @@ -87,7 +87,7 @@ public class LocalInputChannel extends InputChannel implements NotificationListe } @Override - public Buffer getNextBuffer() throws IOException, InterruptedException { + Buffer getNextBuffer() throws IOException, InterruptedException { checkState(queueIterator != null, "Queried for a buffer before requesting a queue."); // After subscribe notification @@ -115,7 +115,7 @@ public class 
LocalInputChannel extends InputChannel implements NotificationListe // ------------------------------------------------------------------------ @Override - public void sendTaskEvent(TaskEvent event) throws IOException { + void sendTaskEvent(TaskEvent event) throws IOException { checkState(queueIterator != null, "Tried to send task event to producer before requesting a queue."); if (!taskEventDispatcher.publish(partitionId, event)) { @@ -128,12 +128,12 @@ public class LocalInputChannel extends InputChannel implements NotificationListe // ------------------------------------------------------------------------ @Override - public boolean isReleased() { + boolean isReleased() { return isReleased; } @Override - public void notifySubpartitionConsumed() throws IOException { + void notifySubpartitionConsumed() throws IOException { if (queueIterator != null) { queueIterator.notifySubpartitionConsumed(); } @@ -144,7 +144,7 @@ public class LocalInputChannel extends InputChannel implements NotificationListe * iterator. 
*/ @Override - public void releaseAllResources() throws IOException { + void releaseAllResources() throws IOException { if (!isReleased) { if (lookAhead != null) { lookAhead.recycle(); http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java index cae7837..df653a4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java @@ -32,7 +32,6 @@ import java.io.IOException; import java.util.ArrayDeque; import java.util.Queue; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; @@ -44,37 +43,51 @@ public class RemoteInputChannel extends InputChannel { private static final Logger LOG = LoggerFactory.getLogger(RemoteInputChannel.class); - private final InputChannelID id; + /** ID to distinguish this channel from other channels sharing the same TCP connection. */ + private final InputChannelID id = new InputChannelID(); - private final ConnectionID producerAddress; + /** The connection to use to request the remote partition. */ + private final ConnectionID connectionId; - private final Queue receivedBuffers = new ArrayDeque(); + /** The connection manager to use connect to the remote partition provider. 
*/ + private final ConnectionManager connectionManager; - private final AtomicReference ioError = new AtomicReference(); + /** + * The received buffers. Received buffers are enqueued by the network I/O thread and the queue + * is consumed by the receiving task thread. + */ + private final Queue receivedBuffers = new ArrayDeque(); + /** + * Flag indicating whether this channel has been released. Either called by the receiving task + * thread or the task manager actor. + */ private final AtomicBoolean isReleased = new AtomicBoolean(); + /** Client to establish a (possibly shared) TCP connection and request the partition. */ private PartitionRequestClient partitionRequestClient; + /** + * The next expected sequence number for the next buffer. This is modified by the network + * I/O thread only. + */ private int expectedSequenceNumber = 0; - private ConnectionManager connectionManager; + /** + * An error possibly set by the network I/O thread. + */ + private volatile Throwable error; - public RemoteInputChannel( - SingleInputGate gate, + RemoteInputChannel( + SingleInputGate inputGate, int channelIndex, ResultPartitionID partitionId, - ConnectionID producerAddress, + ConnectionID connectionId, ConnectionManager connectionManager) { - super(gate, channelIndex, partitionId); + super(inputGate, channelIndex, partitionId); - /** - * This ID is used by the {@link PartitionRequestClient} to distinguish - * between receivers, which share the same TCP connection. 
- */ - this.id = new InputChannelID(); - this.producerAddress = checkNotNull(producerAddress); + this.connectionId = checkNotNull(connectionId); this.connectionManager = checkNotNull(connectionManager); } @@ -83,22 +96,25 @@ public class RemoteInputChannel extends InputChannel { // ------------------------------------------------------------------------ @Override - public void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException { + void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException { if (partitionRequestClient == null) { - LOG.debug("Requesting REMOTE queue {} from of partition {}.", subpartitionIndex, partitionId); + LOG.debug("{}: Requesting REMOTE subpartition {} of partition {}.", + this, subpartitionIndex, partitionId); - partitionRequestClient = connectionManager.createPartitionRequestClient(producerAddress); + // Create a client and request the partition + partitionRequestClient = connectionManager + .createPartitionRequestClient(connectionId); - partitionRequestClient.requestIntermediateResultPartition(partitionId, subpartitionIndex, this); + partitionRequestClient.requestSubpartition(partitionId, subpartitionIndex, this); } } @Override - public Buffer getNextBuffer() throws IOException { + Buffer getNextBuffer() throws IOException { checkState(!isReleased.get(), "Queried for a buffer after channel has been closed."); checkState(partitionRequestClient != null, "Queried for a buffer before requesting a queue."); - checkIoError(); + checkError(); synchronized (receivedBuffers) { Buffer buffer = receivedBuffers.poll(); @@ -117,11 +133,11 @@ public class RemoteInputChannel extends InputChannel { // ------------------------------------------------------------------------ @Override - public void sendTaskEvent(TaskEvent event) throws IOException { + void sendTaskEvent(TaskEvent event) throws IOException { checkState(!isReleased.get(), "Tried to send task event to producer after channel has been 
released."); checkState(partitionRequestClient != null, "Tried to send task event to producer before requesting a queue."); - checkIoError(); + checkError(); partitionRequestClient.sendTaskEvent(partitionId, event, this); } @@ -131,12 +147,12 @@ public class RemoteInputChannel extends InputChannel { // ------------------------------------------------------------------------ @Override - public boolean isReleased() { + boolean isReleased() { return isReleased.get(); } @Override - public void notifySubpartitionConsumed() { + void notifySubpartitionConsumed() { // Nothing to do } @@ -144,7 +160,7 @@ public class RemoteInputChannel extends InputChannel { * Releases all received buffers and closes the partition request client. */ @Override - public void releaseAllResources() throws IOException { + void releaseAllResources() throws IOException { if (isReleased.compareAndSet(false, true)) { synchronized (receivedBuffers) { Buffer buffer; @@ -155,21 +171,28 @@ public class RemoteInputChannel extends InputChannel { if (partitionRequestClient != null) { partitionRequestClient.close(this); - } else { - connectionManager.closeOpenChannelConnections(producerAddress); + } + else { + connectionManager.closeOpenChannelConnections(connectionId); } } } @Override public String toString() { - return "RemoteInputChannel [" + partitionId + " at " + producerAddress + "]"; + return "RemoteInputChannel [" + partitionId + " at " + connectionId + "]"; } // ------------------------------------------------------------------------ // Network I/O notifications (called by network I/O thread) // ------------------------------------------------------------------------ + public int getNumberOfQueuedBuffers() { + synchronized (receivedBuffers) { + return receivedBuffers.size(); + } + } + public InputChannelID getInputChannelId() { return id; } @@ -186,8 +209,8 @@ public class RemoteInputChannel extends InputChannel { boolean success = false; try { - if (!isReleased.get()) { - synchronized 
(receivedBuffers) { + synchronized (receivedBuffers) { + if (!isReleased.get()) { if (expectedSequenceNumber == sequenceNumber) { receivedBuffers.add(buffer); expectedSequenceNumber++; @@ -195,13 +218,11 @@ public class RemoteInputChannel extends InputChannel { notifyAvailableBuffer(); success = true; - - return; + } + else { + onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber)); } } - - IOException error = new BufferReorderingException(expectedSequenceNumber, sequenceNumber); - ioError.compareAndSet(null, error); } } finally { @@ -212,33 +233,35 @@ public class RemoteInputChannel extends InputChannel { } public void onEmptyBuffer(int sequenceNumber) { - if (!isReleased.get()) { - synchronized (receivedBuffers) { + synchronized (receivedBuffers) { + if (!isReleased.get()) { if (expectedSequenceNumber == sequenceNumber) { expectedSequenceNumber++; } else { - IOException error = new BufferReorderingException(expectedSequenceNumber, sequenceNumber); - ioError.compareAndSet(null, error); + onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber)); } } } } - public void onError(Throwable error) { - if (ioError.compareAndSet(null, error instanceof IOException ? (IOException) error : new IOException(error))) { + public void onError(Throwable cause) { + if (error == null) { + error = cause; + + // Notify the input gate to trigger querying of this channel notifyAvailableBuffer(); } } - // ------------------------------------------------------------------------ - - private void checkIoError() throws IOException { - IOException error = ioError.get(); + /** + * Checks whether this channel got notified by the network I/O thread about an error. 
+ */ + private void checkError() throws IOException { + final Throwable t = error; - if (error != null) { - throw new IOException(String.format("%s at remote input channel: %s].", - error.getClass().getName(), error.getMessage())); + if (t != null) { + throw new IOException(t); } } http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java index 7d3dbbe..e1f8fd8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java @@ -20,7 +20,7 @@ package org.apache.flink.runtime.io.network.api.reader; import org.apache.flink.runtime.event.task.TaskEvent; import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.util.TestSingleInputGate; +import org.apache.flink.runtime.io.network.partition.consumer.TestSingleInputGate; import org.apache.flink.runtime.io.network.util.TestTaskEvent; import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.util.event.EventListener; http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/IteratorWrappingTestSingleInputGate.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/IteratorWrappingTestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/IteratorWrappingTestSingleInputGate.java deleted file mode 100644 index 
3968eda..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/IteratorWrappingTestSingleInputGate.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.io.network.api.reader; - -import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; -import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; -import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer; -import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer; -import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.buffer.BufferRecycler; -import org.apache.flink.runtime.io.network.util.TestInputChannel; -import org.apache.flink.runtime.io.network.util.TestSingleInputGate; -import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; -import org.apache.flink.util.InstantiationUtil; -import org.apache.flink.util.MutableObjectIterator; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - 
-import java.io.IOException; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class IteratorWrappingTestSingleInputGate extends TestSingleInputGate { - - private final TestInputChannel inputChannel = new TestInputChannel(inputGate, 0); - - private final int bufferSize; - - private MutableObjectIterator inputIterator; - - private RecordSerializer serializer; - - private final T reuse; - - public IteratorWrappingTestSingleInputGate(int bufferSize, Class recordType, MutableObjectIterator iterator) throws IOException, InterruptedException { - super(1, false); - - this.bufferSize = bufferSize; - this.reuse = InstantiationUtil.instantiate(recordType); - - wrapIterator(iterator); - } - - private IteratorWrappingTestSingleInputGate wrapIterator(MutableObjectIterator iterator) throws IOException, InterruptedException { - inputIterator = iterator; - serializer = new SpanningRecordSerializer(); - - // The input iterator can produce an infinite stream. That's why we have to serialize each - // record on demand and cannot do it upfront. 
- final Answer answer = new Answer() { - @Override - public Buffer answer(InvocationOnMock invocationOnMock) throws Throwable { - if (inputIterator.next(reuse) != null) { - final Buffer buffer = new Buffer(new MemorySegment(new byte[bufferSize]), mock(BufferRecycler.class)); - serializer.setNextBuffer(buffer); - serializer.addRecord(reuse); - - inputGate.onAvailableBuffer(inputChannel.getInputChannel()); - - // Call getCurrentBuffer to ensure size is set - return serializer.getCurrentBuffer(); - } - else { - - when(inputChannel.getInputChannel().isReleased()).thenReturn(true); - - return EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE); - } - } - }; - - when(inputChannel.getInputChannel().getNextBuffer()).thenAnswer(answer); - - inputGate.setInputChannel(new IntermediateResultPartitionID(), inputChannel.getInputChannel()); - - return this; - } - - public IteratorWrappingTestSingleInputGate read() { - inputGate.onAvailableBuffer(inputChannel.getInputChannel()); - - return this; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java new file mode 100644 index 0000000..65780b4 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.consumer; + +import org.apache.flink.core.io.IOReadableWritable; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; +import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; +import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer; +import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferRecycler; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.MutableObjectIterator; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.IOException; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class IteratorWrappingTestSingleInputGate extends TestSingleInputGate { + + private final TestInputChannel inputChannel = new TestInputChannel(inputGate, 0); + + private final int bufferSize; + + private MutableObjectIterator inputIterator; + + private RecordSerializer serializer; + + private final T reuse; + + public IteratorWrappingTestSingleInputGate(int bufferSize, Class recordType, 
MutableObjectIterator iterator) throws IOException, InterruptedException { + super(1, false); + + this.bufferSize = bufferSize; + this.reuse = InstantiationUtil.instantiate(recordType); + + wrapIterator(iterator); + } + + private IteratorWrappingTestSingleInputGate wrapIterator(MutableObjectIterator iterator) throws IOException, InterruptedException { + inputIterator = iterator; + serializer = new SpanningRecordSerializer(); + + // The input iterator can produce an infinite stream. That's why we have to serialize each + // record on demand and cannot do it upfront. + final Answer answer = new Answer() { + @Override + public Buffer answer(InvocationOnMock invocationOnMock) throws Throwable { + if (inputIterator.next(reuse) != null) { + final Buffer buffer = new Buffer(new MemorySegment(new byte[bufferSize]), mock(BufferRecycler.class)); + serializer.setNextBuffer(buffer); + serializer.addRecord(reuse); + + inputGate.onAvailableBuffer(inputChannel.getInputChannel()); + + // Call getCurrentBuffer to ensure size is set + return serializer.getCurrentBuffer(); + } + else { + + when(inputChannel.getInputChannel().isReleased()).thenReturn(true); + + return EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE); + } + } + }; + + when(inputChannel.getInputChannel().getNextBuffer()).thenAnswer(answer); + + inputGate.setInputChannel(new IntermediateResultPartitionID(), inputChannel.getInputChannel()); + + return this; + } + + public IteratorWrappingTestSingleInputGate read() { + inputGate.onAvailableBuffer(inputChannel.getInputChannel()); + + return this; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java new file mode 100644 index 0000000..7eb728c --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition.consumer; + +import com.google.common.collect.Lists; +import org.apache.flink.runtime.io.network.ConnectionID; +import org.apache.flink.runtime.io.network.ConnectionManager; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.netty.PartitionRequestClient; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.util.TestBufferFactory; +import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider; +import org.junit.Test; +import org.mockito.Matchers; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class RemoteInputChannelTest { + + @Test + public void testExceptionOnReordering() throws Exception { + // Setup + final SingleInputGate inputGate = mock(SingleInputGate.class); + final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate); + + // The test + inputChannel.onBuffer(TestBufferFactory.getMockBuffer(), 0); + + // This does not yet throw the exception, but sets the error at the channel. + inputChannel.onBuffer(TestBufferFactory.getMockBuffer(), 29); + + try { + inputChannel.getNextBuffer(); + + fail("Did not throw expected exception after enqueuing an out-of-order buffer."); + } + catch (Exception expected) { + } + + // Need to notify the input gate for the out-of-order buffer as well. 
Otherwise the + // receiving task will not notice the error. + verify(inputGate, times(2)).onAvailableBuffer(eq(inputChannel)); + } + + @Test(timeout = 120 * 1000) + public void testConcurrentOnBufferAndRelease() throws Exception { + // Config + // Repeatedly spawn two tasks: one to queue buffers and the other to release the channel + // concurrently. We do this repeatedly to provoke races. + final int numberOfRepetitions = 8192; + + // Setup + final ExecutorService executor = Executors.newFixedThreadPool(2); + + try { + // Test + final SingleInputGate inputGate = mock(SingleInputGate.class); + + for (int i = 0; i < numberOfRepetitions; i++) { + final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate); + + final Callable<Void> enqueueTask = new Callable<Void>() { + @Override + public Void call() throws Exception { + while (true) { + for (int j = 0; j < 128; j++) { + inputChannel.onBuffer(TestBufferFactory.getMockBuffer(), j); + } + + if (inputChannel.isReleased()) { + return null; + } + } + } + }; + + final Callable<Void> releaseTask = new Callable<Void>() { + @Override + public Void call() throws Exception { + inputChannel.releaseAllResources(); + + return null; + } + }; + + // Submit tasks and wait to finish + List<Future<Void>> results = Lists.newArrayListWithCapacity(2); + + results.add(executor.submit(enqueueTask)); + results.add(executor.submit(releaseTask)); + + for (Future<Void> result : results) { + result.get(); + } + + assertEquals("Resource leak during concurrent release and enqueue.", + 0, inputChannel.getNumberOfQueuedBuffers()); + } + } + finally { + executor.shutdown(); + } + } + + // --------------------------------------------------------------------------------------------- + + private RemoteInputChannel createRemoteInputChannel(SingleInputGate inputGate) + throws IOException, InterruptedException { + + final ConnectionManager connectionManager = mock(ConnectionManager.class); + when(connectionManager.createPartitionRequestClient(any(ConnectionID.class))) + 
.thenReturn(mock(PartitionRequestClient.class)); + + return new RemoteInputChannel( + inputGate, + 0, + new ResultPartitionID(), + mock(ConnectionID.class), + connectionManager); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java index e1e3cff..5871514 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java @@ -32,7 +32,6 @@ import org.apache.flink.runtime.io.network.buffer.BufferRecycler; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView; -import org.apache.flink.runtime.io.network.util.TestInputChannel; import org.apache.flink.runtime.io.network.util.TestTaskEvent; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java new file mode 100644 index 0000000..3dafffd --- 
/dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.consumer; + +import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; +import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent; +import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.util.TestTaskEvent; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.mockito.stubbing.OngoingStubbing; + +import java.io.IOException; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * A mocked input channel. 
+ */ +public class TestInputChannel { + + private final InputChannel mock = Mockito.mock(InputChannel.class); + + private final SingleInputGate inputGate; + + // Abusing Mockito here... ;) + protected OngoingStubbing stubbing; + + public TestInputChannel(SingleInputGate inputGate, int channelIndex) { + checkArgument(channelIndex >= 0); + this.inputGate = checkNotNull(inputGate); + + when(mock.getChannelIndex()).thenReturn(channelIndex); + } + + public TestInputChannel read(Buffer buffer) throws IOException, InterruptedException { + if (stubbing == null) { + stubbing = when(mock.getNextBuffer()).thenReturn(buffer); + } + else { + stubbing = stubbing.thenReturn(buffer); + } + + inputGate.onAvailableBuffer(mock); + + return this; + } + + public TestInputChannel readBuffer() throws IOException, InterruptedException { + final Buffer buffer = mock(Buffer.class); + when(buffer.isBuffer()).thenReturn(true); + + return read(buffer); + } + + public TestInputChannel readEvent() throws IOException, InterruptedException { + return read(EventSerializer.toBuffer(new TestTaskEvent())); + } + + public TestInputChannel readEndOfSuperstepEvent() throws IOException, InterruptedException { + return read(EventSerializer.toBuffer(EndOfSuperstepEvent.INSTANCE)); + } + + public TestInputChannel readEndOfPartitionEvent() throws IOException, InterruptedException { + final Answer answer = new Answer() { + @Override + public Buffer answer(InvocationOnMock invocationOnMock) throws Throwable { + // Return true after finishing + when(mock.isReleased()).thenReturn(true); + + return EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE); + } + }; + + if (stubbing == null) { + stubbing = when(mock.getNextBuffer()).thenAnswer(answer); + } + else { + stubbing = stubbing.thenAnswer(answer); + } + + inputGate.onAvailableBuffer(mock); + + return this; + } + + public InputChannel getInputChannel() { + return mock; + } + + // ------------------------------------------------------------------------ + + /** 
+ * Creates test input channels and attaches them to the specified input gate. + * + * @return The created test input channels. + */ + public static TestInputChannel[] createInputChannels(SingleInputGate inputGate, int numberOfInputChannels) { + checkNotNull(inputGate); + checkArgument(numberOfInputChannels > 0); + + TestInputChannel[] mocks = new TestInputChannel[numberOfInputChannels]; + + for (int i = 0; i < numberOfInputChannels; i++) { + mocks[i] = new TestInputChannel(inputGate, i); + + inputGate.setInputChannel(new IntermediateResultPartitionID(), mocks[i].getInputChannel()); + } + + return mocks; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java new file mode 100644 index 0000000..5033b3d --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.consumer; + +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkElementIndex; +import static org.mockito.Mockito.spy; + +/** + * A test input gate to mock reading data. + */ +public class TestSingleInputGate { + + protected final SingleInputGate inputGate; + + protected final TestInputChannel[] inputChannels; + + public TestSingleInputGate(int numberOfInputChannels) { + this(numberOfInputChannels, true); + } + + public TestSingleInputGate(int numberOfInputChannels, boolean initialize) { + checkArgument(numberOfInputChannels >= 1); + + this.inputGate = spy(new SingleInputGate(new IntermediateDataSetID(), 0, numberOfInputChannels)); + + this.inputChannels = new TestInputChannel[numberOfInputChannels]; + + if (initialize) { + for (int i = 0; i < numberOfInputChannels; i++) { + inputChannels[i] = new TestInputChannel(inputGate, i); + inputGate.setInputChannel(new IntermediateResultPartitionID(), inputChannels[i].getInputChannel()); + } + } + } + + public TestSingleInputGate read(Buffer buffer, int channelIndex) throws IOException, InterruptedException { + checkElementIndex(channelIndex, inputGate.getNumberOfInputChannels()); + + inputChannels[channelIndex].read(buffer); + + return this; + } + + public TestSingleInputGate readBuffer() throws IOException, InterruptedException { + return readBuffer(0); + } + + public TestSingleInputGate readBuffer(int channelIndex) throws IOException, InterruptedException { + 
inputChannels[channelIndex].readBuffer(); + + return this; + } + + public TestSingleInputGate readEvent() throws IOException, InterruptedException { + return readEvent(0); + } + + public TestSingleInputGate readEvent(int channelIndex) throws IOException, InterruptedException { + inputChannels[channelIndex].readEvent(); + + return this; + } + + public TestSingleInputGate readEndOfSuperstepEvent() throws IOException, InterruptedException { + for (TestInputChannel inputChannel : inputChannels) { + inputChannel.readEndOfSuperstepEvent(); + } + + return this; + } + + public TestSingleInputGate readEndOfSuperstepEvent(int channelIndex) throws IOException, InterruptedException { + inputChannels[channelIndex].readEndOfSuperstepEvent(); + + return this; + } + + public TestSingleInputGate readEndOfPartitionEvent() throws IOException, InterruptedException { + for (TestInputChannel inputChannel : inputChannels) { + inputChannel.readEndOfPartitionEvent(); + } + + return this; + } + + public TestSingleInputGate readEndOfPartitionEvent(int channelIndex) throws IOException, InterruptedException { + inputChannels[channelIndex].readEndOfPartitionEvent(); + + return this; + } + + public SingleInputGate getInputGate() { + return inputGate; + } + + // ------------------------------------------------------------------------ + + public List<Integer> readAllChannels() throws IOException, InterruptedException { + final List<Integer> readOrder = new ArrayList<Integer>(inputChannels.length); + + for (int i = 0; i < inputChannels.length; i++) { + readOrder.add(i); + } + + Collections.shuffle(readOrder); + + for (int channelIndex : readOrder) { + inputChannels[channelIndex].readBuffer(); + } + + return readOrder; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java index c7cb413..18e56ba 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.io.network.partition.consumer; -import org.apache.flink.runtime.io.network.util.TestInputChannel; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java index d10bf0c..0ff42b6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java @@ -34,6 +34,8 @@ public class TestBufferFactory { private static final BufferRecycler discardingRecycler = new DiscardingRecycler(); + private static final Buffer mockBuffer = createBuffer(); + private final int bufferSize; private final BufferRecycler bufferRecycler; @@ -85,4 +87,8 @@ public class TestBufferFactory { return new Buffer(new MemorySegment(new byte[bufferSize]), discardingRecycler); } + + public static Buffer getMockBuffer() { + return mockBuffer; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInputChannel.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInputChannel.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInputChannel.java deleted file mode 100644 index 0e9e8e7..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInputChannel.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.io.network.util; - -import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; -import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent; -import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; -import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.partition.consumer.InputChannel; -import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; -import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; -import org.mockito.stubbing.OngoingStubbing; - -import java.io.IOException; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -/** - * A mocked input channel. - */ -public class TestInputChannel { - - private final InputChannel mock = Mockito.mock(InputChannel.class); - - private final SingleInputGate inputGate; - - // Abusing Mockito here... 
;) - protected OngoingStubbing stubbing; - - public TestInputChannel(SingleInputGate inputGate, int channelIndex) { - checkArgument(channelIndex >= 0); - this.inputGate = checkNotNull(inputGate); - - when(mock.getChannelIndex()).thenReturn(channelIndex); - } - - public TestInputChannel read(Buffer buffer) throws IOException, InterruptedException { - if (stubbing == null) { - stubbing = when(mock.getNextBuffer()).thenReturn(buffer); - } - else { - stubbing = stubbing.thenReturn(buffer); - } - - inputGate.onAvailableBuffer(mock); - - return this; - } - - public TestInputChannel readBuffer() throws IOException, InterruptedException { - final Buffer buffer = mock(Buffer.class); - when(buffer.isBuffer()).thenReturn(true); - - return read(buffer); - } - - public TestInputChannel readEvent() throws IOException, InterruptedException { - return read(EventSerializer.toBuffer(new TestTaskEvent())); - } - - public TestInputChannel readEndOfSuperstepEvent() throws IOException, InterruptedException { - return read(EventSerializer.toBuffer(EndOfSuperstepEvent.INSTANCE)); - } - - public TestInputChannel readEndOfPartitionEvent() throws IOException, InterruptedException { - final Answer answer = new Answer() { - @Override - public Buffer answer(InvocationOnMock invocationOnMock) throws Throwable { - // Return true after finishing - when(mock.isReleased()).thenReturn(true); - - return EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE); - } - }; - - if (stubbing == null) { - stubbing = when(mock.getNextBuffer()).thenAnswer(answer); - } - else { - stubbing = stubbing.thenAnswer(answer); - } - - inputGate.onAvailableBuffer(mock); - - return this; - } - - public InputChannel getInputChannel() { - return mock; - } - - // ------------------------------------------------------------------------ - - /** - * Creates test input channels and attaches them to the specified input gate. - * - * @return The created test input channels. 
- */ - public static TestInputChannel[] createInputChannels(SingleInputGate inputGate, int numberOfInputChannels) { - checkNotNull(inputGate); - checkArgument(numberOfInputChannels > 0); - - TestInputChannel[] mocks = new TestInputChannel[numberOfInputChannels]; - - for (int i = 0; i < numberOfInputChannels; i++) { - mocks[i] = new TestInputChannel(inputGate, i); - - inputGate.setInputChannel(new IntermediateResultPartitionID(), mocks[i].getInputChannel()); - } - - return mocks; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSingleInputGate.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSingleInputGate.java deleted file mode 100644 index d10e1a0..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSingleInputGate.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.io.network.util; - -import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; -import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; -import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkElementIndex; -import static org.mockito.Mockito.spy; - -/** - * A test input gate to mock reading data. - */ -public class TestSingleInputGate { - - protected final SingleInputGate inputGate; - - protected final TestInputChannel[] inputChannels; - - public TestSingleInputGate(int numberOfInputChannels) { - this(numberOfInputChannels, true); - } - - public TestSingleInputGate(int numberOfInputChannels, boolean initialize) { - checkArgument(numberOfInputChannels >= 1); - - this.inputGate = spy(new SingleInputGate(new IntermediateDataSetID(), 0, numberOfInputChannels)); - - this.inputChannels = new TestInputChannel[numberOfInputChannels]; - - if (initialize) { - for (int i = 0; i < numberOfInputChannels; i++) { - inputChannels[i] = new TestInputChannel(inputGate, i); - inputGate.setInputChannel(new IntermediateResultPartitionID(), inputChannels[i].getInputChannel()); - } - } - } - - public TestSingleInputGate read(Buffer buffer, int channelIndex) throws IOException, InterruptedException { - checkElementIndex(channelIndex, inputGate.getNumberOfInputChannels()); - - inputChannels[channelIndex].read(buffer); - - return this; - } - - public TestSingleInputGate readBuffer() throws IOException, InterruptedException { - return readBuffer(0); - } - - public TestSingleInputGate readBuffer(int channelIndex) throws IOException, InterruptedException { - inputChannels[channelIndex].readBuffer(); - - return this; 
- } - - public TestSingleInputGate readEvent() throws IOException, InterruptedException { - return readEvent(0); - } - - public TestSingleInputGate readEvent(int channelIndex) throws IOException, InterruptedException { - inputChannels[channelIndex].readEvent(); - - return this; - } - - public TestSingleInputGate readEndOfSuperstepEvent() throws IOException, InterruptedException { - for (TestInputChannel inputChannel : inputChannels) { - inputChannel.readEndOfSuperstepEvent(); - } - - return this; - } - - public TestSingleInputGate readEndOfSuperstepEvent(int channelIndex) throws IOException, InterruptedException { - inputChannels[channelIndex].readEndOfSuperstepEvent(); - - return this; - } - - public TestSingleInputGate readEndOfPartitionEvent() throws IOException, InterruptedException { - for (TestInputChannel inputChannel : inputChannels) { - inputChannel.readEndOfPartitionEvent(); - } - - return this; - } - - public TestSingleInputGate readEndOfPartitionEvent(int channelIndex) throws IOException, InterruptedException { - inputChannels[channelIndex].readEndOfPartitionEvent(); - - return this; - } - - public SingleInputGate getInputGate() { - return inputGate; - } - - // ------------------------------------------------------------------------ - - public List<Integer> readAllChannels() throws IOException, InterruptedException { - final List<Integer> readOrder = new ArrayList<Integer>(inputChannels.length); - - for (int i = 0; i < inputChannels.length; i++) { - readOrder.add(i); - } - - Collections.shuffle(readOrder); - - for (int channelIndex : readOrder) { - inputChannels[channelIndex].readBuffer(); - } - - return readOrder; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java index 63ce5e2..e91d338 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java @@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators; import org.apache.flink.api.common.typeutils.record.RecordComparatorFactory; import org.apache.flink.api.java.record.io.DelimitedOutputFormat; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.io.network.api.reader.IteratorWrappingTestSingleInputGate; +import org.apache.flink.runtime.io.network.partition.consumer.IteratorWrappingTestSingleInputGate; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.operators.testutils.InfiniteInputIterator; import org.apache.flink.runtime.operators.testutils.TaskCancelThread; http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java index 6625bbc..d5c9fbb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java @@ -27,7 +27,7 @@ import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; -import org.apache.flink.runtime.io.network.api.reader.IteratorWrappingTestSingleInputGate; +import 
org.apache.flink.runtime.io.network.partition.consumer.IteratorWrappingTestSingleInputGate; import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer; import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; http://git-wip-us.apache.org/repos/asf/flink/blob/d72a3f7f/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java index b93d37e..6ffc97b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java @@ -27,7 +27,7 @@ import org.apache.flink.api.java.record.io.FileOutputFormat; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FileSystem.WriteMode; import org.apache.flink.core.fs.Path; -import org.apache.flink.runtime.io.network.api.reader.IteratorWrappingTestSingleInputGate; +import org.apache.flink.runtime.io.network.partition.consumer.IteratorWrappingTestSingleInputGate; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.memorymanager.MemoryManager; import org.apache.flink.runtime.operators.PactDriver;