Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id EDB0C174EB for ; Wed, 18 Mar 2015 16:48:50 +0000 (UTC) Received: (qmail 5605 invoked by uid 500); 18 Mar 2015 16:48:50 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 5529 invoked by uid 500); 18 Mar 2015 16:48:50 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 5310 invoked by uid 99); 18 Mar 2015 16:48:50 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 18 Mar 2015 16:48:50 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6B6D6E18D4; Wed, 18 Mar 2015 16:48:50 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: uce@apache.org To: commits@flink.apache.org Date: Wed, 18 Mar 2015 16:48:57 -0000 Message-Id: <24429af71b8744ff90af9c65ffe97e55@git.apache.org> In-Reply-To: <92efcbe0e11a4bf082d9af54368a55a4@git.apache.org> References: <92efcbe0e11a4bf082d9af54368a55a4@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [08/13] flink git commit: [FLINK-1350] [runtime] Add blocking result partition variant http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartition.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartition.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartition.java deleted file mode 100644 index 174211a..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartition.java +++ /dev/null @@ -1,319 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.io.network.partition; - -import akka.actor.ActorRef; -import akka.dispatch.OnFailure; -import akka.pattern.Patterns; -import com.google.common.base.Optional; -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.deployment.PartitionDeploymentDescriptor; -import org.apache.flink.runtime.execution.RuntimeEnvironment; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.executiongraph.ExecutionVertex; -import org.apache.flink.runtime.io.network.NetworkEnvironment; -import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.buffer.BufferPool; -import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner; -import org.apache.flink.runtime.io.network.buffer.BufferProvider; -import org.apache.flink.runtime.io.network.partition.queue.IllegalQueueIteratorRequestException; -import org.apache.flink.runtime.io.network.partition.queue.IntermediateResultPartitionQueue; -import org.apache.flink.runtime.io.network.partition.queue.IntermediateResultPartitionQueueIterator; -import org.apache.flink.runtime.io.network.partition.queue.PipelinedPartitionQueue; -import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; -import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionType; -import org.apache.flink.runtime.jobgraph.JobID; -import org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers; -import org.apache.flink.runtime.messages.TaskManagerMessages.FailTask; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.concurrent.Future; - -import java.io.IOException; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; - -public class IntermediateResultPartition implements BufferPoolOwner { - - private static final 
Logger LOG = LoggerFactory.getLogger(IntermediateResultPartition.class); - - private final RuntimeEnvironment environment; - - /** - * Note: This index needs to correspond to the index of the partition in - * {@link ExecutionVertex#resultPartitions}, which might be a little - * fragile as the data availability notifications use it. - */ - private final int partitionIndex; - - private final JobID jobId; - - private final ExecutionAttemptID producerExecutionId; - - private final IntermediateResultPartitionID partitionId; - - private final IntermediateResultPartitionType partitionType; - - private final NetworkEnvironment networkEnvironment; - - private final IntermediateResultPartitionQueue[] queues; - - private volatile boolean hasNotifiedConsumers; - - private volatile boolean isReleased; - - private boolean isFinished; - - private BufferPool bufferPool; - - public IntermediateResultPartition(RuntimeEnvironment environment, int partitionIndex, JobID jobId, - ExecutionAttemptID executionId, IntermediateResultPartitionID partitionId, IntermediateResultPartitionType partitionType, - IntermediateResultPartitionQueue[] partitionQueues, NetworkEnvironment networkEnvironment) { - this.environment = environment; - this.partitionIndex = partitionIndex; - this.jobId = jobId; - this.producerExecutionId = executionId; - this.partitionId = partitionId; - this.partitionType = partitionType; - this.networkEnvironment = networkEnvironment; - this.queues = partitionQueues; - } - - // ------------------------------------------------------------------------ - // Properties - // ------------------------------------------------------------------------ - - public void setBufferPool(BufferPool bufferPool) { - checkArgument(bufferPool.getNumberOfRequiredMemorySegments() == getNumberOfQueues(), - "Buffer pool has not enough buffers for this intermediate result."); - checkState(this.bufferPool == null, "Buffer pool has already been set for intermediate result partition."); - - 
this.bufferPool = checkNotNull(bufferPool); - } - - public ExecutionAttemptID getProducerExecutionId() { - return producerExecutionId; - } - - public IntermediateResultPartitionID getPartitionId() { - return partitionId; - } - - public JobID getJobId() { - return jobId; - } - - public int getNumberOfQueues() { - return queues.length; - } - - public BufferProvider getBufferProvider() { - return bufferPool; - } - - public boolean isFinished() { - return isFinished; - } - - // ------------------------------------------------------------------------ - // Produce - // ------------------------------------------------------------------------ - - public void add(Buffer buffer, int targetQueue) throws IOException { - synchronized (queues) { - if (isReleased) { - buffer.recycle(); - } - else { - checkInProducePhase(); - queues[targetQueue].add(buffer); - } - } - - maybeNotifyConsumers(partitionType.isPipelined()); - } - - public void finish() throws IOException { - boolean success = false; - - synchronized (queues) { - checkInProducePhase(); - - try { - if (!isReleased) { - for (IntermediateResultPartitionQueue queue : queues) { - queue.finish(); - } - - success = true; - } - } - finally { - isFinished = true; - } - } - - if (success) { - // Notify at this point in any case either because of the end - // of a blocking result or an empty pipelined result. - maybeNotifyConsumers(true); - - if (!partitionType.isPersistent() && bufferPool != null) { - // If this partition is not persistent, immediately destroy - // the buffer pool. For persistent intermediate results, the - // partition manager needs to release the buffer pool. 
- bufferPool.destroy(); - } - } - } - - public void releaseAllResources() throws IOException { - synchronized (queues) { - LOG.debug("Release all resources of {}.", this); - - if (!isReleased) { - try { - for (IntermediateResultPartitionQueue queue : queues) { - try { - queue.discard(); - } - catch (Throwable t) { - LOG.error("Error while discarding queue: " + t.getMessage(), t); - } - } - - if (bufferPool != null) { - bufferPool.destroy(); - } - } - finally { - isReleased = true; - } - } - } - } - - // ------------------------------------------------------------------------ - // Consume - // ------------------------------------------------------------------------ - - public IntermediateResultPartitionQueueIterator getQueueIterator(int queueIndex, Optional bufferProvider) - throws IOException { - synchronized (queues) { - if (isReleased) { - throw new IllegalQueueIteratorRequestException("Intermediate result partition has already been released."); - } - - if (queueIndex < 0 || queueIndex >= queues.length) { - throw new IllegalQueueIteratorRequestException("Illegal queue index: " + queueIndex + ", allowed: 0-" + (queues.length - 1)); - } - - return queues[queueIndex].getQueueIterator(bufferProvider); - } - } - - // ------------------------------------------------------------------------ - - private void checkInProducePhase() { - checkState(!isReleased, "Partition has already been discarded."); - checkState(!isFinished, "Partition has already been finished."); - } - - /** - * Maybe notifies consumers of this result partition. 
- */ - private void maybeNotifyConsumers(boolean doNotify) throws IOException { - if (doNotify && !hasNotifiedConsumers) { - scheduleOrUpdateConsumers(); - hasNotifiedConsumers = true; - } - } - - private void scheduleOrUpdateConsumers() throws IOException { - if(!isReleased){ - ScheduleOrUpdateConsumers msg = new ScheduleOrUpdateConsumers(jobId, - producerExecutionId, partitionIndex); - - Future futureResponse = Patterns.ask(networkEnvironment.getJobManager(), msg, - networkEnvironment.getJobManagerTimeout()); - - futureResponse.onFailure(new OnFailure(){ - @Override - public void onFailure(Throwable failure) throws Throwable { - LOG.error("Could not schedule or update consumers at the JobManager.", failure); - - // Fail task at the TaskManager - FailTask failMsg = new FailTask(producerExecutionId, - new RuntimeException("Could not schedule or update consumers at " + - "the JobManager.", failure)); - - networkEnvironment.getTaskManager().tell(failMsg, ActorRef.noSender()); - } - }, AkkaUtils.globalExecutionContext()); - } - } - - // ------------------------------------------------------------------------ - // Buffer pool owner methods - // ------------------------------------------------------------------------ - - /** - * If this partition is registered as the {@link BufferPoolOwner} of a - * {@link BufferPool}, it will forward the requests to the queues. 
- */ - @Override - public void recycleBuffers(int numBuffersToRecycle) throws IOException { - int numRecycledBuffers = 0; - - for (IntermediateResultPartitionQueue queue : queues) { - numRecycledBuffers += queue.recycleBuffers(); - - if (numRecycledBuffers >= numBuffersToRecycle) { - break; - } - } - } - - // ------------------------------------------------------------------------ - - public static IntermediateResultPartition create(RuntimeEnvironment environment, int partitionIndex, JobID jobId, - ExecutionAttemptID executionId, NetworkEnvironment networkEnvironment, PartitionDeploymentDescriptor desc) { - final IntermediateResultPartitionID partitionId = checkNotNull(desc.getPartitionId()); - final IntermediateResultPartitionType partitionType = checkNotNull(desc.getPartitionType()); - - final IntermediateResultPartitionQueue[] partitionQueues = new IntermediateResultPartitionQueue[desc.getNumberOfQueues()]; - - // TODO The queues need to be created depending on the result type - for (int i = 0; i < partitionQueues.length; i++) { - partitionQueues[i] = new PipelinedPartitionQueue(); - } - - return new IntermediateResultPartition(environment, partitionIndex, jobId, executionId, partitionId, partitionType, - partitionQueues, networkEnvironment); - } - - @Override - public String toString() { - return String.format("IntermediateResultPartition(JobID: %s, ExecutionID: %s, " + - "PartitionID: %s, PartitionType: %s, [num queues: %d, (isFinished: %b)", - jobId, producerExecutionId, partitionId, partitionType, queues.length, isFinished); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartitionManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartitionManager.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartitionManager.java deleted file mode 100644 index d5b8fe5..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartitionManager.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.io.network.partition; - -import com.google.common.base.Optional; -import com.google.common.collect.HashBasedTable; -import com.google.common.collect.Table; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.io.network.buffer.BufferProvider; -import org.apache.flink.runtime.io.network.partition.queue.IllegalQueueIteratorRequestException; -import org.apache.flink.runtime.io.network.partition.queue.IntermediateResultPartitionQueueIterator; -import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -/** - * The intermediate result partition manager keeps track of all available - * partitions of a task manager and - */ -public class IntermediateResultPartitionManager implements IntermediateResultPartitionProvider { - - private static final Logger LOG = LoggerFactory.getLogger(IntermediateResultPartitionManager.class); - - private final Table partitions = HashBasedTable.create(); - - private boolean isShutdown; - - public void registerIntermediateResultPartition(IntermediateResultPartition partition) throws IOException { - synchronized (partitions) { - LOG.debug("Register intermediate result partition {}.", partition); - - if (isShutdown) { - throw new IOException("Intermediate result partition manager has already been shut down."); - } - - if (partitions.put(partition.getProducerExecutionId(), partition.getPartitionId(), partition) != null) { - throw new IOException("Tried to re-register intermediate result partition."); - } - } - } - - public void failIntermediateResultPartitions(ExecutionAttemptID producerExecutionId) { - synchronized (partitions) { - List partitionsToFail = new ArrayList(); - - for (IntermediateResultPartitionID partitionId : partitions.row(producerExecutionId).keySet()) { - 
partitionsToFail.add(partitions.get(producerExecutionId, partitionId)); - } - - for(IntermediateResultPartition partition : partitionsToFail) { - failIntermediateResultPartition(partition); - } - } - } - - private void failIntermediateResultPartition(IntermediateResultPartition partition) { - if (partition != null) { - try { - partition.releaseAllResources(); - } - catch (Throwable t) { - LOG.error("Error during release of produced intermediate result partition: " + t.getMessage(), t); - } - } - } - - public void shutdown() { - synchronized (partitions) { - for (IntermediateResultPartition partition : partitions.values()) { - try { - partition.releaseAllResources(); - } - catch (IOException e) { - LOG.error("Error while releasing intermediate result partition: " + e.getMessage(), e); - } - } - - isShutdown = true; - } - } - - public int getNumberOfRegisteredPartitions() { - synchronized (partitions) { - return partitions.size(); - } - } - - // ------------------------------------------------------------------------ - // Intermediate result partition provider - // ------------------------------------------------------------------------ - - @Override - public IntermediateResultPartitionQueueIterator getIntermediateResultPartitionIterator( - ExecutionAttemptID producerExecutionId, - IntermediateResultPartitionID partitionId, - int queueIndex, - Optional bufferProvider) throws IOException { - - synchronized (partitions) { - IntermediateResultPartition partition = partitions.get(producerExecutionId, partitionId); - - if (partition == null) { - if (!partitions.containsRow(producerExecutionId)) { - LOG.debug("Could not find producer execution ID {}. 
Registered producer" + - " execution IDs {}.", producerExecutionId, - Arrays.toString(partitions.rowKeySet().toArray())); - - throw new IllegalQueueIteratorRequestException("Unknown producer execution ID " + producerExecutionId + "."); - } - - throw new IllegalQueueIteratorRequestException("Unknown partition " + partitionId + "."); - } - - return partition.getQueueIterator(queueIndex, bufferProvider); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartitionProvider.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartitionProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartitionProvider.java deleted file mode 100644 index b18b3fc..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartitionProvider.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.io.network.partition; - -import com.google.common.base.Optional; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.io.network.buffer.BufferProvider; -import org.apache.flink.runtime.io.network.partition.queue.IntermediateResultPartitionQueueIterator; -import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; - -import java.io.IOException; - -public interface IntermediateResultPartitionProvider { - - IntermediateResultPartitionQueueIterator getIntermediateResultPartitionIterator( - ExecutionAttemptID producerExecutionId, - IntermediateResultPartitionID partitionId, - int requestedQueueIndex, - Optional bufferProvider) throws IOException; - -} http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java new file mode 100644 index 0000000..a8a0a7b --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +import com.google.common.base.Optional; +import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; +import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferProvider; +import org.apache.flink.runtime.util.event.NotificationListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayDeque; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * A pipelined in-memory only subpartition, which can be consumed once. + */ +class PipelinedSubpartition extends ResultSubpartition { + + private static final Logger LOG = LoggerFactory.getLogger(PipelinedSubpartition.class); + + /** Flag indicating whether the subpartition has been finished. */ + private boolean isFinished; + + /** Flag indicating whether the subpartition has been released. */ + private boolean isReleased; + + /** + * A data availability listener. Registered, when the consuming task is faster than the + * producing task. + */ + private NotificationListener registeredListener; + + /** The read view to consume this subpartition. */ + private PipelinedSubpartitionView readView; + + /** All buffers of this subpartition. Access to the buffers is synchronized on this object. 
*/ + final ArrayDeque buffers = new ArrayDeque(); + + PipelinedSubpartition(int index, ResultPartition parent) { + super(index, parent); + } + + @Override + public boolean add(Buffer buffer) { + checkNotNull(buffer); + + final NotificationListener listener; + + synchronized (buffers) { + if (isReleased || isFinished) { + return false; + } + + // Add the buffer and update the stats + buffers.add(buffer); + updateStatistics(buffer); + + // Get the listener... + listener = registeredListener; + registeredListener = null; + } + + // Notify the listener outside of the synchronized block + if (listener != null) { + listener.onNotification(); + } + + return true; + } + + @Override + public void finish() { + final NotificationListener listener; + + synchronized (buffers) { + if (isReleased || isFinished) { + return; + } + + final Buffer buffer = EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE); + + buffers.add(buffer); + updateStatistics(buffer); + + isFinished = true; + + LOG.debug("Finished {}.", this); + + // Get the listener... + listener = registeredListener; + registeredListener = null; + } + + // Notify the listener outside of the synchronized block + if (listener != null) { + listener.onNotification(); + } + } + + @Override + public void release() { + final NotificationListener listener; + final PipelinedSubpartitionView view; + + synchronized (buffers) { + if (isReleased) { + return; + } + + // Release all available buffers + Buffer buffer; + while ((buffer = buffers.poll()) != null) { + if (!buffer.isRecycled()) { + buffer.recycle(); + } + } + + // Get the view... + view = readView; + readView = null; + + // Get the listener... 
+ listener = registeredListener; + registeredListener = null; + + // Make sure that no further buffers are added to the subpartition + isReleased = true; + + LOG.debug("Released {}.", this); + } + + // Release all resources of the view + if (view != null) { + view.releaseAllResources(); + } + + // Notify the listener outside of the synchronized block + if (listener != null) { + listener.onNotification(); + } + } + + @Override + public int releaseMemory() { + // The pipelined subpartition does not react to memory release requests. The buffers will be + // recycled by the consuming task. + return 0; + } + + @Override + public PipelinedSubpartitionView getReadView(Optional bufferProvider) { + synchronized (buffers) { + if (readView != null) { + throw new IllegalStateException("Subpartition is being or already has been " + + "consumed, but pipelined subpartitions can only be consumed once."); + } + + readView = new PipelinedSubpartitionView(this); + + LOG.debug("Created {}.", readView); + + return readView; + } + } + + @Override + public String toString() { + synchronized (buffers) { + return String.format("PipelinedSubpartition [number of buffers: %d (%d bytes), " + + "finished? %s, read view? %s]", + getTotalNumberOfBuffers(), getTotalNumberOfBytes(), isFinished, readView != null); + } + } + + /** + * Registers a listener with this subpartition and returns whether the registration was + * successful. + * + *

A registered listener is notified when the state of the subpartition changes. After a + * notification, the listener is unregistered. Only a single listener is allowed to be + * registered. + */ + boolean registerListener(NotificationListener listener) { + synchronized (buffers) { + if (!buffers.isEmpty() || isReleased) { + return false; + } + + if (registeredListener == null) { + registeredListener = listener; + + return true; + } + + throw new IllegalStateException("Already registered listener."); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java new file mode 100644 index 0000000..822e33a --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.util.event.NotificationListener; + +import java.util.concurrent.atomic.AtomicBoolean; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * View over a pipelined in-memory only subpartition. + */ +class PipelinedSubpartitionView implements ResultSubpartitionView { + + /** The subpartition this view belongs to. */ + private final PipelinedSubpartition parent; + + /** Flag indicating whether this view has been released. */ + private AtomicBoolean isReleased = new AtomicBoolean(); + + PipelinedSubpartitionView(PipelinedSubpartition parent) { + this.parent = checkNotNull(parent); + } + + @Override + public Buffer getNextBuffer() { + synchronized (parent.buffers) { + return parent.buffers.poll(); + } + } + + @Override + public boolean registerListener(NotificationListener listener) { + return !isReleased.get() && parent.registerListener(listener); + + } + + @Override + public void notifySubpartitionConsumed() { + releaseAllResources(); + } + + @Override + public void releaseAllResources() { + if (isReleased.compareAndSet(false, true)) { + // The view doesn't hold any resources and the parent cannot be restarted. Therefore, + // it's OK to notify about consumption as well. 
+ parent.onConsumedSubpartition(); + } + } + + @Override + public boolean isReleased() { + return isReleased.get(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java new file mode 100644 index 0000000..95aa636 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java @@ -0,0 +1,421 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition; + +import akka.actor.ActorRef; +import akka.dispatch.OnFailure; +import akka.pattern.Patterns; +import com.google.common.base.Optional; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.network.NetworkEnvironment; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferPool; +import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner; +import org.apache.flink.runtime.io.network.buffer.BufferProvider; +import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.runtime.taskmanager.TaskManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Future; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; +import static org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers; +import static org.apache.flink.runtime.messages.TaskManagerMessages.FailTask; + +/** + * A result partition for data produced by a single task. + * + *

This class is the runtime part of a logical {@link IntermediateResultPartition}. Essentially, + * a result partition is a collection of {@link Buffer} instances. The buffers are organized in one + * or more {@link ResultSubpartition} instances, which further partition the data depending on the + * number of consuming tasks and the data {@link DistributionPattern}. + * + *

Tasks that consume a result partition have to request one of its subpartitions. The request + * happens either remotely (see {@link RemoteInputChannel}) or locally (see {@link LocalInputChannel}). + * + *

Life-cycle

+ * + * The life-cycle of each result partition has three (possibly overlapping) phases: + *
    + *
  1. Produce:
  2. + *
  3. Consume:
  4. + *
  5. Release:
  6. + *
+ * + *

Lazy deployment and updates of consuming tasks

+ * + * Before a consuming task can request the result, it has to be deployed. The time of deployment + * depends on the PIPELINED vs. BLOCKING characteristic of the result partition. With pipelined + * results, receivers are deployed as soon as the first buffer is added to the result partition. + * With blocking results, on the other hand, receivers are deployed after the partition is finished. + * + *

Buffer management

+ * + *

State management

+ */ +public class ResultPartition implements BufferPoolOwner { + + private static final Logger LOG = LoggerFactory.getLogger(ResultPartition.class); + + private final JobID jobId; + + private final ResultPartitionID partitionId; + + /** Type of this partition. Defines the concrete subpartition implementation to use. */ + private final ResultPartitionType partitionType; + + /** The subpartitions of this partition. At least one. */ + private final ResultSubpartition[] subpartitions; + + private final NetworkEnvironment networkEnvironment; + + // - Runtime state -------------------------------------------------------- + + private final AtomicBoolean isReleased = new AtomicBoolean(); + + /** + * The total number of references to subpartitions of this result. The result partition can be + * safely released, iff the reference count is zero. A reference count of -1 denotes that the + * result partition has been released. + */ + private final AtomicInteger pendingReferences = new AtomicInteger(); + + private BufferPool bufferPool; + + private boolean hasNotifiedPipelinedConsumers; + + private boolean isFinished; + + // - Statistics ---------------------------------------------------------- + + /** The total number of buffers (both data and event buffers) */ + private int totalNumberOfBuffers; + + /** The total number of bytes (both data and event buffers) */ + private long totalNumberOfBytes; + + public ResultPartition( + JobID jobId, + ResultPartitionID partitionId, + ResultPartitionType partitionType, + int numberOfSubpartitions, + NetworkEnvironment networkEnvironment, + IOManager ioManager) { + + this.jobId = checkNotNull(jobId); + this.partitionId = checkNotNull(partitionId); + this.partitionType = checkNotNull(partitionType); + this.subpartitions = new ResultSubpartition[numberOfSubpartitions]; + this.networkEnvironment = checkNotNull(networkEnvironment); + + // Create the subpartitions. 
+ switch (partitionType) { + case BLOCKING: + for (int i = 0; i < subpartitions.length; i++) { + subpartitions[i] = new SpillableSubpartition( + i, this, ioManager, networkEnvironment.getDefaultIOMode()); + } + + break; + + case PIPELINED: + for (int i = 0; i < subpartitions.length; i++) { + subpartitions[i] = new PipelinedSubpartition(i, this); + } + + break; + + default: + throw new IllegalArgumentException("Unsupported result partition type."); + } + + // Initially, partitions should be consumed once before release. + pin(); + + LOG.debug("Initialized {}", this); + } + + /** + * Registers a buffer pool with this result partition. + *

+ * There is one pool for each result partition, which is shared by all its sub partitions. + *

+ * The pool is registered with the partition *after* it has been constructed in order to conform + * to the life-cycle of task registrations in the {@link TaskManager}. + */ + public void registerBufferPool(BufferPool bufferPool) { + checkArgument(bufferPool.getNumberOfRequiredMemorySegments() == getNumberOfSubpartitions(), + "Bug in result partition setup logic: Buffer pool has not enough guaranteed buffers for this result partition."); + + checkState(this.bufferPool == null, "Bug in result partition setup logic: Already registered buffer pool."); + + this.bufferPool = checkNotNull(bufferPool); + + // If the partition type is back pressure-free, we register with the buffer pool for + // callbacks to release memory. + if (!partitionType.hasBackPressure()) { + bufferPool.setBufferPoolOwner(this); + } + } + + public JobID getJobId() { + return jobId; + } + + public ResultPartitionID getPartitionId() { + return partitionId; + } + + public int getNumberOfSubpartitions() { + return subpartitions.length; + } + + public BufferProvider getBufferProvider() { + return bufferPool; + } + + public int getTotalNumberOfBuffers() { + return totalNumberOfBuffers; + } + + public long getTotalNumberOfBytes() { + return totalNumberOfBytes; + } + + // ------------------------------------------------------------------------ + + /** + * Adds a buffer to the subpartition with the given index. + * + *

For PIPELINED results, this will trigger the deployment of consuming tasks after the + * first buffer has been added. + */ + public void add(Buffer buffer, int subpartitionIndex) throws IOException { + boolean success = false; + + try { + checkInProduceState(); + + final ResultSubpartition subpartition = subpartitions[subpartitionIndex]; + + synchronized (subpartition) { + success = subpartition.add(buffer); + + // Update statistics + totalNumberOfBuffers++; + totalNumberOfBytes += buffer.getSize(); + } + } + finally { + if (success) { + notifyPipelinedConsumers(); + } + else { + buffer.recycle(); + } + } + } + + /** + * Finishes the result partition. + * + *

After this operation, it is not possible to add further data to the result partition. + * + *

For BLOCKING results, this will trigger the deployment of consuming tasks. + */ + public void finish() throws IOException { + boolean success = false; + + try { + checkInProduceState(); + + for (ResultSubpartition subpartition : subpartitions) { + synchronized (subpartition) { + subpartition.finish(); + } + } + + success = true; + } + finally { + if (success) { + isFinished = true; + + notifyPipelinedConsumers(); + } + } + } + + /** + * Releases the result partition. + */ + public void release() { + if (isReleased.compareAndSet(false, true)) { + LOG.debug("Releasing {}", this); + + try { + for (ResultSubpartition subpartition : subpartitions) { + try { + synchronized (subpartition) { + subpartition.release(); + } + } + // Catch this in order to ensure that release is called on all subpartitions + catch (Throwable t) { + LOG.error("Error during release of result subpartition: " + t.getMessage(), t); + } + } + } + finally { + if (bufferPool != null) { + bufferPool.lazyDestroy(); + } + } + } + } + + /** + * Returns the requested subpartition. + */ + public ResultSubpartitionView getSubpartition(int index, Optional bufferProvider) throws IOException { + int refCnt = pendingReferences.get(); + + checkState(refCnt != -1, "Partition released."); + checkState(refCnt > 0, "Partition not pinned."); + + return subpartitions[index].getReadView(bufferProvider); + } + + /** + * Releases buffers held by this result partition. + * + *

This is a callback from the buffer pool, which is registered for result partitions, which + * are back pressure-free. + */ + @Override + public void releaseMemory(int toRelease) throws IOException { + checkArgument(toRelease > 0); + + for (ResultSubpartition subpartition : subpartitions) { + toRelease -= subpartition.releaseMemory(); + + // Only release as much memory as needed + if (toRelease <= 0) { + break; + } + } + } + + @Override + public String toString() { + return "ResultPartition " + partitionId.toString() + " [" + partitionType + ", " + + subpartitions.length + " subpartitions, " + + pendingReferences + " pending references]"; + } + + // ------------------------------------------------------------------------ + + /** + * Pins the result partition. + * + *

The partition can only be released after each subpartition has been consumed once per pin + * operation. + */ + void pin() { + while (true) { + int refCnt = pendingReferences.get(); + + if (refCnt >= 0) { + if (pendingReferences.compareAndSet(refCnt, refCnt + subpartitions.length)) { + break; + } + } + else { + throw new IllegalStateException("Released."); + } + } + } + + /** + * Notification when a subpartition is released. + */ + void onConsumedSubpartition(int subpartitionIndex) { + + if (isReleased.get()) { + return; + } + + int refCnt = pendingReferences.decrementAndGet(); + + if (refCnt == 0) { + networkEnvironment.getPartitionManager().onConsumedPartition(this); + } + else if (refCnt < 0) { + throw new IllegalStateException("All references released."); + } + + LOG.debug("{}: Received release notification for subpartition {} (reference count now at: {}).", + this, subpartitionIndex, pendingReferences); + } + + // ------------------------------------------------------------------------ + + private void checkInProduceState() { + checkState(!isFinished, "Partition already finished."); + } + + /** + * Notifies pipelined consumers of this result partition once. 
+ */ + private void notifyPipelinedConsumers() throws IOException { + if (partitionType.isPipelined() && !hasNotifiedPipelinedConsumers) { + ScheduleOrUpdateConsumers msg = new ScheduleOrUpdateConsumers(jobId, partitionId); + + Future futureResponse = Patterns.ask(networkEnvironment.getJobManager(), msg, + networkEnvironment.getJobManagerTimeout()); + + futureResponse.onFailure(new OnFailure() { + @Override + public void onFailure(Throwable failure) throws Throwable { + LOG.error("Could not schedule or update consumers at the JobManager.", failure); + + // Fail task at the TaskManager + FailTask failMsg = new FailTask(partitionId.getProducerId(), + new RuntimeException("Could not schedule or update consumers at " + + "the JobManager.", failure)); + + networkEnvironment.getTaskManager().tell(failMsg, ActorRef.noSender()); + } + }, AkkaUtils.globalExecutionContext()); + + hasNotifiedPipelinedConsumers = true; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java new file mode 100644 index 0000000..af2970d --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; + +import java.io.Serializable; + +/** + * Runtime identifier of a produced {@link IntermediateResultPartition}. + * + *

In failure cases the {@link IntermediateResultPartitionID} is not enough to uniquely + * identify a result partition. It needs to be associated with the producing task as well to ensure + * correct tracking of failed/restarted tasks. + */ +public final class ResultPartitionID implements Serializable { + + private final IntermediateResultPartitionID partitionId; + + private final ExecutionAttemptID producerId; + + public ResultPartitionID() { + this(new IntermediateResultPartitionID(), new ExecutionAttemptID()); + } + + public ResultPartitionID(IntermediateResultPartitionID partitionId, ExecutionAttemptID producerId) { + this.partitionId = partitionId; + this.producerId = producerId; + } + + public IntermediateResultPartitionID getPartitionId() { + return partitionId; + } + + public ExecutionAttemptID getProducerId() { + return producerId; + } + + @Override + public boolean equals(Object obj) { + if (obj != null && obj.getClass() == ResultPartitionID.class) { + ResultPartitionID o = (ResultPartitionID) obj; + + return o.getPartitionId().equals(partitionId) && o.getProducerId().equals(producerId); + } + + return false; + } + + @Override + public int hashCode() { + return partitionId.hashCode() ^ producerId.hashCode(); + } + + @Override + public String toString() { + return partitionId.toShortString() + "@" + producerId.toShortString(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java new file mode 100644 index 0000000..c120de8 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java @@ -0,0 +1,146 @@ 
+/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +import com.google.common.base.Optional; +import com.google.common.collect.HashBasedTable; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Table; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.io.network.buffer.BufferProvider; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; + +import static com.google.common.base.Preconditions.checkState; + +/** + * The result partition manager keeps track of all currently produced/consumed partitions of a + * task manager. 
+ */ +public class ResultPartitionManager implements ResultPartitionProvider { + + private static final Logger LOG = LoggerFactory.getLogger(ResultPartitionManager.class); + + public final Table + registeredPartitions = HashBasedTable.create(); + + private boolean isShutdown; + + public void registerIntermediateResultPartition(ResultPartition partition) throws IOException { + synchronized (registeredPartitions) { + checkState(!isShutdown, "Result partition manager already shut down."); + + ResultPartitionID partitionId = partition.getPartitionId(); + + ResultPartition previous = registeredPartitions.put( + partitionId.getProducerId(), partitionId.getPartitionId(), partition); + + if (previous != null) { + throw new IllegalStateException("Result partition already registered."); + } + + LOG.debug("Registered {}.", partition); + } + } + + @Override + public ResultSubpartitionView getSubpartition( + ResultPartitionID partitionId, + int subpartitionIndex, + Optional bufferProvider) throws IOException { + + synchronized (registeredPartitions) { + final ResultPartition partition = registeredPartitions.get(partitionId.getProducerId(), + partitionId.getPartitionId()); + + if (partition == null) { + throw new IOException("Unknown partition " + partitionId + "."); + } + + LOG.debug("Requested partition {}.", partition); + + return partition.getSubpartition(subpartitionIndex, bufferProvider); + } + } + + public void releasePartitionsProducedBy(ExecutionAttemptID executionId) { + synchronized (registeredPartitions) { + final Map partitions = + registeredPartitions.row(executionId); + + for (ResultPartition partition : partitions.values()) { + partition.release(); + } + + for (IntermediateResultPartitionID partitionId : ImmutableList + .copyOf(partitions.keySet())) { + + registeredPartitions.remove(executionId, partitionId); + } + + LOG.debug("Released all partitions produced by {}.", executionId); + } + } + + public void shutdown() { + synchronized (registeredPartitions) { + + 
LOG.debug("Releasing {} partitions because of shutdown.", + registeredPartitions.values().size()); + + for (ResultPartition partition : registeredPartitions.values()) { + partition.release(); + } + + registeredPartitions.clear(); + + isShutdown = true; + + LOG.debug("Successful shutdown."); + } + } + + // ------------------------------------------------------------------------ + // Notifications + // ------------------------------------------------------------------------ + + void onConsumedPartition(ResultPartition partition) { + final ResultPartition previous; + + LOG.debug("Received consume notification from {}.", partition); + + synchronized (registeredPartitions) { + ResultPartitionID partitionId = partition.getPartitionId(); + + previous = registeredPartitions.remove(partitionId.getProducerId(), + partitionId.getPartitionId()); + } + + // Release the partition if it was successfully removed + if (partition == previous) { + partition.release(); + + LOG.debug("Released {}.", partition); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java new file mode 100644 index 0000000..1f35f59 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +import com.google.common.base.Optional; +import org.apache.flink.runtime.io.network.buffer.BufferProvider; + +import java.io.IOException; + +public interface ResultPartitionProvider { + + /** + * Returns the requested intermediate result partition input view. + */ + ResultSubpartitionView getSubpartition(ResultPartitionID partitionId, int index, Optional bufferProvider) throws IOException; + +} http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java new file mode 100644 index 0000000..65d49ed --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +public enum ResultPartitionType { + + BLOCKING(true, false, false), + + PIPELINED(false, true, true), + + PIPELINED_PERSISTENT(true, true, true); + + /** Does the partition live longer than the consuming task? */ + private final boolean isPersistent; + + /** Can the partition be consumed while being produced? */ + private final boolean isPipelined; + + /** Does the partition produce back pressure when not consumed? */ + private final boolean hasBackPressure; + + /** + * Specifies the behaviour of an intermediate result partition at runtime. 
+ */ + ResultPartitionType(boolean isPersistent, boolean isPipelined, boolean hasBackPressure) { + this.isPersistent = isPersistent; + this.isPipelined = isPipelined; + this.hasBackPressure = hasBackPressure; + } + + public boolean hasBackPressure() { + return hasBackPressure; + } + + public boolean isBlocking() { + return !isPipelined; + } + + public boolean isPipelined() { + return isPipelined; + } + + public boolean isPersistent() { + return isPersistent; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java new file mode 100644 index 0000000..1538a1a --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition; + +import com.google.common.base.Optional; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferProvider; + +import java.io.IOException; + +/** + * A single subpartition of a {@link ResultPartition} instance. + */ +public abstract class ResultSubpartition { + + /** The index of the subpartition at the parent partition. */ + protected final int index; + + /** The parent partition this subpartition belongs to. */ + protected final ResultPartition parent; + + // - Statistics ---------------------------------------------------------- + + /** The total number of buffers (both data and event buffers) */ + private int totalNumberOfBuffers; + + /** The total number of bytes (both data and event buffers) */ + private long totalNumberOfBytes; + + public ResultSubpartition(int index, ResultPartition parent) { + this.index = index; + this.parent = parent; + } + + protected void updateStatistics(Buffer buffer) { + totalNumberOfBuffers++; + totalNumberOfBytes += buffer.getSize(); + } + + protected int getTotalNumberOfBuffers() { + return totalNumberOfBuffers; + } + + protected long getTotalNumberOfBytes() { + return totalNumberOfBytes; + } + + /** + * Notifies the parent partition about a consumed {@link ResultSubpartitionView}. 
+ */ + protected void onConsumedSubpartition() { + parent.onConsumedSubpartition(index); + } + + abstract public boolean add(Buffer buffer) throws IOException; + + abstract public void finish() throws IOException; + + abstract public void release() throws IOException; + + abstract public ResultSubpartitionView getReadView(Optional bufferProvider) throws IOException; + + abstract int releaseMemory() throws IOException; + +} http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java new file mode 100644 index 0000000..82cee6c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.util.event.NotificationListener; + +import java.io.IOException; + +/** + * A view to consume a {@link ResultSubpartition} instance. + */ +public interface ResultSubpartitionView { + + /** + * Returns the next {@link Buffer} instance of this queue iterator. + *

+ * If there is currently no instance available, it will return {@code null}. + * This might happen, for example, when a pipelined queue producer is slower + * than the consumer or a spilled queue needs to read in more data. + *

+ * Important: The consumer has to make sure that each + * buffer instance will eventually be recycled with {@link Buffer#recycle()} + * after it has been consumed. + */ + Buffer getNextBuffer() throws IOException, InterruptedException; + + /** + * Subscribes to data availability notifications. + *

+ * Returns whether the subscription was successful. A subscription fails, + * if there is data available. + */ + boolean registerListener(NotificationListener listener) throws IOException; + + + void releaseAllResources() throws IOException; + + void notifySubpartitionConsumed() throws IOException; + + boolean isReleased(); + +} http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java new file mode 100644 index 0000000..da6a847 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition; + +import com.google.common.base.Optional; +import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode; +import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; +import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * A blocking in-memory subpartition, which is able to spill to disk. + * + *

Buffers are kept in-memory as long as possible. If not possible anymore, all buffers are + * spilled to disk. + */ +class SpillableSubpartition extends ResultSubpartition { + + private static final Logger LOG = LoggerFactory.getLogger(SpillableSubpartition.class); + + /** All buffers of this subpartition. */ + final ArrayList buffers = new ArrayList(); + + /** The I/O manager to create the spill writer from. */ + final IOManager ioManager; + + /** The default I/O mode to use. */ + final IOMode ioMode; + + /** The writer used for spilling. As long as this is null, we are in-memory. */ + BufferFileWriter spillWriter; + + /** Flag indicating whether the subpartition has been finished. */ + private boolean isFinished; + + /** Flag indicating whether the subpartition has been released. */ + private boolean isReleased; + + /** The read view to consume this subpartition. */ + private ResultSubpartitionView readView; + + SpillableSubpartition(int index, ResultPartition parent, IOManager ioManager, IOMode ioMode) { + super(index, parent); + + this.ioManager = checkNotNull(ioManager); + this.ioMode = checkNotNull(ioMode); + } + + @Override + public boolean add(Buffer buffer) throws IOException { + checkNotNull(buffer); + + synchronized (buffers) { + if (isFinished || isReleased) { + return false; + } + + // In-memory + if (spillWriter == null) { + buffers.add(buffer); + + return true; + } + } + + // Else: Spilling + spillWriter.writeBlock(buffer); + + return true; + } + + @Override + public void finish() throws IOException { + synchronized (buffers) { + if (add(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE))) { + // If we are spilling/have spilled, wait for the writer to finish. 
+ if (spillWriter != null) { + spillWriter.close(); + } + + isFinished = true; + } + } + } + + @Override + public void release() throws IOException { + final ResultSubpartitionView view; + + synchronized (buffers) { + if (isReleased) { + return; + } + + // Recycle all in-memory buffers + for (Buffer buffer : buffers) { + buffer.recycle(); + } + + buffers.clear(); + buffers.trimToSize(); + + // If we are spilling/have spilled, wait for the writer to finish and delete the file. + if (spillWriter != null) { + spillWriter.closeAndDelete(); + } + + // Get the view... + view = readView; + readView = null; + + isReleased = true; + } + + // Release the view outside of the synchronized block + if (view != null) { + view.notifySubpartitionConsumed(); + } + } + + @Override + public int releaseMemory() throws IOException { + synchronized (buffers) { + if (spillWriter == null) { + // Create the spill writer + spillWriter = ioManager.createBufferFileWriter(ioManager.createChannel()); + + final int numberOfBuffers = buffers.size(); + + // Spill all buffers + for (int i = 0; i < numberOfBuffers; i++) { + spillWriter.writeBlock(buffers.remove(0)); + } + + LOG.debug("Spilling {} buffers of {}.", numberOfBuffers, this); + + return numberOfBuffers; + } + } + + // Else: We have already spilled and don't hold any buffers + return 0; + } + + @Override + public ResultSubpartitionView getReadView(Optional bufferProvider) throws IOException { + synchronized (buffers) { + if (!isFinished) { + throw new IllegalStateException("Subpartition has not been finished yet, " + + "but blocking subpartitions can only be consumed after they have " + + "been finished."); + } + + if (readView != null) { + throw new IllegalStateException("Subpartition is being or already has been " + + "consumed, but we currently allow subpartitions to only be consumed once."); + } + + // Spilled if closed and no outstanding write requests + boolean isSpilled = spillWriter != null && (spillWriter.isClosed() + || 
spillWriter.getNumberOfOutstandingRequests() == 0); + + if (isSpilled) { + if (ioMode.isSynchronous()) { + readView = new SpilledSubpartitionViewSyncIO( + this, + bufferProvider.get().getMemorySegmentSize(), + spillWriter.getChannelID(), + 0); + } + else { + readView = new SpilledSubpartitionViewAsyncIO( + this, + bufferProvider.get(), + ioManager, + spillWriter.getChannelID(), + 0); + } + } + else { + readView = new SpillableSubpartitionView( + this, bufferProvider.get(), buffers.size(), ioMode); + } + + return readView; + } + } + + @Override + public String toString() { + return String.format("SpillableSubpartition [%d number of buffers (%d bytes)," + + "finished? %s, read view? %s, spilled? %s]", + getTotalNumberOfBuffers(), getTotalNumberOfBytes(), isFinished, readView != null, + spillWriter != null); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java new file mode 100644 index 0000000..59b1464 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferProvider; +import org.apache.flink.runtime.util.event.NotificationListener; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +class SpillableSubpartitionView implements ResultSubpartitionView { + + /** The subpartition this view belongs to. */ + private final SpillableSubpartition parent; + + /** The buffer provider to read buffers into (spilling case). */ + private final BufferProvider bufferProvider; + + /** The number of buffers in-memory at the subpartition. */ + private final int numberOfBuffers; + + /** The default I/O mode to use. 
*/ + private final IOMode ioMode; + + private ResultSubpartitionView spilledView; + + private int currentQueuePosition; + + private long currentBytesRead; + + private final AtomicBoolean isReleased = new AtomicBoolean(false); + + public SpillableSubpartitionView( + SpillableSubpartition parent, + BufferProvider bufferProvider, + int numberOfBuffers, + IOMode ioMode) { + + this.parent = checkNotNull(parent); + this.bufferProvider = checkNotNull(bufferProvider); + checkArgument(numberOfBuffers >= 0); + this.numberOfBuffers = numberOfBuffers; + this.ioMode = checkNotNull(ioMode); + } + + @Override + public Buffer getNextBuffer() throws IOException, InterruptedException { + + if (isReleased.get()) { + return null; + } + + // 1) In-memory + synchronized (parent.buffers) { + if (parent.spillWriter == null) { + if (currentQueuePosition < numberOfBuffers) { + Buffer buffer = parent.buffers.get(currentQueuePosition); + + buffer.retain(); + + // TODO Fix hard coding of 8 bytes for the header + currentBytesRead += buffer.getSize() + 8; + currentQueuePosition++; + + return buffer; + } + + return null; + } + } + + // 2) Spilled + if (spilledView != null) { + return spilledView.getNextBuffer(); + } + + // 3) Spilling + // Make sure that all buffers are written before consuming them. We can't block here, + // because this might be called from an network I/O thread. 
+ if (parent.spillWriter.getNumberOfOutstandingRequests() > 0) { + return null; + } + + if (ioMode.isSynchronous()) { + spilledView = new SpilledSubpartitionViewSyncIO( + parent, + bufferProvider.getMemorySegmentSize(), + parent.spillWriter.getChannelID(), + 0); + } + else { + spilledView = new SpilledSubpartitionViewAsyncIO( + parent, + bufferProvider, + parent.ioManager, + parent.spillWriter.getChannelID(), + 0); + } + + return spilledView.getNextBuffer(); + } + + @Override + public boolean registerListener(NotificationListener listener) throws IOException { + if (spilledView == null) { + synchronized (parent.buffers) { + // Didn't spill yet, buffers should be in-memory + if (parent.spillWriter == null) { + return false; + } + } + + // Spilling + if (parent.spillWriter.getNumberOfOutstandingRequests() > 0) { + return parent.spillWriter.registerAllRequestsProcessedListener(listener); + } + + return false; + } + + return spilledView.registerListener(listener); + } + + @Override + public void notifySubpartitionConsumed() throws IOException { + parent.onConsumedSubpartition(); + } + + @Override + public void releaseAllResources() throws IOException { + if (isReleased.compareAndSet(false, true)) { + if (spilledView != null) { + spilledView.releaseAllResources(); + } + } + } + + @Override + public boolean isReleased() { + return isReleased.get(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java new file mode 100644 index 0000000..1d4a9ab --- /dev/null +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java @@ -0,0 +1,365 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.runtime.io.disk.iomanager.BufferFileReader; +import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.disk.iomanager.RequestDoneCallback; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferProvider; +import org.apache.flink.runtime.util.event.EventListener; +import org.apache.flink.runtime.util.event.NotificationListener; + +import java.io.IOException; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * View over a spilled subpartition. + * + *

Reads are triggered asynchronously in batches of configurable size. + */ +class SpilledSubpartitionViewAsyncIO implements ResultSubpartitionView { + + private final static int DEFAULT_READ_BATCH_SIZE = 2; + + private final Object lock = new Object(); + + /** The subpartition this view belongs to. */ + private final ResultSubpartition parent; + + /** The buffer provider to get the buffer read everything into. */ + private final BufferProvider bufferProvider; + + /** The buffer availability listener to be notified on available buffers. */ + private final BufferProviderCallback bufferAvailabilityListener; + + /** The size of read batches. */ + private final int readBatchSize; + + /** + * The size of the current batch (>= 0 and <= the configured batch size). Reads are only + * triggered when the size of the current batch is 0. + */ + private final AtomicInteger currentBatchSize = new AtomicInteger(); + + /** The asynchronous file reader to do the actual I/O. */ + private final BufferFileReader asyncFileReader; + + /** The buffers, which have been returned from the file reader. */ + private final ConcurrentLinkedQueue returnedBuffers = new ConcurrentLinkedQueue(); + + /** A data availability listener. */ + private NotificationListener registeredListener; + + /** Error, which has occurred in the I/O thread. */ + private volatile IOException errorInIOThread; + + /** Flag indicating whether all resources have been released. */ + private volatile boolean isReleased; + + /** Flag indicating whether we reached EOF at the file reader. 
*/ + private volatile boolean hasReachedEndOfFile; + + SpilledSubpartitionViewAsyncIO( + ResultSubpartition parent, + BufferProvider bufferProvider, + IOManager ioManager, + FileIOChannel.ID channelId, + long initialSeekPosition) throws IOException { + + this(parent, bufferProvider, ioManager, channelId, initialSeekPosition, DEFAULT_READ_BATCH_SIZE); + } + + SpilledSubpartitionViewAsyncIO( + ResultSubpartition parent, + BufferProvider bufferProvider, + IOManager ioManager, + FileIOChannel.ID channelId, + long initialSeekPosition, + int readBatchSize) throws IOException { + + checkArgument(initialSeekPosition >= 0, "Initial seek position is < 0."); + checkArgument(readBatchSize >= 1, "Batch read size < 1."); + + this.parent = checkNotNull(parent); + this.bufferProvider = checkNotNull(bufferProvider); + this.bufferAvailabilityListener = new BufferProviderCallback(this); + + this.asyncFileReader = ioManager.createBufferFileReader(channelId, new IOThreadCallback(this)); + + if (initialSeekPosition > 0) { + asyncFileReader.seekToPosition(initialSeekPosition); + } + + this.readBatchSize = readBatchSize; + + // Trigger the initial read requests + readNextBatchAsync(); + } + + @Override + public Buffer getNextBuffer() throws IOException { + checkError(); + + final Buffer buffer = returnedBuffers.poll(); + + // No buffer returned from the I/O thread currently. Either the current batch is in progress + // or we trigger the next one. 
+ if (buffer == null) { + if (currentBatchSize.get() == 0) { + readNextBatchAsync(); + } + } + else { + currentBatchSize.decrementAndGet(); + } + + return buffer; + } + + @Override + public boolean registerListener(NotificationListener listener) throws IOException { + checkNotNull(listener); + + checkError(); + + synchronized (lock) { + if (isReleased || !returnedBuffers.isEmpty()) { + return false; + } + + if (registeredListener == null) { + registeredListener = listener; + + return true; + } + } + + throw new IllegalStateException("Already registered listener."); + } + + @Override + public void notifySubpartitionConsumed() throws IOException { + parent.onConsumedSubpartition(); + } + + @Override + public void releaseAllResources() throws IOException { + try { + synchronized (lock) { + if (!isReleased) { + // Recycle all buffers. Buffers, which are in flight are recycled as soon as + // they return from the I/O thread. + Buffer buffer; + while ((buffer = returnedBuffers.poll()) != null) { + buffer.recycle(); + } + + isReleased = true; + } + } + } + finally { + asyncFileReader.close(); + } + } + + @Override + public boolean isReleased() { + return isReleased; + } + + /** + * Requests buffers from the buffer provider and triggers asynchronous read requests to fill + * them. + * + *

The number of requested buffers/triggered I/O read requests per call depends on the + * configured size of batch reads. + */ + private void readNextBatchAsync() throws IOException { + // This does not need to be fully synchronized with actually reaching EOF as long as + // we eventually notice it. In the worst case, we trigger some discarded reads and + // notice it when the buffers are returned. + // + // We only trigger reads if the current batch size is 0. + if (hasReachedEndOfFile || currentBatchSize.get() != 0) { + return; + } + + // Number of successful buffer requests or callback registrations. The call back will + // trigger the read as soon as a buffer becomes available again. + int i = 0; + + while (i < readBatchSize) { + final Buffer buffer = bufferProvider.requestBuffer(); + + if (buffer == null) { + // Listen for buffer availability. + currentBatchSize.incrementAndGet(); + + if (bufferProvider.addListener(bufferAvailabilityListener)) { + i++; + } + else if (bufferProvider.isDestroyed()) { + currentBatchSize.decrementAndGet(); + return; + } + else { + // Buffer available again + currentBatchSize.decrementAndGet(); + } + } + else { + currentBatchSize.incrementAndGet(); + + asyncFileReader.readInto(buffer); + } + } + } + + /** + * Returns a buffer from the buffer provider. + * + *

Note: This method is called from the thread recycling the available buffer. + */ + private void onAvailableBuffer(Buffer buffer) { + try { + asyncFileReader.readInto(buffer); + } + catch (IOException e) { + notifyError(e); + } + } + + /** + * Returns a successful buffer read request. + * + *

Note: This method is always called from the same I/O thread. + */ + private void returnBufferFromIOThread(Buffer buffer) { + final NotificationListener listener; + + synchronized (lock) { + if (hasReachedEndOfFile || isReleased) { + buffer.recycle(); + + return; + } + + returnedBuffers.add(buffer); + + listener = registeredListener; + registeredListener = null; + + // If this was the last buffer before we reached EOF, set the corresponding flag to + // ensure that further buffers are correctly recycled and eventually no further reads + // are triggered. + if (asyncFileReader.hasReachedEndOfFile()) { + hasReachedEndOfFile = true; + } + } + + if (listener != null) { + listener.onNotification(); + } + } + + /** + * Notifies the view about an error. + */ + private void notifyError(IOException error) { + if (errorInIOThread == null) { + errorInIOThread = error; + } + + final NotificationListener listener; + + synchronized (lock) { + listener = registeredListener; + registeredListener = null; + } + + if (listener != null) { + listener.onNotification(); + } + } + + /** + * Checks whether an error has been reported and rethrow the respective Exception, if available. + */ + private void checkError() throws IOException { + if (errorInIOThread != null) { + throw errorInIOThread; + } + } + + /** + * Callback from the I/O thread. + * + *

Successful buffer read requests add the buffer to the subpartition view, and failed ones + * notify about the error. + */ + private static class IOThreadCallback implements RequestDoneCallback { + + private final SpilledSubpartitionViewAsyncIO subpartitionView; + + public IOThreadCallback(SpilledSubpartitionViewAsyncIO subpartitionView) { + this.subpartitionView = subpartitionView; + } + + @Override + public void requestSuccessful(Buffer buffer) { + subpartitionView.returnBufferFromIOThread(buffer); + } + + @Override + public void requestFailed(Buffer buffer, IOException error) { + // Recycle the buffer and forward the error + buffer.recycle(); + + subpartitionView.notifyError(error); + } + } + + /** + * Callback from the buffer provider. + */ + private static class BufferProviderCallback implements EventListener { + + private final SpilledSubpartitionViewAsyncIO subpartitionView; + + private BufferProviderCallback(SpilledSubpartitionViewAsyncIO subpartitionView) { + this.subpartitionView = subpartitionView; + } + + @Override + public void onEvent(Buffer buffer) { + if (buffer == null) { + return; + } + + subpartitionView.onAvailableBuffer(buffer); + } + } +}