http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
index 1f79067..7de8651 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
@@ -18,15 +18,15 @@
package org.apache.flink.runtime.io.disk.iomanager;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.util.EnvironmentInformation;
-
-import com.google.common.base.Preconditions;
+import static com.google.common.base.Preconditions.checkState;
/**
* A version of the {@link IOManager} that uses asynchronous I/O.
@@ -181,13 +181,13 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
public BlockChannelWriter createBlockChannelWriter(FileIOChannel.ID channelID,
LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException
{
- Preconditions.checkState(!shutdown, "I/O-Manger is closed.");
+ checkState(!shutdown, "I/O-Manger is closed.");
return new AsynchronousBlockWriter(channelID, this.writers[channelID.getThreadNum()].requestQueue, returnQueue);
}
@Override
- public BlockChannelWriterWithCallback createBlockChannelWriter(FileIOChannel.ID channelID, RequestDoneCallback callback) throws IOException {
- Preconditions.checkState(!shutdown, "I/O-Manger is closed.");
+ public BlockChannelWriterWithCallback createBlockChannelWriter(FileIOChannel.ID channelID, RequestDoneCallback<MemorySegment> callback) throws IOException {
+ checkState(!shutdown, "I/O-Manger is closed.");
return new AsynchronousBlockWriterWithCallback(channelID, this.writers[channelID.getThreadNum()].requestQueue, callback);
}
@@ -205,7 +205,7 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
public BlockChannelReader createBlockChannelReader(FileIOChannel.ID channelID,
LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException
{
- Preconditions.checkState(!shutdown, "I/O-Manger is closed.");
+ checkState(!shutdown, "I/O-Manger is closed.");
return new AsynchronousBlockReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, returnQueue);
}
@@ -228,7 +228,7 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
public BulkBlockChannelReader createBulkBlockChannelReader(FileIOChannel.ID channelID,
List<MemorySegment> targetSegments, int numBlocks) throws IOException
{
- Preconditions.checkState(!shutdown, "I/O-Manger is closed.");
+ checkState(!shutdown, "I/O-Manger is closed.");
return new AsynchronousBulkBlockReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, targetSegments, numBlocks);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IORequest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IORequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IORequest.java
index 1b7adae..69791f4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IORequest.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IORequest.java
@@ -33,7 +33,6 @@ interface IORequest {
public void requestDone(IOException ioex);
}
-
/**
* Interface for I/O requests that are handled by the IOManager's reading thread.
*/
@@ -47,7 +46,6 @@ interface ReadRequest extends IORequest {
public void read() throws IOException;
}
-
/**
* Interface for I/O requests that are handled by the IOManager's writing thread.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/QueuingCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/QueuingCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/QueuingCallback.java
index 78699e2..95f3dc7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/QueuingCallback.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/QueuingCallback.java
@@ -26,7 +26,7 @@ import org.apache.flink.core.memory.MemorySegment;
/**
* A {@link RequestDoneCallback} that adds the memory segments to a blocking queue.
*/
-public class QueuingCallback implements RequestDoneCallback {
+public class QueuingCallback implements RequestDoneCallback<MemorySegment> {
private final LinkedBlockingQueue<MemorySegment> queue;
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/RequestDoneCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/RequestDoneCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/RequestDoneCallback.java
index 982343c..f9a0965 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/RequestDoneCallback.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/RequestDoneCallback.java
@@ -20,17 +20,15 @@ package org.apache.flink.runtime.io.disk.iomanager;
import java.io.IOException;
-import org.apache.flink.core.memory.MemorySegment;
-
/**
* Callback to be executed on completion of an asynchronous I/O request.
- * Depending on success or failure, either method of
- * {@ink #requestSuccessful(MemorySegment)} or {@link #requestFailed(MemorySegment, IOException)}
- * is called.
+ * <p>
+ * Depending on success or failure, either {@link #requestSuccessful(Object)}
+ * or {@link #requestFailed(Object, IOException)} is called.
*/
-public interface RequestDoneCallback {
+public interface RequestDoneCallback<T> {
+
+ void requestSuccessful(T request);
- void requestSuccessful(MemorySegment buffer);
-
- void requestFailed(MemorySegment buffer, IOException e);
+ void requestFailed(T buffer, IOException e);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/Buffer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/Buffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/Buffer.java
deleted file mode 100644
index 60232da..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/Buffer.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.flink.core.memory.MemorySegment;
-
-public class Buffer {
-
- private final MemorySegment memorySegment;
-
- private final BufferRecycler recycler;
-
- // -----------------------------------------------------------------------------------------------------------------
-
- private final AtomicInteger referenceCounter;
-
- private int size;
-
- // -----------------------------------------------------------------------------------------------------------------
-
- public Buffer(MemorySegment memorySegment, int size, BufferRecycler recycler) {
- this.memorySegment = memorySegment;
- this.size = size;
- this.recycler = recycler;
-
- // we are the first, so we start with reference count of one
- this.referenceCounter = new AtomicInteger(1);
- }
-
- /**
- * @param toDuplicate Buffer instance to duplicate
- */
- private Buffer(Buffer toDuplicate) {
- if (toDuplicate.referenceCounter.getAndIncrement() == 0) {
- throw new IllegalStateException("Buffer was released before duplication.");
- }
-
- this.memorySegment = toDuplicate.memorySegment;
- this.size = toDuplicate.size;
- this.recycler = toDuplicate.recycler;
- this.referenceCounter = toDuplicate.referenceCounter;
- }
-
- // -----------------------------------------------------------------------------------------------------------------
-
- public MemorySegment getMemorySegment() {
- return this.memorySegment;
- }
-
- public int size() {
- return this.size;
- }
-
- public void limitSize(int size) {
- if (size >= 0 && size <= this.memorySegment.size()) {
- this.size = size;
- } else {
- throw new IllegalArgumentException();
- }
- }
-
- public void recycleBuffer() {
- int refCount = this.referenceCounter.decrementAndGet();
- if (refCount == 0) {
- this.recycler.recycle(this.memorySegment);
- }
- }
-
- public Buffer duplicate() {
- return new Buffer(this);
- }
-
- public void copyToBuffer(Buffer destinationBuffer) {
- if (size() > destinationBuffer.size()) {
- throw new IllegalArgumentException("Destination buffer is too small to store content of source buffer.");
- }
-
- this.memorySegment.copyTo(0, destinationBuffer.memorySegment, 0, size);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/BufferRecycler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/BufferRecycler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/BufferRecycler.java
deleted file mode 100644
index e48ca52..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/BufferRecycler.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network;
-
-import org.apache.flink.core.memory.MemorySegment;
-
-public interface BufferRecycler {
-
- /**
- * Called by {@link org.apache.flink.runtime.io.network.Buffer} to return a {@link MemorySegment} to its original buffer pool.
- *
- * @param buffer the segment to be recycled
- */
- void recycle(MemorySegment buffer);
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
deleted file mode 100644
index c841872..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
+++ /dev/null
@@ -1,692 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network;
-
-import akka.actor.ActorRef;
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.AbstractID;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.execution.CancelTaskException;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.execution.RuntimeEnvironment;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
-import org.apache.flink.runtime.io.network.bufferprovider.BufferProvider;
-import org.apache.flink.runtime.io.network.bufferprovider.BufferProviderBroker;
-import org.apache.flink.runtime.io.network.bufferprovider.DiscardBufferPool;
-import org.apache.flink.runtime.io.network.bufferprovider.GlobalBufferPool;
-import org.apache.flink.runtime.io.network.bufferprovider.LocalBufferPoolOwner;
-import org.apache.flink.runtime.io.network.channels.Channel;
-import org.apache.flink.runtime.io.network.channels.ChannelID;
-import org.apache.flink.runtime.io.network.channels.ChannelType;
-import org.apache.flink.runtime.io.network.channels.InputChannel;
-import org.apache.flink.runtime.io.network.channels.OutputChannel;
-import org.apache.flink.runtime.io.network.gates.GateID;
-import org.apache.flink.runtime.io.network.gates.InputGate;
-import org.apache.flink.runtime.io.network.gates.OutputGate;
-import org.apache.flink.runtime.jobgraph.JobID;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.taskmanager.Task;
-import org.apache.flink.util.ExceptionUtils;
-import scala.concurrent.duration.FiniteDuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * The channel manager sets up the network buffers and dispatches data between channels.
- */
-public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker {
-
- private static final Logger LOG = LoggerFactory.getLogger(ChannelManager.class);
-
- private final ActorRef channelLookup;
-
- private final InstanceConnectionInfo connectionInfo;
-
- private final Map<ChannelID, Channel> channels;
-
- private final Map<AbstractID, LocalBufferPoolOwner> localBuffersPools;
-
- private final Map<ChannelID, EnvelopeReceiverList> receiverCache;
-
- private final GlobalBufferPool globalBufferPool;
-
- private final NetworkConnectionManager networkConnectionManager;
-
- private final InetSocketAddress ourAddress;
-
- private final DiscardBufferPool discardBufferPool;
-
- private final FiniteDuration timeout;
-
- // -----------------------------------------------------------------------------------------------------------------
-
- public ChannelManager(ActorRef channelLookup, InstanceConnectionInfo connectionInfo, int numNetworkBuffers,
- int networkBufferSize, NetworkConnectionManager networkConnectionManager,
- FiniteDuration timeout) throws IOException {
-
- this.channelLookup= channelLookup;
- this.connectionInfo = connectionInfo;
-
- this.timeout = timeout;
-
- try {
- this.globalBufferPool = new GlobalBufferPool(numNetworkBuffers, networkBufferSize);
- } catch (Throwable e) {
- throw new IOException("Failed to instantiate GlobalBufferPool.", e);
- }
-
- this.networkConnectionManager = networkConnectionManager;
- networkConnectionManager.start(this);
-
- // management data structures
- this.channels = new ConcurrentHashMap<ChannelID, Channel>();
- this.receiverCache = new ConcurrentHashMap<ChannelID, EnvelopeReceiverList>();
- this.localBuffersPools = new ConcurrentHashMap<AbstractID, LocalBufferPoolOwner>();
-
- this.ourAddress = new InetSocketAddress(connectionInfo.address(), connectionInfo.dataPort());
-
- // a special pool if the data is to be discarded
- this.discardBufferPool = new DiscardBufferPool();
- }
-
- public void shutdown() throws IOException {
- this.networkConnectionManager.shutdown();
-
- this.globalBufferPool.destroy();
- }
-
- public GlobalBufferPool getGlobalBufferPool() {
- return globalBufferPool;
- }
-
- public NetworkConnectionManager getNetworkConnectionManager() {
- return networkConnectionManager;
- }
-
- // -----------------------------------------------------------------------------------------------------------------
- // Task registration
- // -----------------------------------------------------------------------------------------------------------------
-
- /**
- * Registers the given task with the channel manager.
- *
- * @param task the task to be registered
- * @throws InsufficientResourcesException thrown if not enough buffers available to safely run this task
- */
- public void register(Task task) throws InsufficientResourcesException {
- // Check if we can safely run this task with the given buffers
- ensureBufferAvailability(task);
-
- RuntimeEnvironment environment = task.getEnvironment();
-
- // -------------------------------------------------------------------------------------------------------------
- // Register output channels
- // -------------------------------------------------------------------------------------------------------------
-
- environment.registerGlobalBufferPool(this.globalBufferPool);
-
- if (this.localBuffersPools.containsKey(task.getExecutionId())) {
- throw new IllegalStateException("Execution " + task.getExecutionId() + " has a previous buffer pool owner");
- }
-
- for (OutputGate gate : environment.outputGates()) {
- // add receiver list hints
- for (OutputChannel channel : gate.channels()) {
- // register envelope dispatcher with the channel
- channel.registerEnvelopeDispatcher(this);
-
- switch (channel.getChannelType()) {
- case IN_MEMORY:
- addReceiverListHint(channel.getID(), channel.getConnectedId());
- break;
- case NETWORK:
- addReceiverListHint(channel.getConnectedId(), channel.getID());
- break;
- }
-
- this.channels.put(channel.getID(), channel);
- }
- }
-
- this.localBuffersPools.put(task.getExecutionId(), environment);
-
- // -------------------------------------------------------------------------------------------------------------
- // Register input channels
- // -------------------------------------------------------------------------------------------------------------
-
- // register global
- for (InputGate<?> gate : environment.inputGates()) {
- gate.registerGlobalBufferPool(this.globalBufferPool);
-
- for (int i = 0; i < gate.getNumberOfInputChannels(); i++) {
- InputChannel<? extends IOReadableWritable> channel = gate.getInputChannel(i);
- channel.registerEnvelopeDispatcher(this);
-
- if (channel.getChannelType() == ChannelType.IN_MEMORY) {
- addReceiverListHint(channel.getID(), channel.getConnectedId());
- }
-
- this.channels.put(channel.getID(), channel);
- }
-
- this.localBuffersPools.put(gate.getGateID(), gate);
- }
-
- // the number of channels per buffers has changed after unregistering the task
- // => redistribute the number of designated buffers of the registered local buffer pools
- redistributeBuffers();
- }
-
- /**
- * Unregisters the given task from the channel manager.
- *
- * @param executionId the ID of the task to be unregistered
- * @param task the task to be unregistered
- */
- public void unregister(ExecutionAttemptID executionId, Task task) {
- final Environment environment = task.getEnvironment();
- if (environment == null) {
- return;
- }
-
- // destroy and remove OUTPUT channels from registered channels and cache
- for (ChannelID id : environment.getOutputChannelIDs()) {
- Channel channel = this.channels.remove(id);
- if (channel != null) {
-
- channel.destroy();
-
- removeFromReceiverCacheAndMaybeCloseTcpConnection(channel);
- }
- }
-
- // destroy and remove INPUT channels from registered channels and cache
- for (ChannelID id : environment.getInputChannelIDs()) {
- Channel channel = this.channels.remove(id);
- if (channel != null) {
- channel.destroy();
-
- removeFromReceiverCacheAndMaybeCloseTcpConnection(channel);
- }
- }
-
- // clear and remove INPUT side buffer pools
- for (GateID id : environment.getInputGateIDs()) {
- LocalBufferPoolOwner bufferPool = this.localBuffersPools.remove(id);
- if (bufferPool != null) {
- bufferPool.clearLocalBufferPool();
- }
- }
-
- // clear and remove OUTPUT side buffer pool
- LocalBufferPoolOwner bufferPool = this.localBuffersPools.remove(executionId);
- if (bufferPool != null) {
- bufferPool.clearLocalBufferPool();
- }
-
- // the number of channels per buffers has changed after unregistering the task
- // => redistribute the number of designated buffers of the registered local buffer pools
- redistributeBuffers();
- }
-
- private void removeFromReceiverCacheAndMaybeCloseTcpConnection(Channel channel) {
- EnvelopeReceiverList receiver = this.receiverCache.remove(channel.getID());
-
- if (receiver != null && receiver.hasRemoteReceiver()) {
- networkConnectionManager.close(receiver.getRemoteReceiver());
- }
- }
-
- /**
- * Ensures that the channel manager has enough buffers to execute the given task.
- * <p>
- * If there is less than one buffer per channel available, an InsufficientResourcesException will be thrown,
- * because of possible deadlocks. With more then one buffer per channel, deadlock-freedom is guaranteed.
- *
- * @param task task to be executed
- * @throws InsufficientResourcesException thrown if not enough buffers available to execute the task
- */
- private void ensureBufferAvailability(Task task) throws InsufficientResourcesException {
- Environment env = task.getEnvironment();
-
- int numBuffers = this.globalBufferPool.numBuffers();
- // existing channels + channels of the task
- int numChannels = this.channels.size() + env.getNumberOfOutputChannels() + env.getNumberOfInputChannels();
-
- // need at least one buffer per channel
- if (numChannels > 0 && numBuffers / numChannels < 1) {
- String msg = String.format("%s has not enough buffers to safely execute %s (%d buffers missing)",
- this.connectionInfo.getFQDNHostname(), env.getTaskName(), numChannels - numBuffers);
-
- throw new InsufficientResourcesException(msg);
- }
- }
-
- /**
- * Redistributes the buffers among the registered buffer pools. This method is called after each task registration
- * and unregistration.
- * <p>
- * Every registered buffer pool gets buffers according to its number of channels weighted by the current buffer to
- * channel ratio.
- */
- private void redistributeBuffers() {
- if (this.localBuffersPools.isEmpty() | this.channels.size() == 0) {
- return;
- }
-
- int numBuffers = this.globalBufferPool.numBuffers();
- int numChannels = this.channels.size();
-
- double buffersPerChannel = numBuffers / (double) numChannels;
-
- if (buffersPerChannel < 1.0) {
- throw new RuntimeException("System has not enough buffers to execute tasks.");
- }
-
- // redistribute number of designated buffers per buffer pool
- for (LocalBufferPoolOwner bufferPool : this.localBuffersPools.values()) {
- int numDesignatedBuffers = (int) Math.ceil(buffersPerChannel * bufferPool.getNumberOfChannels());
- bufferPool.setDesignatedNumberOfBuffers(numDesignatedBuffers);
- }
- }
-
- // -----------------------------------------------------------------------------------------------------------------
- // Envelope processing
- // -----------------------------------------------------------------------------------------------------------------
-
- private void releaseEnvelope(Envelope envelope) {
- Buffer buffer = envelope.getBuffer();
- if (buffer != null) {
- buffer.recycleBuffer();
- }
- }
-
- private void addReceiverListHint(ChannelID source, ChannelID localReceiver) {
- EnvelopeReceiverList receiverList = new EnvelopeReceiverList(localReceiver);
-
- if (this.receiverCache.put(source, receiverList) != null) {
- LOG.warn("Receiver cache already contained entry for " + source);
- }
- }
-
- private void addReceiverListHint(ChannelID source, RemoteReceiver remoteReceiver) {
- EnvelopeReceiverList receiverList = new EnvelopeReceiverList(remoteReceiver);
-
- if (this.receiverCache.put(source, receiverList) != null) {
- LOG.warn("Receiver cache already contained entry for " + source);
- }
- }
-
- private void generateSenderHint(Envelope envelope, RemoteReceiver receiver) throws IOException {
- Channel channel = this.channels.get(envelope.getSource());
- if (channel == null) {
- LOG.error("Cannot find channel for channel ID " + envelope.getSource());
- return;
- }
-
- // Only generate sender hints for output channels
- if (channel.isInputChannel()) {
- return;
- }
-
- final ChannelID targetChannelID = channel.getConnectedId();
- final int connectionIndex = receiver.getConnectionIndex();
-
- final RemoteReceiver ourAddress = new RemoteReceiver(this.ourAddress, connectionIndex);
- final Envelope senderHint = SenderHintEvent.createEnvelopeWithEvent(envelope, targetChannelID, ourAddress);
-
- this.networkConnectionManager.enqueue(senderHint, receiver, true);
- }
-
- /**
- * Returns the list of receivers for transfer envelopes produced by the channel with the given source channel ID.
- *
- * @param jobID
- * the ID of the job the given channel ID belongs to
- * @param sourceChannelID
- * the source channel ID for which the receiver list shall be retrieved
- * @return the list of receivers or <code>null</code> if the receiver could not be determined
- * @throws IOException
- */
- private EnvelopeReceiverList getReceiverList(JobID jobID, ChannelID sourceChannelID, boolean reportException) throws IOException {
- EnvelopeReceiverList receiverList = this.receiverCache.get(sourceChannelID);
-
- if (receiverList != null) {
- return receiverList;
- }
-
- while (true) {
- ConnectionInfoLookupResponse lookupResponse;
- lookupResponse = AkkaUtils.<JobManagerMessages.ConnectionInformation>ask(channelLookup,
- new JobManagerMessages.LookupConnectionInformation(connectionInfo, jobID,
- sourceChannelID), timeout).response();
-
- if (lookupResponse.receiverReady()) {
- receiverList = new EnvelopeReceiverList(lookupResponse);
- break;
- }
- else if (lookupResponse.receiverNotReady()) {
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- if (reportException) {
- throw new IOException("Lookup was interrupted.");
- } else {
- return null;
- }
- }
- }
- else if (lookupResponse.isJobAborting()) {
- if (reportException) {
- throw new CancelTaskException();
- } else {
- return null;
- }
- }
- else if (lookupResponse.receiverNotFound()) {
- if (reportException) {
- throw new IOException("Could not find the receiver for Job " + jobID + ", channel with source id " + sourceChannelID);
- } else {
- return null;
- }
- }
- else {
- throw new IllegalStateException("Unrecognized response to channel lookup.");
- }
- }
-
- this.receiverCache.put(sourceChannelID, receiverList);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("Receiver for %s: %s [%s])",
- sourceChannelID,
- receiverList.hasLocalReceiver() ? receiverList.getLocalReceiver() : receiverList.getRemoteReceiver(),
- receiverList.hasLocalReceiver() ? "local" : "remote"));
- }
-
- return receiverList;
- }
-
- /**
- * Invalidates the entries identified by the given channel IDs from the receiver lookup cache.
- *
- * @param channelIDs channel IDs for entries to invalidate
- */
- public void invalidateLookupCacheEntries(Set<ChannelID> channelIDs) {
- for (ChannelID id : channelIDs) {
- this.receiverCache.remove(id);
- }
- }
-
- // -----------------------------------------------------------------------------------------------------------------
- // EnvelopeDispatcher methods
- // -----------------------------------------------------------------------------------------------------------------
-
- @Override
- public void dispatchFromOutputChannel(Envelope envelope) throws IOException, InterruptedException {
- EnvelopeReceiverList receiverList = getReceiverListForEnvelope(envelope, true);
-
- Buffer srcBuffer = envelope.getBuffer();
- Buffer destBuffer = null;
-
- boolean success = false;
-
- try {
- if (receiverList.hasLocalReceiver()) {
- ChannelID receiver = receiverList.getLocalReceiver();
- Channel channel = this.channels.get(receiver);
-
- if (channel == null) {
- throw new LocalReceiverCancelledException(receiver);
- }
-
- if (!channel.isInputChannel()) {
- throw new IOException("Local receiver " + receiver + " is not an input channel.");
- }
-
- InputChannel<?> inputChannel = (InputChannel<?>) channel;
-
- // copy the buffer into the memory space of the receiver
- if (srcBuffer != null) {
- try {
- destBuffer = inputChannel.requestBufferBlocking(srcBuffer.size());
- } catch (InterruptedException e) {
- throw new IOException(e.getMessage());
- }
-
- srcBuffer.copyToBuffer(destBuffer);
- envelope.setBuffer(destBuffer);
- srcBuffer.recycleBuffer();
- }
-
- inputChannel.queueEnvelope(envelope);
- success = true;
- }
- else if (receiverList.hasRemoteReceiver()) {
- RemoteReceiver remoteReceiver = receiverList.getRemoteReceiver();
-
- // Generate sender hint before sending the first envelope over the network
- if (envelope.getSequenceNumber() == 0) {
- generateSenderHint(envelope, remoteReceiver);
- }
-
- this.networkConnectionManager.enqueue(envelope, remoteReceiver, false);
- success = true;
- }
- } finally {
- if (!success) {
- if (srcBuffer != null) {
- srcBuffer.recycleBuffer();
- }
- if (destBuffer != null) {
- destBuffer.recycleBuffer();
- }
- }
- }
- }
-
- @Override
- public void dispatchFromInputChannel(Envelope envelope) throws IOException, InterruptedException {
- // this method sends only events back from input channels to output channels
- // sanity check that we have no buffer
- if (envelope.getBuffer() != null) {
- throw new RuntimeException("Error: This method can only process envelopes without buffers.");
- }
-
- EnvelopeReceiverList receiverList = getReceiverListForEnvelope(envelope, true);
-
- if (receiverList.hasLocalReceiver()) {
- ChannelID receiver = receiverList.getLocalReceiver();
- Channel channel = this.channels.get(receiver);
-
- if (channel == null) {
- throw new LocalReceiverCancelledException(receiver);
- }
-
- if (channel.isInputChannel()) {
- throw new IOException("Local receiver " + receiver + " of backward event is not an output channel.");
- }
-
- OutputChannel outputChannel = (OutputChannel) channel;
- outputChannel.queueEnvelope(envelope);
- }
- else if (receiverList.hasRemoteReceiver()) {
- RemoteReceiver remoteReceiver = receiverList.getRemoteReceiver();
-
- // Generate sender hint before sending the first envelope over the network
- this.networkConnectionManager.enqueue(envelope, remoteReceiver, envelope.getSequenceNumber() == 0);
- }
- }
-
- /**
- *
- */
- @Override
- public void dispatchFromNetwork(Envelope envelope) throws IOException, InterruptedException {
- // ========================================================================================
- // IMPORTANT
- //
- // This method is called by the network I/O thread that reads the incoming TCP
- // connections. This method must have minimal overhead and not throw an exception if
- // something is wrong with a job or individual transmission, but only when something
- // is fundamentally broken in the system.
- // ========================================================================================
-
- // the sender hint event is to let the receiver know where exactly the envelope came from.
- // the receiver will cache the sender id and its connection info in its local lookup table
- // that allows the receiver to send envelopes to the sender without first pinging the job manager
- // for the sender's connection info
-
- // Check if the envelope is the special envelope with the sender hint event
- if (SenderHintEvent.isSenderHintEvent(envelope)) {
- // Check if this is the final destination of the sender hint event before adding it
- final SenderHintEvent seh = (SenderHintEvent) envelope.deserializeEvents().get(0);
- if (this.channels.get(seh.getSource()) != null) {
- addReceiverListHint(seh.getSource(), seh.getRemoteReceiver());
- return;
- }
- }
-
- // try and get the receiver list. if we cannot get it anymore, the task has been cleared
- // the code frees the envelope on exception, so we need not do anything
- EnvelopeReceiverList receiverList = getReceiverListForEnvelope(envelope, false);
- if (receiverList == null) {
- // receiver is cancelled and cleaned away
- releaseEnvelope(envelope);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Dropping envelope for cleaned up receiver.");
- }
-
- return;
- }
-
- if (!receiverList.hasLocalReceiver() || receiverList.hasRemoteReceiver()) {
- throw new IOException("Bug in network stack: Envelope dispatched from the incoming network pipe has no local receiver or has a remote receiver");
- }
-
- ChannelID localReceiver = receiverList.getLocalReceiver();
- Channel channel = this.channels.get(localReceiver);
-
- // if the channel is null, it means that receiver has been cleared already (cancelled or failed).
- // release the buffer immediately
- if (channel == null) {
- releaseEnvelope(envelope);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Dropping envelope for cancelled receiver " + localReceiver);
- }
- }
- else {
- channel.queueEnvelope(envelope);
- }
- }
-
- /**
- *
- * Upon an exception, this method frees the envelope.
- */
- private final EnvelopeReceiverList getReceiverListForEnvelope(Envelope envelope, boolean reportException) throws IOException {
- try {
- return getReceiverList(envelope.getJobID(), envelope.getSource(), reportException);
- } catch (IOException e) {
- releaseEnvelope(envelope);
- throw e;
- } catch (CancelTaskException e) {
- releaseEnvelope(envelope);
- throw e;
- } catch (Throwable t) {
- releaseEnvelope(envelope);
- ExceptionUtils.rethrow(t, "Error while requesting receiver list.");
- return null; // silence the compiler
- }
- }
-
- // -----------------------------------------------------------------------------------------------------------------
- // BufferProviderBroker methods
- // -----------------------------------------------------------------------------------------------------------------
-
- @Override
- public BufferProvider getBufferProvider(JobID jobID, ChannelID sourceChannelID) throws IOException {
- EnvelopeReceiverList receiverList = getReceiverList(jobID, sourceChannelID, false);
-
- // check if the receiver is already gone
- if (receiverList == null) {
- return this.discardBufferPool;
- }
-
- if (!receiverList.hasLocalReceiver() || receiverList.hasRemoteReceiver()) {
- throw new IOException("The destination to be looked up is not a single local endpoint.");
- }
-
-
- ChannelID localReceiver = receiverList.getLocalReceiver();
- Channel channel = this.channels.get(localReceiver);
-
- if (channel == null) {
- // receiver is already canceled
- return this.discardBufferPool;
- }
-
- if (!channel.isInputChannel()) {
- throw new IOException("Channel context for local receiver " + localReceiver + " is not an input channel context");
- }
-
- return (InputChannel<?>) channel;
- }
-
- // -----------------------------------------------------------------------------------------------------------------
-
- public void logBufferUtilization() {
- System.out.println("Buffer utilization at " + System.currentTimeMillis());
-
- System.out.println("\tUnused global buffers: " + this.globalBufferPool.numAvailableBuffers());
-
- System.out.println("\tLocal buffer pool status:");
-
- for (LocalBufferPoolOwner bufferPool : this.localBuffersPools.values()) {
- bufferPool.logBufferUtilization();
- }
-
- System.out.println("\tIncoming connections:");
-
- for (Channel channel : this.channels.values()) {
- if (channel.isInputChannel()) {
- ((InputChannel<?>) channel).logQueuedEnvelopes();
- }
- }
- }
-
- public void verifyAllCachesEmpty() {
- if (!channels.isEmpty()) {
- throw new IllegalStateException("Channel manager caches not empty: There are still registered channels.");
- }
- if (!localBuffersPools.isEmpty()) {
- throw new IllegalStateException("Channel manager caches not empty: There are still local buffer pools.");
- }
- if (!receiverCache.isEmpty()) {
- throw new IllegalStateException("Channel manager caches not empty: There are still entries in the receiver cache.");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionInfoLookupResponse.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionInfoLookupResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionInfoLookupResponse.java
deleted file mode 100644
index 51a0f94..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionInfoLookupResponse.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network;
-
-import java.io.Serializable;
-
-import org.apache.flink.runtime.io.network.channels.ChannelID;
-
-public class ConnectionInfoLookupResponse implements Serializable {
-
- private static final long serialVersionUID = 3961171754642077522L;
-
-
- private enum ReturnCode {
- NOT_FOUND, FOUND_AND_RECEIVER_READY, FOUND_BUT_RECEIVER_NOT_READY, JOB_IS_ABORTING
- };
-
- // was request successful?
- private ReturnCode returnCode;
-
- private RemoteReceiver remoteTarget;
-
- private ChannelID localTarget;
-
- public ConnectionInfoLookupResponse() {}
-
- public ConnectionInfoLookupResponse(ReturnCode code) {
- this.returnCode = code;
- this.remoteTarget = null;
- this.localTarget = null;
- }
-
- public ConnectionInfoLookupResponse(ReturnCode code, ChannelID localTarget) {
- this.returnCode = code;
- this.remoteTarget = null;
- this.localTarget = localTarget;
- }
-
- public ConnectionInfoLookupResponse(ReturnCode code, RemoteReceiver receiver) {
- this.returnCode = code;
- this.remoteTarget = receiver;
- this.localTarget = null;
- }
-
- public RemoteReceiver getRemoteTarget() {
- return this.remoteTarget;
- }
-
- public ChannelID getLocalTarget() {
- return this.localTarget;
- }
-
- public boolean receiverNotFound() {
- return (this.returnCode == ReturnCode.NOT_FOUND);
- }
-
- public boolean receiverNotReady() {
- return (this.returnCode == ReturnCode.FOUND_BUT_RECEIVER_NOT_READY);
- }
-
- public boolean receiverReady() {
- return (this.returnCode == ReturnCode.FOUND_AND_RECEIVER_READY);
- }
-
- public boolean isJobAborting() {
- return (this.returnCode == ReturnCode.JOB_IS_ABORTING);
- }
-
-
- public static ConnectionInfoLookupResponse createReceiverFoundAndReady(ChannelID targetChannelID) {
- return new ConnectionInfoLookupResponse(ReturnCode.FOUND_AND_RECEIVER_READY, targetChannelID);
- }
-
- public static ConnectionInfoLookupResponse createReceiverFoundAndReady(RemoteReceiver remoteReceiver) {
- return new ConnectionInfoLookupResponse(ReturnCode.FOUND_AND_RECEIVER_READY, remoteReceiver);
- }
-
- public static ConnectionInfoLookupResponse createReceiverNotFound() {
- return new ConnectionInfoLookupResponse(ReturnCode.NOT_FOUND);
- }
-
- public static ConnectionInfoLookupResponse createReceiverNotReady() {
- return new ConnectionInfoLookupResponse(ReturnCode.FOUND_BUT_RECEIVER_NOT_READY);
- }
-
- public static ConnectionInfoLookupResponse createJobIsAborting() {
- return new ConnectionInfoLookupResponse(ReturnCode.JOB_IS_ABORTING);
- }
-
-
- @Override
- public String toString() {
- return this.returnCode.name() + ", local target: " + this.localTarget + ", remoteTarget: " + this.remoteTarget;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
new file mode 100644
index 0000000..4a5536b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network;
+
+import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
+import org.apache.flink.runtime.io.network.partition.IntermediateResultPartitionProvider;
+
+import java.io.IOException;
+
+/**
+ * The connection manager manages physical connections for the (logical) remote
+ * input channels at runtime.
+ */
+public interface ConnectionManager {
+
+ void start(IntermediateResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) throws IOException;
+
+ /**
+ * Creates a {@link PartitionRequestClient} instance for the given {@link RemoteAddress}.
+ */
+ PartitionRequestClient createPartitionRequestClient(RemoteAddress remoteAddress) throws IOException;
+
+ int getNumberOfActiveConnections();
+
+ void shutdown() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/Envelope.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/Envelope.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/Envelope.java
deleted file mode 100644
index 53f403c..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/Envelope.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.runtime.event.task.AbstractEvent;
-import org.apache.flink.runtime.io.network.channels.ChannelID;
-import org.apache.flink.runtime.io.network.serialization.DataInputDeserializer;
-import org.apache.flink.runtime.io.network.serialization.DataOutputSerializer;
-import org.apache.flink.runtime.jobgraph.JobID;
-import org.apache.flink.util.InstantiationUtil;
-
-public final class Envelope {
-
- private final JobID jobID;
-
- private final ChannelID source;
-
- private final int sequenceNumber;
-
- private ByteBuffer serializedEventList;
-
- private Buffer buffer;
-
- public Envelope(int sequenceNumber, JobID jobID, ChannelID source) {
- this.sequenceNumber = sequenceNumber;
- this.jobID = jobID;
- this.source = source;
- }
-
- private Envelope(Envelope toDuplicate) {
- this.jobID = toDuplicate.jobID;
- this.source = toDuplicate.source;
- this.sequenceNumber = toDuplicate.sequenceNumber;
- this.serializedEventList = null;
- this.buffer = null;
- }
-
- public Envelope duplicate() {
- Envelope duplicate = new Envelope(this);
- if (hasBuffer()) {
- duplicate.setBuffer(this.buffer.duplicate());
- }
-
- return duplicate;
- }
-
- public Envelope duplicateWithoutBuffer() {
- return new Envelope(this);
- }
-
- public JobID getJobID() {
- return this.jobID;
- }
-
- public ChannelID getSource() {
- return this.source;
- }
-
- public int getSequenceNumber() {
- return this.sequenceNumber;
- }
-
- public void setEventsSerialized(ByteBuffer serializedEventList) {
- if (this.serializedEventList != null) {
- throw new IllegalStateException("Event list has already been set.");
- }
-
- this.serializedEventList = serializedEventList;
- }
-
- public void serializeEventList(List<? extends AbstractEvent> eventList) {
- if (this.serializedEventList != null) {
- throw new IllegalStateException("Event list has already been set.");
- }
-
- this.serializedEventList = serializeEvents(eventList);
- }
-
- public ByteBuffer getEventsSerialized() {
- return this.serializedEventList;
- }
-
- public List<? extends AbstractEvent> deserializeEvents() {
- return deserializeEvents(getClass().getClassLoader());
- }
-
- public List<? extends AbstractEvent> deserializeEvents(ClassLoader classloader) {
- if (this.serializedEventList == null) {
- return Collections.emptyList();
- }
-
- try {
- DataInputDeserializer deserializer = new DataInputDeserializer(this.serializedEventList);
-
- int numEvents = deserializer.readInt();
- ArrayList<AbstractEvent> events = new ArrayList<AbstractEvent>(numEvents);
-
- for (int i = 0; i < numEvents; i++) {
- String className = deserializer.readUTF();
- Class<? extends AbstractEvent> clazz;
- try {
- clazz = Class.forName(className).asSubclass(AbstractEvent.class);
- } catch (ClassNotFoundException e) {
- throw new RuntimeException("Could not load event class '" + className + "'.", e);
- } catch (ClassCastException e) {
- throw new RuntimeException("The class '" + className + "' is no valid subclass of '" + AbstractEvent.class.getName() + "'.", e);
- }
-
- AbstractEvent evt = InstantiationUtil.instantiate(clazz, AbstractEvent.class);
- evt.read(deserializer);
-
- events.add(evt);
- }
-
- return events;
- }
- catch (IOException e) {
- throw new RuntimeException("Error while deserializing the events.", e);
- }
- }
-
- public void setBuffer(Buffer buffer) {
- this.buffer = buffer;
- }
-
- public Buffer getBuffer() {
- return this.buffer;
- }
-
- private ByteBuffer serializeEvents(List<? extends AbstractEvent> events) {
- try {
- // create the serialized event list
- DataOutputSerializer serializer = events.size() == 0
- ? new DataOutputSerializer(4)
- : new DataOutputSerializer(events.size() * 32);
- serializer.writeInt(events.size());
-
- for (AbstractEvent evt : events) {
- serializer.writeUTF(evt.getClass().getName());
- evt.write(serializer);
- }
-
- return serializer.wrapAsByteBuffer();
- }
- catch (IOException e) {
- throw new RuntimeException("Error while serializing the task events.", e);
- }
- }
-
- public boolean hasBuffer() {
- return this.buffer != null;
- }
-
- @Override
- public String toString() {
- return String.format("Envelope %d [source id: %s, buffer size: %d, events size: %d]",
- this.sequenceNumber, this.getSource(), this.buffer == null ? -1 : this.buffer.size(),
- this.serializedEventList == null ? -1 : this.serializedEventList.remaining());
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/EnvelopeDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/EnvelopeDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/EnvelopeDispatcher.java
deleted file mode 100644
index 59cf491..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/EnvelopeDispatcher.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network;
-
-import java.io.IOException;
-
-/**
- * An envelope dispatcher receives {@link Envelope}s and sends them to all of its destinations.
- */
-public interface EnvelopeDispatcher {
-
- /**
- * Dispatches an envelope from an output channel to the receiving input channels (forward flow).
- *
- * @param envelope envelope to be sent
- */
- void dispatchFromOutputChannel(Envelope envelope) throws IOException, InterruptedException;
-
- /**
- * Dispatches an envelope from an input channel to the receiving output channels (backwards flow).
- *
- * @param envelope envelope to be sent
- */
- void dispatchFromInputChannel(Envelope envelope) throws IOException, InterruptedException;
-
- /**
- * Dispatches an envelope from an incoming TCP connection.
- * <p>
- * After an envelope has been constructed from a TCP socket, this method is called to send the envelope to the
- * receiving input channel.
- *
- * @param envelope envelope to be sent
- */
- void dispatchFromNetwork(Envelope envelope) throws IOException, InterruptedException;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/EnvelopeReceiverList.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/EnvelopeReceiverList.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/EnvelopeReceiverList.java
deleted file mode 100644
index f6e4982..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/EnvelopeReceiverList.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network;
-
-import java.net.InetAddress;
-
-import org.apache.flink.runtime.io.network.ConnectionInfoLookupResponse;
-import org.apache.flink.runtime.io.network.RemoteReceiver;
-import org.apache.flink.runtime.io.network.channels.ChannelID;
-
-/**
- * A transfer envelope receiver list contains all recipients of a transfer envelope. There are three different types of
- * receivers: Local receivers identified by {@link ChannelID} objects, remote receivers identified by
- * {@link InetAddress} objects and finally checkpoints which are identified by
- * <p>
- * This class is thread-safe.
- *
- */
-public class EnvelopeReceiverList {
-
- private final ChannelID localReceiver;
-
- private final RemoteReceiver remoteReceiver;
-
- public EnvelopeReceiverList(ConnectionInfoLookupResponse cilr) {
- this.localReceiver = cilr.getLocalTarget();
- this.remoteReceiver = cilr.getRemoteTarget();
- }
-
- public EnvelopeReceiverList(ChannelID localReceiver) {
- this.localReceiver = localReceiver;
- this.remoteReceiver = null;
- }
-
- public EnvelopeReceiverList(RemoteReceiver remoteReceiver) {
- this.localReceiver = null;
- this.remoteReceiver = remoteReceiver;
- }
-
- public boolean hasLocalReceiver() {
- return this.localReceiver != null;
- }
-
- public boolean hasRemoteReceiver() {
- return this.remoteReceiver != null;
- }
-
- public int getTotalNumberOfReceivers() {
- return (this.localReceiver == null ? 0 : 1) + (this.remoteReceiver == null ? 0 : 1);
- }
-
- public RemoteReceiver getRemoteReceiver() {
- return this.remoteReceiver;
- }
-
- public ChannelID getLocalReceiver() {
- return this.localReceiver;
- }
-
- @Override
- public String toString() {
- return "local receiver: " + this.localReceiver + ", remote receiver: " + this.remoteReceiver;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/InsufficientResourcesException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/InsufficientResourcesException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/InsufficientResourcesException.java
deleted file mode 100644
index df4f4ec..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/InsufficientResourcesException.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network;
-
-/**
- * This exception is thrown by the {@link ChannelManager} to indicate that a task cannot be accepted because
- * there are not enough resources available to safely execute it.
- *
- */
-public final class InsufficientResourcesException extends Exception {
-
- /**
- * The generated serial version UID.
- */
- private static final long serialVersionUID = -8977049569413215169L;
-
- /**
- * Constructs a new insufficient resources exception.
- *
- * @param msg
- * the message describing the exception
- */
- InsufficientResourcesException(final String msg) {
- super(msg);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
index 264fde6..894db35 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
@@ -18,21 +18,24 @@
package org.apache.flink.runtime.io.network;
-import java.io.IOException;
+import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
+import org.apache.flink.runtime.io.network.partition.IntermediateResultPartitionProvider;
-public class LocalConnectionManager implements NetworkConnectionManager {
+import java.io.IOException;
- @Override
- public void start(ChannelManager channelManager) throws IOException {
- }
+/**
+ * A connection manager implementation to bypass setup overhead for task managers running in local
+ * execution mode.
+ */
+public class LocalConnectionManager implements ConnectionManager {
@Override
- public void enqueue(Envelope envelope, RemoteReceiver receiver, boolean isFirstEnvelope) throws IOException {
+ public void start(IntermediateResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) throws IOException {
}
@Override
- public void close(RemoteReceiver receiver) {
-
+ public PartitionRequestClient createPartitionRequestClient(RemoteAddress remoteAddress) throws IOException {
+ return null;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalReceiverCancelledException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalReceiverCancelledException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalReceiverCancelledException.java
deleted file mode 100644
index 84d2d80..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalReceiverCancelledException.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network;
-
-import org.apache.flink.runtime.execution.CancelTaskException;
-import org.apache.flink.runtime.io.network.channels.ChannelID;
-
-
-/**
- *
- */
-public class LocalReceiverCancelledException extends CancelTaskException {
- private static final long serialVersionUID = 1L;
-
- private final ChannelID receiver;
-
- public LocalReceiverCancelledException(ChannelID receiver) {
- this.receiver = receiver;
- }
-
-
- public ChannelID getReceiver() {
- return receiver;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkConnectionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkConnectionManager.java
deleted file mode 100644
index 309c92d..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkConnectionManager.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network;
-
-import java.io.IOException;
-
-public interface NetworkConnectionManager {
-
- public void start(ChannelManager channelManager) throws IOException;
-
- public void enqueue(Envelope envelope, RemoteReceiver receiver, boolean isFirstEnvelope) throws IOException;
-
- public void close(RemoteReceiver receiver);
-
- public int getNumberOfActiveConnections();
-
- public void shutdown() throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
new file mode 100644
index 0000000..aa6c64c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network;
+
+import akka.actor.ActorRef;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.api.reader.BufferReader;
+import org.apache.flink.runtime.io.network.api.writer.BufferWriter;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.netty.NettyConfig;
+import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
+import org.apache.flink.runtime.io.network.partition.IntermediateResultPartition;
+import org.apache.flink.runtime.io.network.partition.IntermediateResultPartitionManager;
+import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
+import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.IOException;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Network I/O components of each {@link TaskManager} instance.
+ */
+public class NetworkEnvironment {
+
+ private static final Logger LOG = LoggerFactory.getLogger(NetworkEnvironment.class);
+
+ private final ActorRef jobManager;
+
+ private final FiniteDuration jobManagerTimeout;
+
+ private final IntermediateResultPartitionManager partitionManager;
+
+ private final TaskEventDispatcher taskEventDispatcher;
+
+ private final NetworkBufferPool networkBufferPool;
+
+ private final ConnectionManager connectionManager;
+
+ private boolean isShutdown;
+
+ /**
+ * Initializes all network I/O components.
+ */
+ public NetworkEnvironment(ActorRef jobManager, FiniteDuration jobManagerTimeout, NetworkEnvironmentConfiguration config) throws IOException {
+ this.jobManager = checkNotNull(jobManager);
+ this.jobManagerTimeout = checkNotNull(jobManagerTimeout);
+
+ this.partitionManager = new IntermediateResultPartitionManager();
+ this.taskEventDispatcher = new TaskEventDispatcher();
+
+ // --------------------------------------------------------------------
+ // Network buffers
+ // --------------------------------------------------------------------
+ try {
+ networkBufferPool = new NetworkBufferPool(config.numNetworkBuffers(), config.networkBufferSize());
+ }
+ catch (Throwable t) {
+ throw new IOException("Failed to instantiate network buffer pool: " + t.getMessage(), t);
+ }
+
+ // --------------------------------------------------------------------
+ // Network connections
+ // --------------------------------------------------------------------
+ final Option<NettyConfig> nettyConfig = config.nettyConfig();
+
+ connectionManager = nettyConfig.isDefined() ? new NettyConnectionManager(nettyConfig.get()) : new LocalConnectionManager();
+
+ try {
+ connectionManager.start(partitionManager, taskEventDispatcher);
+ }
+ catch (Throwable t) {
+ throw new IOException("Failed to instantiate network connection manager: " + t.getMessage(), t);
+ }
+ }
+
+ public ActorRef getJobManager() {
+ return jobManager;
+ }
+
+ public FiniteDuration getJobManagerTimeout() {
+ return jobManagerTimeout;
+ }
+
+ public void registerTask(Task task) throws IOException {
+ final ExecutionAttemptID executionId = task.getExecutionId();
+
+ final IntermediateResultPartition[] producedPartitions = task.getProducedPartitions();
+ final BufferWriter[] writers = task.getWriters();
+
+ if (writers.length != producedPartitions.length) {
+ throw new IllegalStateException("Unequal number of writers and partitions.");
+ }
+
+ for (int i = 0; i < producedPartitions.length; i++) {
+ final IntermediateResultPartition partition = producedPartitions[i];
+ final BufferWriter writer = writers[i];
+
+ // Buffer pool for the partition
+ BufferPool bufferPool = null;
+
+ try {
+ bufferPool = networkBufferPool.createBufferPool(partition.getNumberOfQueues(), false);
+ partition.setBufferPool(bufferPool);
+ partitionManager.registerIntermediateResultPartition(partition);
+ }
+ catch (Throwable t) {
+ if (bufferPool != null) {
+ bufferPool.destroy();
+ }
+
+ if (t instanceof IOException) {
+ throw (IOException) t;
+ }
+ else {
+ throw new IOException(t.getMessage(), t);
+ }
+ }
+
+ // Register writer with task event dispatcher
+ taskEventDispatcher.registerWriterForIncomingTaskEvents(executionId, writer.getPartitionId(), writer);
+ }
+
+ // Setup the buffer pool for each buffer reader
+ final BufferReader[] readers = task.getReaders();
+
+ for (BufferReader reader : readers) {
+ BufferPool bufferPool = null;
+
+ try {
+ bufferPool = networkBufferPool.createBufferPool(reader.getNumberOfInputChannels(), false);
+ reader.setBufferPool(bufferPool);
+ }
+ catch (Throwable t) {
+ if (bufferPool != null) {
+ bufferPool.destroy();
+ }
+
+ if (t instanceof IOException) {
+ throw (IOException) t;
+ }
+ else {
+ throw new IOException(t.getMessage(), t);
+ }
+ }
+ }
+ }
+
+ public void unregisterTask(Task task) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Unregistering task {} ({}) from network environment (state: {}).", task.getTaskNameWithSubtasks(), task.getExecutionId(), task.getExecutionState());
+ }
+
+ final ExecutionAttemptID executionId = task.getExecutionId();
+
+ if (task.isCanceledOrFailed()) {
+ partitionManager.failIntermediateResultPartitions(executionId);
+ }
+
+ taskEventDispatcher.unregisterWriters(executionId);
+
+ final BufferReader[] readers = task.getReaders();
+
+ if (readers != null) {
+ for (BufferReader reader : readers) {
+ try {
+ if (reader != null) {
+ reader.releaseAllResources();
+ }
+ }
+ catch (IOException e) {
+ LOG.error("Error during release of reader resources: " + e.getMessage(), e);
+ }
+ }
+ }
+ }
+
+ public IntermediateResultPartitionManager getPartitionManager() {
+ return partitionManager;
+ }
+
+ public TaskEventDispatcher getTaskEventDispatcher() {
+ return taskEventDispatcher;
+ }
+
+ public ConnectionManager getConnectionManager() {
+ return connectionManager;
+ }
+
+ public NetworkBufferPool getNetworkBufferPool() {
+ return networkBufferPool;
+ }
+
+ public boolean hasReleasedAllResources() {
+     // Read every counter exactly once so that the logged numbers always agree with the returned verdict.
+     final long missingSegments = networkBufferPool.getTotalNumberOfMemorySegments() - networkBufferPool.getNumberOfAvailableMemorySegments();
+     final long bufferPools = networkBufferPool.getNumberOfRegisteredBufferPools();
+     final long activeConnections = connectionManager.getNumberOfActiveConnections();
+     final long registeredWriters = taskEventDispatcher.getNumberOfRegisteredWriters();
+
+     final boolean success = missingSegments == 0 && bufferPools == 0 && activeConnections == 0 && registeredWriters == 0;
+
+     final String msg = String.format("Network buffer pool: %d missing memory segments. %d registered buffer pools. Connection manager: %d active connections. Task event dispatcher: %d registered writers.",
+         missingSegments, bufferPools, activeConnections, registeredWriters);
+
+     if (success) {
+         LOG.debug("Network environment did release all resources: " + msg);
+     }
+     else {
+         LOG.error("Network environment did *not* release all resources: " + msg);
+     }
+
+     return success;
+ }
+
+ /**
+ * Tries to shut down all network I/O components.
+ */
+ public void shutdown() {
+ if (!isShutdown) {
+ try {
+ if (networkBufferPool != null) {
+ networkBufferPool.destroy();
+ }
+ }
+ catch (Throwable t) {
+ LOG.warn("Network buffer pool did not shut down properly: " + t.getMessage(), t);
+ }
+
+ if (partitionManager != null) {
+ try {
+ partitionManager.shutdown();
+ }
+ catch (Throwable t) {
+ LOG.warn("Partition manager did not shut down properly: " + t.getMessage(), t);
+ }
+ }
+
+ try {
+ if (connectionManager != null) {
+ connectionManager.shutdown();
+ }
+ }
+ catch (Throwable t) {
+ LOG.warn("Network connection manager did not shut down properly: " + t.getMessage(), t);
+ }
+
+ isShutdown = true;
+ }
+ }
+
+ public boolean isShutdown() {
+ return isShutdown;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteAddress.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteAddress.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteAddress.java
new file mode 100644
index 0000000..937055b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteAddress.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A {@link RemoteAddress} identifies a connection to a remote task manager by
+ * the socket address and a connection index. This allows multiple connections
+ * to be distinguished by their connection index.
+ * <p>
+ * The connection index is assigned by the {@link IntermediateResult} and
+ * ensures that it is safe to multiplex multiple data transfers over the same
+ * physical TCP connection.
+ */
+public class RemoteAddress implements IOReadableWritable, Serializable {
+
+ private InetSocketAddress address;
+
+ private int connectionIndex;
+
+ public RemoteAddress(InstanceConnectionInfo connectionInfo, int connectionIndex) {
+ this(new InetSocketAddress(connectionInfo.address(), connectionInfo.dataPort()), connectionIndex);
+ }
+
+ public RemoteAddress(InetSocketAddress address, int connectionIndex) {
+ this.address = checkNotNull(address);
+ checkArgument(connectionIndex >= 0);
+ this.connectionIndex = connectionIndex;
+ }
+
+ public InetSocketAddress getAddress() {
+ return address;
+ }
+
+ public int getConnectionIndex() {
+ return connectionIndex;
+ }
+
+ @Override
+ public int hashCode() {
+ return address.hashCode() + (31 * connectionIndex);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+     // Per the Object#equals contract, equals(null) must return false rather than throw.
+     if (other == null || other.getClass() != RemoteAddress.class) {
+         return false;
+     }
+
+     // Two remote addresses are equal iff both the socket address and the
+     // connection index match.
+     final RemoteAddress ra = (RemoteAddress) other;
+
+     return ra.getAddress().equals(address) && ra.getConnectionIndex() == connectionIndex;
+ }
+
+ @Override
+ public String toString() {
+ return address + " [" + connectionIndex + "]";
+ }
+
+ // ------------------------------------------------------------------------
+ // Serialization
+ // ------------------------------------------------------------------------
+
+ public RemoteAddress() {
+ this.address = null;
+ this.connectionIndex = -1;
+ }
+
+ @Override
+ public void write(final DataOutputView out) throws IOException {
+     // Wire format: address length, raw address bytes, port, connection index.
+     final byte[] rawAddress = address.getAddress().getAddress();
+     out.writeInt(rawAddress.length);
+     out.write(rawAddress);
+     out.writeInt(address.getPort());
+     out.writeInt(connectionIndex);
+ }
+
+ @Override
+ public void read(final DataInputView in) throws IOException {
+     final int length = in.readInt();
+     if (length != 4 && length != 16) { // only IPv4/IPv6 are valid; reject before allocating
+         throw new IOException("Invalid address length: " + length);
+     }
+     final byte[] addressBytes = new byte[length];
+     in.readFully(addressBytes);
+     address = new InetSocketAddress(InetAddress.getByAddress(addressBytes), in.readInt());
+     connectionIndex = in.readInt();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java
deleted file mode 100644
index 436d07d..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-/**
- * Objects of this class uniquely identify a connection to a remote {@link org.apache.flink.runtime.taskmanager.TaskManager}.
- */
-public final class RemoteReceiver implements IOReadableWritable, Serializable {
-
- private static final long serialVersionUID = 4304924747853162443L;
-
- /**
- * The address of the connection to the remote TaskManager.
- */
- private InetSocketAddress connectionAddress;
-
- /**
- * The index of the connection to the remote TaskManager.
- */
- private int connectionIndex;
-
- /**
- * Constructs a new remote receiver object.
- *
- * @param connectionAddress
- * the address of the connection to the remote TaskManager
- * @param connectionIndex
- * the index of the connection to the remote TaskManager
- */
- public RemoteReceiver(final InetSocketAddress connectionAddress, final int connectionIndex) {
- if (connectionAddress == null) {
- throw new IllegalArgumentException("Argument connectionAddress must not be null");
- }
- if (connectionIndex < 0) {
- throw new IllegalArgumentException("Argument connectionIndex must be a non-negative integer number");
- }
-
- this.connectionAddress = connectionAddress;
- this.connectionIndex = connectionIndex;
- }
-
- /**
- * Default constructor for serialization/deserialization.
- */
- public RemoteReceiver() {
- this.connectionAddress = null;
- this.connectionIndex = -1;
- }
-
- /**
- * Returns the address of the connection to the remote TaskManager.
- *
- * @return the address of the connection to the remote TaskManager
- */
- public InetSocketAddress getConnectionAddress() {
- return this.connectionAddress;
- }
-
- /**
- * Returns the index of the connection to the remote TaskManager.
- *
- * @return the index of the connection to the remote TaskManager
- */
- public int getConnectionIndex() {
- return this.connectionIndex;
- }
-
-
- @Override
- public int hashCode() {
- return this.connectionAddress.hashCode() + (31 * this.connectionIndex);
- }
-
-
- @Override
- public boolean equals(final Object obj) {
-
- if (!(obj instanceof RemoteReceiver)) {
- return false;
- }
-
- final RemoteReceiver rr = (RemoteReceiver) obj;
- if (!this.connectionAddress.equals(rr.connectionAddress)) {
- return false;
- }
-
- if (this.connectionIndex != rr.connectionIndex) {
- return false;
- }
-
- return true;
- }
-
-
- @Override
- public void write(final DataOutputView out) throws IOException {
-
- final InetAddress ia = this.connectionAddress.getAddress();
- out.writeInt(ia.getAddress().length);
- out.write(ia.getAddress());
- out.writeInt(this.connectionAddress.getPort());
-
- out.writeInt(this.connectionIndex);
- }
-
-
- @Override
- public void read(final DataInputView in) throws IOException {
- final int addr_length = in.readInt();
- final byte[] address = new byte[addr_length];
- in.readFully(address);
-
- InetAddress ia = InetAddress.getByAddress(address);
- int port = in.readInt();
- this.connectionAddress = new InetSocketAddress(ia, port);
-
- this.connectionIndex = in.readInt();
- }
-
-
- @Override
- public String toString() {
- return this.connectionAddress + " (" + this.connectionIndex + ")";
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/SenderHintEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/SenderHintEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/SenderHintEvent.java
deleted file mode 100644
index 7cdbabc..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/SenderHintEvent.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.event.task.AbstractEvent;
-import org.apache.flink.runtime.io.network.channels.ChannelID;
-
-public final class SenderHintEvent extends AbstractEvent {
-
- /**
- * The sequence number that will be set for transfer envelopes which contain the sender hint event.
- */
- private static final int SENDER_HINT_SEQUENCE_NUMBER = 0;
-
- private final ChannelID source;
-
- private final RemoteReceiver remoteReceiver;
-
- SenderHintEvent(final ChannelID source, final RemoteReceiver remoteReceiver) {
-
- if (source == null) {
- throw new IllegalArgumentException("Argument source must not be null");
- }
-
- if (remoteReceiver == null) {
- throw new IllegalArgumentException("Argument remoteReceiver must not be null");
- }
-
- this.source = source;
- this.remoteReceiver = remoteReceiver;
- }
-
- public SenderHintEvent() {
-
- this.source = new ChannelID();
- this.remoteReceiver = new RemoteReceiver();
- }
-
- public ChannelID getSource() {
-
- return this.source;
- }
-
- public RemoteReceiver getRemoteReceiver() {
-
- return this.remoteReceiver;
- }
-
-
- @Override
- public void write(final DataOutputView out) throws IOException {
-
- this.source.write(out);
- this.remoteReceiver.write(out);
- }
-
-
- @Override
- public void read(final DataInputView in) throws IOException {
-
- this.source.read(in);
- this.remoteReceiver.read(in);
- }
-
- public static Envelope createEnvelopeWithEvent(final Envelope originalEnvelope,
- final ChannelID source, final RemoteReceiver remoteReceiver) {
-
- final Envelope envelope = new Envelope(SENDER_HINT_SEQUENCE_NUMBER,
- originalEnvelope.getJobID(), originalEnvelope.getSource());
-
- final SenderHintEvent senderEvent = new SenderHintEvent(source, remoteReceiver);
- envelope.serializeEventList(Arrays.asList(senderEvent));
-
- return envelope;
- }
-
- static boolean isSenderHintEvent(final Envelope envelope) {
-
- if (envelope.getSequenceNumber() != SENDER_HINT_SEQUENCE_NUMBER) {
- return false;
- }
-
- if (envelope.getBuffer() != null) {
- return false;
- }
-
- List<? extends AbstractEvent> events = envelope.deserializeEvents();
-
- if (events.size() != 1) {
- return false;
- }
-
- if (!(events.get(0) instanceof SenderHintEvent)) {
- return false;
- }
-
- return true;
- }
-}
|