flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [11/18] flink git commit: [FLINK-986] [FLINK-25] [Distributed runtime] Add initial support for intermediate results
Date Mon, 12 Jan 2015 08:16:19 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/gates/InputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/gates/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/gates/InputGate.java
deleted file mode 100644
index 3bca0a4..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/gates/InputGate.java
+++ /dev/null
@@ -1,384 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.gates;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.deployment.ChannelDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.GateDeploymentDescriptor;
-import org.apache.flink.runtime.event.task.AbstractEvent;
-import org.apache.flink.runtime.event.task.AbstractTaskEvent;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.io.network.Buffer;
-import org.apache.flink.runtime.io.network.bufferprovider.BufferAvailabilityListener;
-import org.apache.flink.runtime.io.network.bufferprovider.BufferProvider;
-import org.apache.flink.runtime.io.network.bufferprovider.GlobalBufferPool;
-import org.apache.flink.runtime.io.network.bufferprovider.LocalBufferPool;
-import org.apache.flink.runtime.io.network.bufferprovider.LocalBufferPoolOwner;
-import org.apache.flink.runtime.io.network.channels.InputChannel;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.JobID;
-
-/**
- * Input gates are a specialization of general gates and connect input channels and record readers. As
- * channels, input gates are always parameterized to a specific type of record which they can transport. In contrast to
- * output gates input gates can be associated with a {@link DistributionPattern} object which dictates the concrete
- * wiring between two groups of vertices.
- * 
- * @param <T> The type of record that can be transported through this gate.
- */
-public class InputGate<T extends IOReadableWritable> extends Gate<T> implements BufferProvider, LocalBufferPoolOwner {
-	
-	/**
-	 * The log object used for debugging.
-	 */
-	private static final Logger LOG = LoggerFactory.getLogger(InputGate.class);
-
-	/**
-	 * The array of input channels attached to this input gate.
-	 */
-	private InputChannel<T>[] channels;
-
-	/**
-	 * Queue with indices of channels that store at least one available record.
-	 */
-	private final BlockingQueue<Integer> availableChannels = new LinkedBlockingQueue<Integer>();
-
-	/**
-	 * The listener object to be notified when a channel has at least one record available.
-	 */
-	private final AtomicReference<RecordAvailabilityListener<T>> recordAvailabilityListener = new AtomicReference<RecordAvailabilityListener<T>>(null);
-	
-	
-	private AbstractTaskEvent currentEvent;
-
-	/**
-	 * If the value of this variable is set to <code>true</code>, the input gate is closed.
-	 */
-	private boolean isClosed = false;
-
-	/**
-	 * The channel to read from next.
-	 */
-	private int channelToReadFrom = -1;
-
-	private LocalBufferPool bufferPool;
-
-	/**
-	 * Constructs a new runtime input gate.
-	 * 
-	 * @param jobID
-	 *        the ID of the job this input gate belongs to
-	 * @param gateID
-	 *        the ID of the gate
-	 * @param index
-	 *        the index assigned to this input gate at the {@link Environment} object
-	 */
-	public InputGate(final JobID jobID, final GateID gateID, final int index) {
-		super(jobID, gateID, index);
-	}
-
-	@SuppressWarnings("unchecked")
-	public void initializeChannels(GateDeploymentDescriptor inputGateDescriptor){
-		List<ChannelDeploymentDescriptor> channelDescr = inputGateDescriptor.getChannels();
-		
-		channels = new InputChannel[channelDescr.size()];
-
-		for(int i = 0; i < channelDescr.size(); i++){
-			ChannelDeploymentDescriptor cdd = channelDescr.get(i);
-			channels[i] = new InputChannel<T>(this, i, cdd.getInputChannelID(),
-					cdd.getOutputChannelID(), getChannelType());
-		}
-	}
-
-	@Override
-	public boolean isInputGate() {
-		return true;
-	}
-
-	/**
-	 * Returns the number of input channels associated with this input gate.
-	 *
-	 * @return the number of input channels associated with this input gate
-	 */
-	public int getNumberOfInputChannels() {
-		return this.channels.length;
-	}
-
-	/**
-	 * Returns the input channel from position <code>pos</code> of the gate's internal channel list.
-	 *
-	 * @param pos
-	 *        the position to retrieve the channel from
-	 * @return the channel from the given position or <code>null</code> if such position does not exist.
-	 */
-	public InputChannel<T> getInputChannel(int pos) {
-		return this.channels[pos];
-	}
-
-	public InputChannel<T>[] channels() {
-		return this.channels;
-	}
-
-	/**
-	 * Reads a record from one of the associated input channels. Channels are read such that one buffer from a channel is
-	 * consecutively consumed. The buffers in turn are consumed in the order in which they arrive.
-	 * Note that this method is not guaranteed to return a record, because the currently available channel data may not always
-	 * constitute an entire record, when events or partial records are part of the data.
-	 *
-	 * When called even though no data is available, this call will block until data is available, so this method should be called
-	 * when waiting is desired (such as when synchronously consuming a single gate) or only when it is known that data is available
-	 * (such as when reading a union of multiple input gates).
-	 *
-	 * @param target The record object into which to construct the complete record.
-	 * @return The result indicating whether a complete record is available, a event is available, only incomplete data
-	 *         is available (NONE), or the gate is exhausted.
-	 * @throws IOException Thrown when an error occurred in the network stack relating to this channel.
-	 * @throws InterruptedException Thrown, when the thread working on this channel is interrupted.
-	 */
-	public InputChannelResult readRecord(T target) throws IOException, InterruptedException {
-
-		if (this.channelToReadFrom == -1) {
-			if (this.isClosed()) {
-				return InputChannelResult.END_OF_STREAM;
-			}
-				
-			if (Thread.interrupted()) {
-				throw new InterruptedException();
-			}
-				
-			this.channelToReadFrom = waitForAnyChannelToBecomeAvailable();
-		}
-			
-		InputChannelResult result = this.getInputChannel(this.channelToReadFrom).readRecord(target);
-		switch (result) {
-			case INTERMEDIATE_RECORD_FROM_BUFFER: // full record and we can stay on the same channel
-				return InputChannelResult.INTERMEDIATE_RECORD_FROM_BUFFER;
-				
-			case LAST_RECORD_FROM_BUFFER: // full record, but we must switch the channel afterwards
-				this.channelToReadFrom = -1;
-				return InputChannelResult.LAST_RECORD_FROM_BUFFER;
-				
-			case END_OF_SUPERSTEP:
-				this.channelToReadFrom = -1;
-				return InputChannelResult.END_OF_SUPERSTEP;
-				
-			case TASK_EVENT: // task event
-				this.currentEvent = this.getInputChannel(this.channelToReadFrom).getCurrentEvent();
-				this.channelToReadFrom = -1;	// event always marks a unit as consumed
-				return InputChannelResult.TASK_EVENT;
-					
-			case NONE: // internal event or an incomplete record that needs further chunks
-				// the current unit is exhausted
-				this.channelToReadFrom = -1;
-				return InputChannelResult.NONE;
-				
-			case END_OF_STREAM: // channel is done
-				this.channelToReadFrom = -1;
-				return isClosed() ? InputChannelResult.END_OF_STREAM : InputChannelResult.NONE;
-				
-			default:   // silence the compiler
-				throw new RuntimeException();
-		}
-	}
-
-	public AbstractTaskEvent getCurrentEvent() {
-		AbstractTaskEvent e = this.currentEvent;
-		this.currentEvent = null;
-		return e;
-	}
-
-	/**
-	 * Notify the gate that the channel with the given index has
-	 * at least one record available.
-	 *
-	 * @param channelIndex
-	 *        the index of the channel which has at least one record available
-	 */
-	public void notifyRecordIsAvailable(int channelIndex) {
-		this.availableChannels.add(Integer.valueOf(channelIndex));
-
-		RecordAvailabilityListener<T> listener = this.recordAvailabilityListener.get();
-		if (listener != null) {
-			listener.reportRecordAvailability(this);
-		}
-	}
-
-	/**
-	 * This method returns the index of a channel which has at least
-	 * one record available. The method may block until at least one
-	 * channel has become ready.
-	 * 
-	 * @return the index of the channel which has at least one record available
-	 */
-	public int waitForAnyChannelToBecomeAvailable() throws InterruptedException {
-		return this.availableChannels.take().intValue();
-	}
-
-
-	@Override
-	public boolean isClosed() throws IOException, InterruptedException {
-
-		if (this.isClosed) {
-			return true;
-		}
-
-		for (int i = 0; i < this.getNumberOfInputChannels(); i++) {
-			final InputChannel<T> inputChannel = this.channels[i];
-			if (!inputChannel.isClosed()) {
-				return false;
-			}
-		}
-
-		this.isClosed = true;
-		
-		return true;
-	}
-
-
-	/**
-	 * Immediately closes the input gate and all its input channels. The corresponding
-	 * output channels are notified. Any remaining records in any buffers or queue is considered
-	 * irrelevant and is discarded.
-	 *
-	 * @throws IOException
-	 *         thrown if an I/O error occurs while closing the gate
-	 * @throws InterruptedException
-	 *         thrown if the thread is interrupted while waiting for the gate to be closed
-	 */
-	public void close() throws IOException, InterruptedException {
-
-		for (int i = 0; i < this.getNumberOfInputChannels(); i++) {
-			final InputChannel<T> inputChannel = this.channels[i];
-			inputChannel.close();
-		}
-
-	}
-
-
-	@Override
-	public String toString() {
-		return "Input " + super.toString();
-	}
-
-
-	@Override
-	public void publishEvent(AbstractEvent event) throws IOException, InterruptedException {
-
-		// Copy event to all connected channels
-		for(int i=0; i< getNumberOfChannels(); i++){
-			channels[i].transferEvent(event);
-		}
-	}
-
-
-	@Override
-	public void releaseAllChannelResources() {
-
-		for(int i=0; i< getNumberOfChannels(); i++){
-			channels[i].releaseAllResources();
-		}
-	}
-
-	/**
-	 * Registers a {@link RecordAvailabilityListener} with this input gate.
-	 *
-	 * @param listener
-	 *        the listener object to be registered
-	 */
-	public void registerRecordAvailabilityListener(final RecordAvailabilityListener<T> listener) {
-		if (!this.recordAvailabilityListener.compareAndSet(null, listener)) {
-			throw new IllegalStateException(this.recordAvailabilityListener
-				+ " is already registered as a record availability listener");
-		}
-	}
-
-	/**
-	 * Notify the gate that is has consumed a data unit from the channel with the given index
-	 *
-	 * @param channelIndex
-	 *        the index of the channel from which a data unit has been consumed
-	 */
-	public void notifyDataUnitConsumed(int channelIndex) {
-		this.channelToReadFrom = -1;
-	}
-
-	//
-
-	@Override
-	public Buffer requestBuffer(int minBufferSize) throws IOException {
-		return this.bufferPool.requestBuffer(minBufferSize);
-	}
-
-	@Override
-	public Buffer requestBufferBlocking(int minBufferSize) throws IOException, InterruptedException {
-		return this.bufferPool.requestBufferBlocking(minBufferSize);
-	}
-
-	@Override
-	public int getBufferSize() {
-		return this.bufferPool.getBufferSize();
-	}
-
-	@Override
-	public int getNumberOfChannels() {
-		return getNumberOfInputChannels();
-	}
-
-	@Override
-	public void setDesignatedNumberOfBuffers(int numBuffers) {
-		this.bufferPool.setNumDesignatedBuffers(numBuffers);
-	}
-
-	@Override
-	public void clearLocalBufferPool() {
-		this.bufferPool.destroy();
-	}
-
-	@Override
-	public void registerGlobalBufferPool(GlobalBufferPool globalBufferPool) {
-		this.bufferPool = new LocalBufferPool(globalBufferPool, 1);
-	}
-
-	@Override
-	public void logBufferUtilization() {
-		LOG.info(String.format("\t%s: %d available, %d requested, %d designated",
-				this,
-				this.bufferPool.numAvailableBuffers(),
-				this.bufferPool.numRequestedBuffers(),
-				this.bufferPool.numDesignatedBuffers()));
-	}
-
-	@Override
-	public void reportAsynchronousEvent() {
-		this.bufferPool.reportAsynchronousEvent();
-	}
-
-	@Override
-	public BufferAvailabilityRegistration registerBufferAvailabilityListener(BufferAvailabilityListener listener) {
-		return this.bufferPool.registerBufferAvailabilityListener(listener);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/gates/OutputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/gates/OutputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/gates/OutputGate.java
deleted file mode 100644
index f203379..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/gates/OutputGate.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.gates;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.deployment.ChannelDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.GateDeploymentDescriptor;
-import org.apache.flink.runtime.event.task.AbstractEvent;
-import org.apache.flink.runtime.io.network.Buffer;
-import org.apache.flink.runtime.io.network.channels.ChannelID;
-import org.apache.flink.runtime.io.network.channels.OutputChannel;
-import org.apache.flink.runtime.jobgraph.JobID;
-
-public class OutputGate extends Gate<IOReadableWritable> {
-
-	private OutputChannel[] channels;
-
-	private boolean closed;
-	
-	/**
-	 * Constructs a new output gate.
-	 *
-	 * @param jobId the ID of the job this input gate belongs to
-	 * @param gateId the ID of the gate
-	 * @param index the index assigned to this output gate at the Environment object
-	 */
-	public OutputGate(JobID jobId, GateID gateId, int index) {
-		super(jobId, gateId, index);
-	}
-
-	// -----------------------------------------------------------------------------------------------------------------
-	//                                             Data processing
-	// -----------------------------------------------------------------------------------------------------------------
-
-	public void sendBuffer(Buffer buffer, int targetChannel) throws IOException, InterruptedException {
-		this.channels[targetChannel].sendBuffer(buffer);
-	}
-
-	public void sendEvent(AbstractEvent event, int targetChannel) throws IOException, InterruptedException {
-		this.channels[targetChannel].sendEvent(event);
-	}
-
-	public void sendBufferAndEvent(Buffer buffer, AbstractEvent event, int targetChannel) throws IOException, InterruptedException {
-		this.channels[targetChannel].sendBufferAndEvent(buffer, event);
-	}
-
-	public void broadcastBuffer(Buffer buffer) throws IOException, InterruptedException {
-		for (int i = 1; i < this.channels.length; i++) {
-			channels[i].sendBuffer(buffer.duplicate());
-		}
-		channels[0].sendBuffer(buffer);
-	}
-
-	public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException {
-		for (OutputChannel channel : this.channels) {
-			channel.sendEvent(event);
-		}
-	}
-
-	// -----------------------------------------------------------------------------------------------------------------
-	//                                              Channels
-	// -----------------------------------------------------------------------------------------------------------------
-
-	public void initializeChannels(GateDeploymentDescriptor descriptor) {
-		List<ChannelDeploymentDescriptor> channelDescr = descriptor.getChannels();
-		
-		int numChannels = channelDescr.size();
-		this.channels = new OutputChannel[numChannels];
-
-		for (int i = 0; i < numChannels; i++) {
-			ChannelDeploymentDescriptor channelDescriptor = channelDescr.get(i);
-
-			ChannelID id = channelDescriptor.getOutputChannelID();
-			ChannelID connectedId = channelDescriptor.getInputChannelID();
-
-			this.channels[i] = new OutputChannel(this, i, id, connectedId, getChannelType());
-		}
-	}
-
-	public OutputChannel[] channels() {
-		return this.channels;
-	}
-
-	public OutputChannel getChannel(int index) {
-		return (index < this.channels.length) ? this.channels[index] : null;
-	}
-
-	public int getNumChannels() {
-		return this.channels.length;
-	}
-
-	// -----------------------------------------------------------------------------------------------------------------
-	//                                              Shutdown
-	// -----------------------------------------------------------------------------------------------------------------
-
-	public void requestClose() throws IOException, InterruptedException {
-		for (OutputChannel channel : this.channels) {
-			channel.requestClose();
-		}
-	}
-
-	@Override
-	public boolean isClosed() {
-		if (this.closed) {
-			return true;
-		}
-		
-		for (OutputChannel channel : this.channels) {
-			if (!channel.isClosed()) {
-				return false;
-			}
-		}
-		
-		this.closed = true;
-		return true;
-	}
-	
-	public void waitForGateToBeClosed() throws InterruptedException {
-		if (this.closed) {
-			return;
-		}
-		
-		for (OutputChannel channel : this.channels) {
-			channel.waitForChannelToBeClosed();
-		}
-		
-		this.closed = true;
-	}
-
-	// -----------------------------------------------------------------------------------------------------------------
-
-	@Override
-	public boolean isInputGate() {
-		return false;
-	}
-
-	@Override
-	public String toString() {
-		return "Output " + super.toString();
-	}
-
-	@Override
-	public void publishEvent(AbstractEvent event) throws IOException, InterruptedException {
-		// replaced by broadcastEvent(AbstractEvent) => TODO will be removed with changes to input side
-	}
-
-	@Override
-	public void releaseAllChannelResources() {
-		// nothing to do for buffer oriented runtime => TODO will be removed with changes to input side
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/gates/RecordAvailabilityListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/gates/RecordAvailabilityListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/gates/RecordAvailabilityListener.java
deleted file mode 100644
index 50c9a82..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/gates/RecordAvailabilityListener.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.gates;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.gates.InputGate;
-
-/**
- * This interface can be implemented by a class which shall be notified by an input gate when one of the its connected
- * input channels has at least one record available for reading.
- * 
- * @param <T>
- *        the type of record transported through the corresponding input gate
- */
-public interface RecordAvailabilityListener<T extends IOReadableWritable> {
-
-	/**
-	 * This method is called by an input gate when one of its connected input channels has at least one record available
-	 * for reading.
-	 * 
-	 * @param inputGate
-	 *        the input gate which has at least one record available
-	 */
-	void reportRecordAvailability(InputGate<T> inputGate);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/InboundEnvelopeDecoder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/InboundEnvelopeDecoder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/InboundEnvelopeDecoder.java
deleted file mode 100644
index 8a84f1b..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/InboundEnvelopeDecoder.java
+++ /dev/null
@@ -1,362 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.netty;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.runtime.io.network.Buffer;
-import org.apache.flink.runtime.io.network.Envelope;
-import org.apache.flink.runtime.io.network.bufferprovider.BufferAvailabilityListener;
-import org.apache.flink.runtime.io.network.bufferprovider.BufferProvider;
-import org.apache.flink.runtime.io.network.bufferprovider.BufferProviderBroker;
-import org.apache.flink.runtime.io.network.channels.ChannelID;
-import org.apache.flink.runtime.jobgraph.JobID;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-public class InboundEnvelopeDecoder extends ChannelInboundHandlerAdapter implements BufferAvailabilityListener {
-
-	private static final Logger LOG = LoggerFactory.getLogger(InboundEnvelopeDecoder.class);
-
-	private final BufferProviderBroker bufferProviderBroker;
-
-	private final BufferAvailabilityChangedTask bufferAvailabilityChangedTask = new BufferAvailabilityChangedTask();
-
-	private final ConcurrentLinkedQueue<Buffer> bufferBroker = new ConcurrentLinkedQueue<Buffer>();
-
-	private final ByteBuffer headerBuffer;
-
-	private Envelope currentEnvelope;
-
-	private ByteBuffer currentEventsBuffer;
-
-	private ByteBuffer currentDataBuffer;
-
-	private int currentBufferRequestSize;
-
-	private BufferProvider currentBufferProvider;
-
-	private JobID lastJobId;
-
-	private ChannelID lastSourceId;
-
-	private ByteBuf stagedBuffer;
-
-	private ChannelHandlerContext channelHandlerContext;
-
-	private int bytesToSkip;
-
-	private enum DecoderState {
-		COMPLETE,
-		PENDING,
-		NO_BUFFER_AVAILABLE
-	}
-
-	public InboundEnvelopeDecoder(BufferProviderBroker bufferProviderBroker) {
-		this.bufferProviderBroker = bufferProviderBroker;
-		this.headerBuffer = ByteBuffer.allocateDirect(OutboundEnvelopeEncoder.HEADER_SIZE);
-	}
-
-	@Override
-	public void channelActive(ChannelHandlerContext ctx) throws Exception {
-		if (this.channelHandlerContext == null) {
-			this.channelHandlerContext = ctx;
-		}
-
-		super.channelActive(ctx);
-	}
-
-	@Override
-	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-		if (this.stagedBuffer != null) {
-			throw new IllegalStateException("No channel read event should be fired " +
-					"as long as the a buffer is staged.");
-		}
-
-		ByteBuf in = (ByteBuf) msg;
-
-		if (this.bytesToSkip > 0) {
-			this.bytesToSkip = skipBytes(in, this.bytesToSkip);
-
-			// we skipped over the whole buffer
-			if (this.bytesToSkip > 0) {
-				in.release();
-				return;
-			}
-		}
-
-		decodeBuffer(in, ctx);
-	}
-
-	/**
-	 * Decodes all Envelopes contained in a Netty ByteBuf and forwards them in the pipeline.
-	 * Returns true and releases the buffer, if it was fully consumed. Otherwise, returns false and retains the buffer.
-	 * </p>
-	 * In case of no buffer availability (returns false), a buffer availability listener is registered and the input
-	 * buffer is staged for later consumption.
-	 *
-	 * @return <code>true</code>, if buffer fully consumed, <code>false</code> otherwise
-	 * @throws IOException
-	 */
-	private boolean decodeBuffer(ByteBuf in, ChannelHandlerContext ctx) throws IOException {
-
-		DecoderState decoderState;
-		while ((decoderState = decodeEnvelope(in)) != DecoderState.PENDING) {
-			if (decoderState == DecoderState.COMPLETE) {
-				ctx.fireChannelRead(this.currentEnvelope);
-				this.currentEnvelope = null;
-			}
-			else if (decoderState == DecoderState.NO_BUFFER_AVAILABLE) {
-				switch (this.currentBufferProvider.registerBufferAvailabilityListener(this)) {
-					case SUCCEEDED_REGISTERED:
-						if (ctx.channel().config().isAutoRead()) {
-							ctx.channel().config().setAutoRead(false);
-
-							if (LOG.isDebugEnabled()) {
-								LOG.debug(String.format("Set channel %s auto read to false.", ctx.channel()));
-							}
-						}
-
-						this.stagedBuffer = in;
-						this.stagedBuffer.retain();
-						return false;
-
-					case FAILED_BUFFER_AVAILABLE:
-						continue;
-
-					case FAILED_BUFFER_POOL_DESTROYED:
-						this.bytesToSkip = skipBytes(in, this.currentBufferRequestSize);
-
-						this.currentBufferRequestSize = 0;
-						this.currentEventsBuffer = null;
-						this.currentEnvelope = null;
-				}
-			}
-		}
-
-		if (in.isReadable()) {
-			throw new IllegalStateException("Every buffer should have been fully" +
-					"consumed after *successfully* decoding it (if it was not successful, " +
-					"the buffer will be staged for later consumption).");
-		}
-
-		in.release();
-		return true;
-	}
-
-	/**
-	 * Notifies the IO thread that a Buffer has become available again.
-	 * <p/>
-	 * This method will be called from outside the Netty IO thread. The caller will be the buffer pool from which the
-	 * available buffer comes (i.e. the InputGate).
-	 * <p/>
-	 * We have to make sure that the available buffer is handed over to the IO thread in a safe manner.
-	 */
-	@Override
-	public void bufferAvailable(Buffer buffer) throws Exception {
-		this.bufferBroker.offer(buffer);
-		this.channelHandlerContext.channel().eventLoop().execute(this.bufferAvailabilityChangedTask);
-	}
-
-	/**
-	 * Continues the decoding of a staged buffer after a buffer has become available again.
-	 * <p/>
-	 * This task should be executed by the IO thread to ensure safe access to the staged buffer.
-	 */
-	private class BufferAvailabilityChangedTask implements Runnable {
-		@Override
-		public void run() {
-			Buffer availableBuffer = bufferBroker.poll();
-			if (availableBuffer == null) {
-				throw new IllegalStateException("The BufferAvailabilityChangedTask" +
-						"should only be executed when a Buffer has been offered" +
-						"to the Buffer broker (after becoming available).");
-			}
-
-			// This alters the state of the last `decodeEnvelope(ByteBuf)`
-			// call to set the buffer, which has become available again
-			availableBuffer.limitSize(currentBufferRequestSize);
-			currentEnvelope.setBuffer(availableBuffer);
-			currentDataBuffer = availableBuffer.getMemorySegment().wrap(0, InboundEnvelopeDecoder.this.currentBufferRequestSize);
-			currentBufferRequestSize = 0;
-
-			stagedBuffer.release();
-
-			try {
-				if (decodeBuffer(stagedBuffer, channelHandlerContext)) {
-					stagedBuffer = null;
-					channelHandlerContext.channel().config().setAutoRead(true);
-					if (LOG.isDebugEnabled()) {
-						LOG.debug(String.format("Set channel %s auto read to true.", channelHandlerContext.channel()));
-					}
-				}
-			} catch (IOException e) {
-				availableBuffer.recycleBuffer();
-			}
-		}
-	}
-
-	// --------------------------------------------------------------------
-
-	private DecoderState decodeEnvelope(ByteBuf in) throws IOException {
-		// --------------------------------------------------------------------
-		// (1) header (EnvelopeEncoder.HEADER_SIZE bytes)
-		// --------------------------------------------------------------------
-		if (this.currentEnvelope == null) {
-			copy(in, this.headerBuffer);
-
-			if (this.headerBuffer.hasRemaining()) {
-				return DecoderState.PENDING;
-			}
-			else {
-				this.headerBuffer.flip();
-
-				int magicNum = this.headerBuffer.getInt();
-				if (magicNum != OutboundEnvelopeEncoder.MAGIC_NUMBER) {
-					throw new IOException("Network stream corrupted: invalid magic" +
-							"number in current envelope header.");
-				}
-
-				int seqNum = this.headerBuffer.getInt();
-				JobID jobId = JobID.fromByteBuffer(this.headerBuffer);
-				ChannelID sourceId = ChannelID.fromByteBuffer(this.headerBuffer);
-
-				this.currentEnvelope = new Envelope(seqNum, jobId, sourceId);
-
-				int eventsSize = this.headerBuffer.getInt();
-				int bufferSize = this.headerBuffer.getInt();
-
-				this.currentEventsBuffer = eventsSize > 0 ? ByteBuffer.allocate(eventsSize) : null;
-				this.currentBufferRequestSize = bufferSize > 0 ? bufferSize : 0;
-
-				this.headerBuffer.clear();
-			}
-		}
-
-		// --------------------------------------------------------------------
-		// (2) events (var length)
-		// --------------------------------------------------------------------
-		if (this.currentEventsBuffer != null) {
-			copy(in, this.currentEventsBuffer);
-
-			if (this.currentEventsBuffer.hasRemaining()) {
-				return DecoderState.PENDING;
-			}
-			else {
-				this.currentEventsBuffer.flip();
-				this.currentEnvelope.setEventsSerialized(this.currentEventsBuffer);
-				this.currentEventsBuffer = null;
-			}
-		}
-
-		// --------------------------------------------------------------------
-		// (3) buffer (var length)
-		// --------------------------------------------------------------------
-		// (a) request a buffer from OUR pool
-		if (this.currentBufferRequestSize > 0) {
-			JobID jobId = this.currentEnvelope.getJobID();
-			ChannelID sourceId = this.currentEnvelope.getSource();
-			Buffer buffer = requestBufferForTarget(jobId, sourceId, this.currentBufferRequestSize);
-
-			if (buffer == null) {
-				return DecoderState.NO_BUFFER_AVAILABLE;
-			}
-			else {
-				this.currentEnvelope.setBuffer(buffer);
-				this.currentDataBuffer = buffer.getMemorySegment().wrap(0, this.currentBufferRequestSize);
-				this.currentBufferRequestSize = 0;
-			}
-		}
-
-		// (b) copy data to OUR buffer
-		if (this.currentDataBuffer != null) {
-			copy(in, this.currentDataBuffer);
-
-			if (this.currentDataBuffer.hasRemaining()) {
-				return DecoderState.PENDING;
-			}
-			else {
-				this.currentDataBuffer = null;
-			}
-		}
-
-		// if we made it to this point, we completed the envelope;
-		// in the other cases we return early with PENDING or NO_BUFFER_AVAILABLE
-		return DecoderState.COMPLETE;
-	}
-
-	private Buffer requestBufferForTarget(JobID jobId, ChannelID sourceId, int size) throws IOException {
-		// Request the buffer from the target buffer provider, which is the
-		// InputGate of the receiving InputChannel.
-		if (!(jobId.equals(this.lastJobId) && sourceId.equals(this.lastSourceId))) {
-			this.lastJobId = jobId;
-			this.lastSourceId = sourceId;
-
-			this.currentBufferProvider = this.bufferProviderBroker.getBufferProvider(jobId, sourceId);
-		}
-
-		return this.currentBufferProvider.requestBuffer(size);
-	}
-
-	/**
-	 * Copies min(from.readableBytes(), to.remaining() bytes from Nettys ByteBuf to the Java NIO ByteBuffer.
-	 */
-	private void copy(ByteBuf src, ByteBuffer dst) {
-		// This branch is necessary, because an Exception is thrown if the
-		// destination buffer has more remaining (writable) bytes than
-		// currently readable from the Netty ByteBuf source.
-		if (src.isReadable()) {
-			if (src.readableBytes() < dst.remaining()) {
-				int oldLimit = dst.limit();
-
-				dst.limit(dst.position() + src.readableBytes());
-				src.readBytes(dst);
-				dst.limit(oldLimit);
-			}
-			else {
-				src.readBytes(dst);
-			}
-		}
-	}
-
-	/**
-	 * Skips over min(in.readableBytes(), toSkip) bytes in the Netty ByteBuf and returns how many bytes remain to be
-	 * skipped.
-	 *
-	 * @return remaining bytes to be skipped
-	 */
-	private int skipBytes(ByteBuf in, int toSkip) {
-		if (toSkip <= in.readableBytes()) {
-			in.readBytes(toSkip);
-			return 0;
-		}
-
-		int remainingToSkip = toSkip - in.readableBytes();
-		in.readerIndex(in.readerIndex() + in.readableBytes());
-
-		return remainingToSkip;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/InboundEnvelopeDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/InboundEnvelopeDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/InboundEnvelopeDispatcher.java
deleted file mode 100644
index a7167be..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/InboundEnvelopeDispatcher.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.netty;
-
-import org.apache.flink.runtime.io.network.Envelope;
-import org.apache.flink.runtime.io.network.EnvelopeDispatcher;
-
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-
-public class InboundEnvelopeDispatcher extends ChannelInboundHandlerAdapter {
-
-	private final EnvelopeDispatcher envelopeDispatcher;
-
-	public InboundEnvelopeDispatcher(EnvelopeDispatcher envelopeDispatcher) {
-		this.envelopeDispatcher = envelopeDispatcher;
-	}
-
-	@Override
-	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-		Envelope envelope = (Envelope) msg;
-
-		envelopeDispatcher.dispatchFromNetwork(envelope);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
new file mode 100644
index 0000000..35bccf4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollSocketChannel;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+
+import static com.google.common.base.Preconditions.checkState;
+
+class NettyClient {
+
+	private static final Logger LOG = LoggerFactory.getLogger(NettyClient.class);
+
+	private final NettyConfig config;
+
+	private Bootstrap bootstrap;
+
+	NettyClient(NettyConfig config) {
+		this.config = config;
+	}
+
+	void init(final NettyProtocol protocol) throws IOException {
+		checkState(bootstrap == null, "Netty client has already been initialized.");
+
+		long start = System.currentTimeMillis();
+
+		bootstrap = new Bootstrap();
+
+		// --------------------------------------------------------------------
+		// Transport-specific configuration
+		// --------------------------------------------------------------------
+
+		switch (config.getTransportType()) {
+			case NIO:
+				initNioBootstrap();
+				break;
+
+			case EPOLL:
+				initEpollBootstrap();
+				break;
+
+			case AUTO:
+				if (Epoll.isAvailable()) {
+					initEpollBootstrap();
+					LOG.info("Transport type 'auto': using EPOLL.");
+				}
+				else {
+					initNioBootstrap();
+					LOG.info("Transport type 'auto': using NIO.");
+				}
+		}
+
+		// --------------------------------------------------------------------
+		// Configuration
+		// --------------------------------------------------------------------
+
+		bootstrap.option(ChannelOption.TCP_NODELAY, true);
+		bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
+
+		// Timeout for new connections
+		bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getClientConnectTimeoutMs() * 1000);
+
+		// Pooled allocator for Netty's ByteBuf instances
+		bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+
+		// Low and high water marks for flow control
+		bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, config.getMemorySegmentSize() / 2);
+		bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, config.getMemorySegmentSize());
+
+		// Receive and send buffer size
+		int receiveAndSendBufferSize = config.getSendAndReceiveBufferSize();
+		if (receiveAndSendBufferSize > 0) {
+			bootstrap.option(ChannelOption.SO_SNDBUF, receiveAndSendBufferSize);
+			bootstrap.option(ChannelOption.SO_RCVBUF, receiveAndSendBufferSize);
+		}
+
+		// --------------------------------------------------------------------
+		// Child channel pipeline for accepted connections
+		// --------------------------------------------------------------------
+
+		bootstrap.handler(new ChannelInitializer<SocketChannel>() {
+			@Override
+			public void initChannel(SocketChannel channel) throws Exception {
+				protocol.setClientChannelPipeline(channel.pipeline());
+			}
+		});
+
+		long end = System.currentTimeMillis();
+		LOG.info("Successful initialization (took {} ms).", (end - start));
+	}
+
+	void shutdown() {
+		long start = System.currentTimeMillis();
+
+		if (bootstrap != null) {
+			if (bootstrap.group() != null) {
+				bootstrap.group().shutdownGracefully();
+			}
+			bootstrap = null;
+		}
+
+		long end = System.currentTimeMillis();
+		LOG.info("Successful shutdown (took {} ms).", (end - start));
+	}
+
+	private void initNioBootstrap() {
+		// Add the server port number to the name in order to distinguish
+		// multiple clients running on the same host.
+		String name = NettyConfig.CLIENT_THREAD_GROUP_NAME + " (" + config.getServerPort() + ")";
+
+		NioEventLoopGroup nioGroup = new NioEventLoopGroup(config.getClientNumThreads(), NettyServer.getNamedThreadFactory(name));
+		bootstrap.group(nioGroup).channel(NioSocketChannel.class);
+	}
+
+	private void initEpollBootstrap() {
+		// Add the server port number to the name in order to distinguish
+		// multiple clients running on the same host.
+		String name = NettyConfig.CLIENT_THREAD_GROUP_NAME + " (" + config.getServerPort() + ")";
+
+		EpollEventLoopGroup epollGroup = new EpollEventLoopGroup(config.getServerNumThreads(), NettyServer.getNamedThreadFactory(name));
+		bootstrap.group(epollGroup).channel(EpollSocketChannel.class);
+	}
+
+	// ------------------------------------------------------------------------
+	// Client connections
+	// ------------------------------------------------------------------------
+
+	ChannelFuture connect(SocketAddress serverSocketAddress) {
+		checkState(bootstrap != null, "Client has not been initialized yet.");
+
+		return bootstrap.connect(serverSocketAddress);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
new file mode 100644
index 0000000..a3e01fe
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class NettyConfig {
+
+	private static final Logger LOG = LoggerFactory.getLogger(NettyConfig.class);
+
+	public static final String NUM_THREADS_SERVER = "taskmanager.net.server.numThreads";
+
+	public static final String NUM_THREADS_CLIENT = "taskmanager.net.client.numThreads";
+
+	static enum TransportType {
+		NIO, EPOLL, AUTO
+	}
+
+	final static String SERVER_THREAD_GROUP_NAME = "Flink Netty Server";
+
+	final static String CLIENT_THREAD_GROUP_NAME = "Flink Netty Client";
+
+	private final InetAddress serverAddress;
+
+	private final int serverPort;
+
+	private final int memorySegmentSize;
+
+	private final Configuration config; // optional configuration
+
+	public NettyConfig(InetAddress serverAddress, int serverPort, int memorySegmentSize, Configuration config) {
+		this.serverAddress = checkNotNull(serverAddress);
+
+		checkArgument(serverPort > 0 && serverPort <= 65536, "Invalid port number.");
+		this.serverPort = serverPort;
+
+		checkArgument(memorySegmentSize > 0, "Invalid memory segment size.");
+		this.memorySegmentSize = memorySegmentSize;
+
+		this.config = checkNotNull(config);
+
+		LOG.info(this.toString());
+	}
+
+	InetAddress getServerAddress() {
+		return serverAddress;
+	}
+
+	int getServerPort() {
+		return serverPort;
+	}
+
+	int getMemorySegmentSize() {
+		return memorySegmentSize;
+	}
+
+	// ------------------------------------------------------------------------
+
+	int getServerConnectBacklog() {
+		// default: 0 => Netty's default
+		return config.getInteger("taskmanager.net.server.backlog", 0);
+	}
+
+	int getServerNumThreads() {
+		// default: 0 => Netty's default: 2 * #cores
+		return config.getInteger(NUM_THREADS_SERVER, 0);
+	}
+
+	int getClientNumThreads() {
+		// default: 0 => Netty's default: 2 * #cores
+		return config.getInteger(NUM_THREADS_CLIENT, 0);
+	}
+
+	int getClientConnectTimeoutMs() {
+		// default: 120s = 2min
+		return config.getInteger("taskmanager.net.client.connectTimeoutSec", 120);
+	}
+
+	int getSendAndReceiveBufferSize() {
+		// default: 0 => Netty's default
+		return config.getInteger("taskmanager.net.sendReceiveBufferSize", 0);
+	}
+
+	TransportType getTransportType() {
+		String transport = config.getString("taskmanager.net.transport", "nio");
+
+		if (transport.equals("nio")) {
+			return TransportType.NIO;
+		}
+		else if (transport.equals("epoll")) {
+			return TransportType.EPOLL;
+		}
+		else {
+			return TransportType.AUTO;
+		}
+	}
+
+	@Override
+	public String toString() {
+		String format = "NettyConfig [" +
+				"server address: %s, " +
+				"server port: %d, " +
+				"memory segment size (bytes): %d, " +
+				"transport type: %s, " +
+				"number of server threads: %d (%s), " +
+				"number of client threads: %d (%s), " +
+				"server connect backlog: %d (%s), " +
+				"client connect timeout (sec): %d, " +
+				"send/receive buffer size (bytes): %d (%s)]";
+
+		String def = "use Netty's default";
+		String man = "manual";
+
+		return String.format(format, serverAddress, serverPort, memorySegmentSize,
+				getTransportType(), getServerNumThreads(), getServerNumThreads() == 0 ? def : man,
+				getClientNumThreads(), getClientNumThreads() == 0 ? def : man,
+				getServerConnectBacklog(), getServerConnectBacklog() == 0 ? def : man,
+				getClientConnectTimeoutMs(), getSendAndReceiveBufferSize(),
+				getSendAndReceiveBufferSize() == 0 ? def : man);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
index 941c448..bbb303b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
@@ -18,392 +18,50 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
-import io.netty.bootstrap.Bootstrap;
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.FixedRecvByteBufAllocator;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.runtime.io.network.ChannelManager;
-import org.apache.flink.runtime.io.network.Envelope;
-import org.apache.flink.runtime.io.network.EnvelopeDispatcher;
-import org.apache.flink.runtime.io.network.NetworkConnectionManager;
-import org.apache.flink.runtime.io.network.RemoteReceiver;
-import org.apache.flink.runtime.io.network.bufferprovider.BufferProviderBroker;
+import org.apache.flink.runtime.io.network.ConnectionManager;
+import org.apache.flink.runtime.io.network.RemoteAddress;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.partition.IntermediateResultPartitionProvider;
 
 import java.io.IOException;
-import java.net.InetAddress;
-import java.util.Date;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
-public class NettyConnectionManager implements NetworkConnectionManager {
+public class NettyConnectionManager implements ConnectionManager {
 
-	private static final Logger LOG = LoggerFactory.getLogger(NettyConnectionManager.class);
+	private final NettyServer server;
 
-	private static final int DEBUG_PRINT_QUEUED_ENVELOPES_EVERY_MS = 10000;
+	private final NettyClient client;
 
-	private final ConcurrentMap<RemoteReceiver, Object> outConnections = new ConcurrentHashMap<RemoteReceiver, Object>();
+	private final PartitionRequestClientFactory partitionRequestClientFactory;
 
-	private final InetAddress bindAddress;
+	public NettyConnectionManager(NettyConfig nettyConfig) {
+		this.server = new NettyServer(nettyConfig);
+		this.client = new NettyClient(nettyConfig);
 
-	private final int bindPort;
-
-	private final int bufferSize;
-
-	private final int numInThreads;
-
-	private final int numOutThreads;
-
-	private final int lowWaterMark;
-
-	private final int highWaterMark;
-
-	private ServerBootstrap in;
-
-	private Bootstrap out;
-
-	private Thread debugThread;
-
-	public NettyConnectionManager(InetAddress bindAddress, int bindPort, int bufferSize, int numInThreads,
-								int numOutThreads, int lowWaterMark, int highWaterMark) {
-
-		this.bindAddress = bindAddress;
-		this.bindPort = bindPort;
-
-		this.bufferSize = bufferSize;
-
-		int defaultNumThreads = Math.max(Runtime.getRuntime().availableProcessors() / 4, 1);
-
-		this.numInThreads = (numInThreads == -1) ? defaultNumThreads : numInThreads;
-		this.numOutThreads = (numOutThreads == -1) ? defaultNumThreads : numOutThreads;
-
-		this.lowWaterMark = (lowWaterMark == -1) ? bufferSize / 2 : lowWaterMark;
-		this.highWaterMark = (highWaterMark == -1) ? bufferSize : highWaterMark;
+		this.partitionRequestClientFactory = new PartitionRequestClientFactory(client);
 	}
 
 	@Override
-	public void start(ChannelManager channelManager) throws IOException {
-		LOG.info(String.format("Starting with %d incoming and %d outgoing connection threads.", numInThreads, numOutThreads));
-		LOG.info(String.format("Setting low water mark to %d and high water mark to %d bytes.", lowWaterMark, highWaterMark));
-
-		final BufferProviderBroker bufferProviderBroker = channelManager;
-		final EnvelopeDispatcher envelopeDispatcher = channelManager;
-
-		int numHeapArenas = 0;
-		int numDirectArenas = numInThreads + numOutThreads;
-		int pageSize = bufferSize << 1;
-		int chunkSize = 16 * 1 << 20; // 16 MB
-
-		// shift pageSize maxOrder times to get to chunkSize
-		int maxOrder = (int) (Math.log(chunkSize/pageSize) / Math.log(2));
+	public void start(IntermediateResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) throws IOException {
+		PartitionRequestProtocol partitionRequestProtocol = new PartitionRequestProtocol(partitionProvider, taskEventDispatcher);
 
-		PooledByteBufAllocator pooledByteBufAllocator =
-				new PooledByteBufAllocator(true, numHeapArenas, numDirectArenas, pageSize, maxOrder);
-
-		String msg = String.format("Instantiated PooledByteBufAllocator with direct arenas: %d, heap arenas: %d, " +
-				"page size (bytes): %d, chunk size (bytes): %d.",
-				numDirectArenas, numHeapArenas, pageSize, (pageSize << maxOrder));
-		LOG.info(msg);
-
-		// --------------------------------------------------------------------
-		// server bootstrap (incoming connections)
-		// --------------------------------------------------------------------
-		in = new ServerBootstrap();
-		in.group(new NioEventLoopGroup(numInThreads))
-				.channel(NioServerSocketChannel.class)
-				.localAddress(bindAddress, bindPort)
-				.childHandler(new ChannelInitializer<SocketChannel>() {
-					@Override
-					public void initChannel(SocketChannel channel) throws Exception {
-						channel.pipeline()
-								.addLast(new InboundEnvelopeDecoder(bufferProviderBroker))
-								.addLast(new InboundEnvelopeDispatcher(envelopeDispatcher));
-					}
-				})
-				.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(pageSize))
-				.option(ChannelOption.ALLOCATOR, pooledByteBufAllocator);
-
-		// --------------------------------------------------------------------
-		// client bootstrap (outgoing connections)
-		// --------------------------------------------------------------------
-		out = new Bootstrap();
-		out.group(new NioEventLoopGroup(numOutThreads))
-				.channel(NioSocketChannel.class)
-				.handler(new ChannelInitializer<SocketChannel>() {
-					@Override
-					public void initChannel(SocketChannel channel) throws Exception {
-						channel.pipeline()
-								.addLast(new OutboundEnvelopeEncoder());
-					}
-				})
-				.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, lowWaterMark)
-				.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, highWaterMark)
-				.option(ChannelOption.ALLOCATOR, pooledByteBufAllocator)
-				.option(ChannelOption.TCP_NODELAY, false)
-				.option(ChannelOption.SO_KEEPALIVE, true);
-
-		try {
-			in.bind().sync();
-		} catch (InterruptedException e) {
-			throw new IOException(e);
-		}
-
-		if (LOG.isDebugEnabled()) {
-			debugThread = new Thread(new Runnable() {
-				@Override
-				public void run() {
-					Date date = new Date();
-
-					while (true) {
-						if (Thread.interrupted()) {
-							break;
-						}
-
-						try {
-							Thread.sleep(DEBUG_PRINT_QUEUED_ENVELOPES_EVERY_MS);
-
-							date.setTime(System.currentTimeMillis());
-
-							System.out.println(date);
-							System.out.println(getNonZeroNumQueuedEnvelopes());
-						} catch (InterruptedException e) {
-							break;
-						}
-					}
-				}
-			});
-			debugThread.start();
-		}
+		client.init(partitionRequestProtocol);
+		server.init(partitionRequestProtocol);
 	}
 
 	@Override
-	public void enqueue(Envelope envelope, RemoteReceiver receiver, boolean isFirstEnvelope) throws IOException {
-		// Get the channel. The channel may be
-		// 1) a channel that already exists (usual case) -> just send the data
-		// 2) a channel that is in buildup (sometimes) -> attach to the future and wait for the actual channel
-		// 3) not yet existing -> establish the channel
-
-		final Object entry = this.outConnections.get(receiver);
-		final OutboundConnectionQueue channel;
-
-		if (entry != null) {
-			// existing channel or channel in buildup
-			if (entry instanceof OutboundConnectionQueue) {
-				channel = (OutboundConnectionQueue) entry;
-			}
-			else {
-				ChannelInBuildup future = (ChannelInBuildup) entry;
-				channel = future.waitForChannel();
-			}
-		}
-		else {
-			// No channel yet. Create one, but watch out for a race.
-			// We create a "buildup future" and atomically add it to the map.
-			// Only the thread that really added it establishes the channel.
-			// The others need to wait on that original establisher's future.
-			ChannelInBuildup inBuildup = new ChannelInBuildup(this.out, receiver);
-			Object old = this.outConnections.putIfAbsent(receiver, inBuildup);
-
-			if (old == null) {
-				this.out.connect(receiver.getConnectionAddress()).addListener(inBuildup);
-				channel = inBuildup.waitForChannel();
-
-				Object previous = this.outConnections.put(receiver, channel);
-
-				if (inBuildup != previous) {
-					throw new IOException("Race condition during channel build up.");
-				}
-			}
-			else if (old instanceof ChannelInBuildup) {
-				channel = ((ChannelInBuildup) old).waitForChannel();
-			}
-			else {
-				channel = (OutboundConnectionQueue) old;
-			}
-		}
-
-		// The first envelope of a logical channel increments the reference counter of the
-		// connection to indicate that it is holding the resource. When unregistering the task,
-		// every logical channel decrements this counter again and the last one to decrement it
-		// to zero releases the connection.
-		if (isFirstEnvelope) {
-			if (channel.incrementReferenceCounter()) {
-				channel.enqueue(envelope);
-			}
-			else {
-				// There was a race with a close, try again.
-				outConnections.remove(receiver, channel);
-
-				enqueue(envelope, receiver, isFirstEnvelope);
-			}
-		}
-		else {
-			channel.enqueue(envelope);
-		}
-	}
-
-	@Override
-	public void close(RemoteReceiver receiver) {
-		Object entry = outConnections.get(receiver);
-
-		if (entry instanceof OutboundConnectionQueue) {
-			OutboundConnectionQueue channel = (OutboundConnectionQueue) entry;
-
-			// It is possible that we decrement without ever having incremented the counter, which
-			// is fine.
-			try {
-				if (channel.decrementReferenceCounter()) {
-					channel.close();
-					outConnections.remove(receiver, channel);
-				}
-			} catch (Exception ignored) {
-			}
-		}
+	public PartitionRequestClient createPartitionRequestClient(RemoteAddress remoteAddress) throws IOException {
+		return partitionRequestClientFactory.createPartitionRequestClient(remoteAddress);
 	}
 
 	@Override
 	public int getNumberOfActiveConnections() {
-		return outConnections.size();
+		return partitionRequestClientFactory.getNumberOfActiveClients();
 	}
 
 	@Override
-	public void shutdown() throws IOException {
-		if (debugThread != null) {
-			debugThread.interrupt();
-		}
-
-		if (!in.group().isShuttingDown()) {
-			LOG.info("Shutting down incoming connections.");
-
-			try {
-				in.group().shutdownGracefully().sync();
-			} catch (InterruptedException e) {
-				throw new IOException(e);
-			}
-		}
-
-		if (!out.group().isShuttingDown()) {
-			LOG.info("Shutting down outgoing connections.");
-
-			try {
-				out.group().shutdownGracefully().sync();
-			} catch (InterruptedException e) {
-				throw new IOException(e);
-			}
-		}
-	}
-
-	private String getNonZeroNumQueuedEnvelopes() {
-		StringBuilder str = new StringBuilder();
-
-		str.append(String.format("==== %d outgoing connections ===\n", this.outConnections.size()));
-
-		for (Map.Entry<RemoteReceiver, Object> entry : this.outConnections.entrySet()) {
-			RemoteReceiver receiver = entry.getKey();
-
-			Object value = entry.getValue();
-			if (value instanceof OutboundConnectionQueue) {
-				OutboundConnectionQueue queue = (OutboundConnectionQueue) value;
-				if (queue.getNumQueuedEnvelopes() > 0) {
-					str.append(String.format("%s> Number of queued envelopes for %s with channel %s: %d\n",
-							Thread.currentThread().getId(), receiver, queue.toString(), queue.getNumQueuedEnvelopes()));
-				}
-			}
-			else if (value instanceof ChannelInBuildup) {
-				str.append(String.format("%s> Connection to %s is still in buildup\n",
-						Thread.currentThread().getId(), receiver));
-			}
-		}
-
-		return str.toString();
-	}
-
-	// ------------------------------------------------------------------------
-
-	private static final class ChannelInBuildup implements ChannelFutureListener {
-
-		private final Object lock = new Object();
-
-		private volatile OutboundConnectionQueue channel;
-
-		private volatile Throwable error;
-
-		private int numRetries = 3;
-
-		private final Bootstrap out;
-
-		private final RemoteReceiver receiver;
-
-		private ChannelInBuildup(Bootstrap out, RemoteReceiver receiver) {
-			this.out = out;
-			this.receiver = receiver;
-		}
-
-		private void handInChannel(OutboundConnectionQueue c) {
-			synchronized (this.lock) {
-				this.channel = c;
-				this.lock.notifyAll();
-			}
-		}
-
-		private void notifyOfError(Throwable error) {
-			synchronized (this.lock) {
-				this.error = error;
-				this.lock.notifyAll();
-			}
-		}
-
-		private OutboundConnectionQueue waitForChannel() throws IOException {
-			synchronized (this.lock) {
-				while (this.error == null && this.channel == null) {
-					try {
-						this.lock.wait(2000);
-					} catch (InterruptedException e) {
-						throw new RuntimeException("Channel buildup interrupted.");
-					}
-				}
-			}
-
-			if (this.error != null) {
-				throw new IOException("Connecting the channel failed: " + error.getMessage(), error);
-			}
-
-			return this.channel;
-		}
-
-		@Override
-		public void operationComplete(ChannelFuture future) throws Exception {
-			if (future.isSuccess()) {
-				if (LOG.isDebugEnabled()) {
-					LOG.debug(String.format("Channel %s connected", future.channel()));
-				}
-
-				handInChannel(new OutboundConnectionQueue(future.channel()));
-			}
-			else if (this.numRetries > 0) {
-				LOG.debug("Connection request did not succeed, retrying ({} attempts left)", numRetries);
-
-				this.out.connect(this.receiver.getConnectionAddress()).addListener(this);
-				this.numRetries--;
-			}
-			else {
-				if (future.getClass() != null) {
-					notifyOfError(future.cause());
-				}
-				else {
-					notifyOfError(new Exception("Connection could not be established."));
-				}
-			}
-		}
+	public void shutdown() {
+		client.shutdown();
+		server.shutdown();
 	}
 }
+

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
new file mode 100644
index 0000000..39a03ac
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
@@ -0,0 +1,640 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.codec.MessageToMessageDecoder;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.StringUtils;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * A simple and generic interface to serialize messages to Netty's buffer space.
+ */
+abstract class NettyMessage {
+
+	// ------------------------------------------------------------------------
+	// Note: Every NettyMessage subtype needs to have a public 0-argument
+	// constructor in order to work with the generic deserializer.
+	// ------------------------------------------------------------------------
+
+	private static final int HEADER_LENGTH = 4 + 4 + 1; // frame length (4), magic number (4), msg ID (1)
+
+	private static final int MAGIC_NUMBER = 0xBADC0FFE;
+
+	abstract ByteBuf write(ByteBufAllocator allocator) throws Exception;
+
+	abstract void readFrom(ByteBuf buffer) throws Exception;
+
+	// ------------------------------------------------------------------------
+
+	private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id) {
+		return allocateBuffer(allocator, id, 0);
+	}
+
+	private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id, int length) {
+		final ByteBuf buffer = length != 0 ? allocator.directBuffer(HEADER_LENGTH + length) : allocator.directBuffer();
+		buffer.writeInt(HEADER_LENGTH + length);
+		buffer.writeInt(MAGIC_NUMBER);
+		buffer.writeByte(id);
+
+		return buffer;
+	}
+
+	// ------------------------------------------------------------------------
+	// Generic NettyMessage encoder and decoder
+	// ------------------------------------------------------------------------
+
+	@ChannelHandler.Sharable
+	static class NettyMessageEncoder extends ChannelOutboundHandlerAdapter {
+
+		@Override
+		public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+			if (msg instanceof NettyMessage) {
+				try {
+					ctx.write(((NettyMessage) msg).write(ctx.alloc()), promise);
+				}
+				catch (Throwable t) {
+					throw new IOException("Error while serializing message: " + msg, t);
+				}
+			}
+			else {
+				ctx.write(msg, promise);
+			}
+		}
+
+		// Create the frame length decoder here as it depends on the encoder
+		//
+		// +------------------+------------------+--------++----------------+
+		// | FRAME LENGTH (4) | MAGIC NUMBER (4) | ID (1) || CUSTOM MESSAGE |
+		// +------------------+------------------+--------++----------------+
+		static LengthFieldBasedFrameDecoder createFrameLengthDecoder() {
+			return new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, -4, 4);
+		}
+	}
+
+	@ChannelHandler.Sharable
+	static class NettyMessageDecoder extends MessageToMessageDecoder<ByteBuf> {
+
+		@Override
+		protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
+			int magicNumber = msg.readInt();
+
+			if (magicNumber != MAGIC_NUMBER) {
+				throw new IllegalStateException("Network stream corrupted: received incorrect magic number.");
+			}
+
+			byte msgId = msg.readByte();
+
+			NettyMessage decodedMsg = null;
+
+			if (msgId == BufferResponse.ID) {
+				decodedMsg = new BufferResponse();
+			}
+			else if (msgId == PartitionRequest.ID) {
+				decodedMsg = new PartitionRequest();
+			}
+			else if (msgId == TaskEventRequest.ID) {
+				decodedMsg = new TaskEventRequest();
+			}
+			else if (msgId == ErrorResponse.ID) {
+				decodedMsg = new ErrorResponse();
+			}
+			else {
+				throw new IllegalStateException("Received unknown message from producer: " + decodedMsg.getClass());
+			}
+
+			if (decodedMsg != null) {
+				decodedMsg.readFrom(msg);
+				out.add(decodedMsg);
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	// Server responses
+	// ------------------------------------------------------------------------
+
+	static class BufferResponse extends NettyMessage {
+
+		private static final byte ID = 0;
+
+		final Buffer buffer;
+
+		InputChannelID receiverId;
+
+		int sequenceNumber;
+
+		// ---- Deserialization -----------------------------------------------
+
+		boolean isBuffer;
+
+		int size;
+
+		ByteBuf retainedSlice;
+
+		public BufferResponse() {
+			// When deserializing we first have to request a buffer from the respective buffer
+			// provider (at the handler) and copy the buffer from Netty's space to ours.
+			buffer = null;
+		}
+
+		BufferResponse(Buffer buffer, int sequenceNumber, InputChannelID receiverId) {
+			this.buffer = buffer;
+			this.sequenceNumber = sequenceNumber;
+			this.receiverId = receiverId;
+		}
+
+		boolean isBuffer() {
+			return isBuffer;
+		}
+
+		int getSize() {
+			return size;
+		}
+
+		ByteBuf getNettyBuffer() {
+			return retainedSlice;
+		}
+
+		void releaseBuffer() {
+			if (retainedSlice != null) {
+				retainedSlice.release();
+				retainedSlice = null;
+			}
+		}
+
+		// --------------------------------------------------------------------
+		// Serialization
+		// --------------------------------------------------------------------
+
+		@Override
+		ByteBuf write(ByteBufAllocator allocator) throws IOException {
+			int length = 16 + 4 + 1 + 4 + buffer.getSize();
+
+			ByteBuf result = null;
+			try {
+				result = allocateBuffer(allocator, ID, length);
+
+				receiverId.writeTo(result);
+				result.writeInt(sequenceNumber);
+				result.writeBoolean(buffer.isBuffer());
+				result.writeInt(buffer.getSize());
+				result.writeBytes(buffer.getNioBuffer());
+
+				return result;
+			}
+			catch (Throwable t) {
+				if (result != null) {
+					result.release();
+				}
+
+				throw new IOException(t);
+			}
+			finally {
+				if (buffer != null) {
+					buffer.recycle();
+				}
+			}
+		}
+
+		@Override
+		void readFrom(ByteBuf buffer) {
+			receiverId = InputChannelID.fromByteBuf(buffer);
+			sequenceNumber = buffer.readInt();
+			isBuffer = buffer.readBoolean();
+			size = buffer.readInt();
+
+			retainedSlice = buffer.readSlice(size);
+			retainedSlice.retain();
+		}
+	}
+
+	static class ErrorResponse extends NettyMessage {
+
+		private static final byte ID = 1;
+
+		Throwable error;
+
+		InputChannelID receiverId;
+
+		public ErrorResponse() {
+		}
+
+		ErrorResponse(Throwable error) {
+			this.error = error;
+		}
+
+		ErrorResponse(Throwable error, InputChannelID receiverId) {
+			this.error = error;
+			this.receiverId = receiverId;
+		}
+
+		boolean isFatalError() {
+			return receiverId == null;
+		}
+
+		@Override
+		ByteBuf write(ByteBufAllocator allocator) throws IOException {
+			ByteBuf result = null;
+
+			try {
+				result = allocateBuffer(allocator, ID);
+
+				DataOutputView outputView = new ByteBufDataOutputView(result);
+
+				StringUtils.writeNullableString(error.getClass().getName(), outputView);
+				StringUtils.writeNullableString(error.getMessage(), outputView);
+
+				if (receiverId != null) {
+					result.writeBoolean(true);
+					receiverId.writeTo(result);
+				}
+				else {
+					result.writeBoolean(false);
+				}
+
+				// Update frame length...
+				result.setInt(0, result.readableBytes());
+
+				return result;
+			}
+			catch (Throwable t) {
+				if (result != null) {
+					result.release();
+				}
+
+				throw new IOException(t);
+			}
+		}
+
+		@Override
+		void readFrom(ByteBuf buffer) throws Exception {
+			DataInputView inputView = new ByteBufDataInputView(buffer);
+
+			String errorClassName = StringUtils.readNullableString(inputView);
+			Class<Throwable> errorClazz = (Class<Throwable>) getClass().getClassLoader().loadClass(errorClassName);
+
+			String errorMsg = StringUtils.readNullableString(inputView);
+			if (errorMsg != null) {
+				error = errorClazz.getConstructor(String.class).newInstance(errorMsg);
+			}
+			else {
+				error = InstantiationUtil.instantiate(errorClazz);
+			}
+
+			if (buffer.readBoolean()) {
+				receiverId = InputChannelID.fromByteBuf(buffer);
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	// Client requests
+	// ------------------------------------------------------------------------
+
+	static class PartitionRequest extends NettyMessage {
+
+		final static byte ID = 2;
+
+		ExecutionAttemptID producerExecutionId;
+
+		IntermediateResultPartitionID partitionId;
+
+		int queueIndex;
+
+		InputChannelID receiverId;
+
+		public PartitionRequest() {
+		}
+
+		PartitionRequest(ExecutionAttemptID producerExecutionId, IntermediateResultPartitionID partitionId, int queueIndex, InputChannelID receiverId) {
+			this.producerExecutionId = producerExecutionId;
+			this.partitionId = partitionId;
+			this.queueIndex = queueIndex;
+			this.receiverId = receiverId;
+		}
+
+		@Override
+		ByteBuf write(ByteBufAllocator allocator) throws IOException {
+			ByteBuf result = null;
+
+			try {
+				result = allocateBuffer(allocator, ID, 16 + 16 + 4 + 16);
+
+				producerExecutionId.writeTo(result);
+				partitionId.writeTo(result);
+				result.writeInt(queueIndex);
+				receiverId.writeTo(result);
+
+				return result;
+			}
+			catch (Throwable t) {
+				if (result != null) {
+					result.release();
+				}
+
+				throw new IOException(t);
+			}
+		}
+
+		@Override
+		public void readFrom(ByteBuf buffer) {
+			producerExecutionId = ExecutionAttemptID.fromByteBuf(buffer);
+			partitionId = IntermediateResultPartitionID.fromByteBuf(buffer);
+			queueIndex = buffer.readInt();
+			receiverId = InputChannelID.fromByteBuf(buffer);
+		}
+	}
+
+	static class TaskEventRequest extends NettyMessage {
+
+		final static byte ID = 3;
+
+		TaskEvent event;
+
+		InputChannelID receiverId;
+
+		ExecutionAttemptID executionId;
+
+		IntermediateResultPartitionID partitionId;
+
+		public TaskEventRequest() {
+		}
+
+		TaskEventRequest(TaskEvent event, ExecutionAttemptID executionId, IntermediateResultPartitionID partitionId, InputChannelID receiverId) {
+			this.event = event;
+			this.executionId = executionId;
+			this.receiverId = receiverId;
+			this.partitionId = partitionId;
+		}
+
+		@Override
+		ByteBuf write(ByteBufAllocator allocator) throws IOException {
+			ByteBuf result = null;
+
+			try {
+				// TODO Directly serialize to Netty's buffer
+				ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(event);
+
+				result = allocateBuffer(allocator, ID, 4 + serializedEvent.remaining() + 16 + 16 + 16);
+
+				result.writeInt(serializedEvent.remaining());
+				result.writeBytes(serializedEvent);
+				executionId.writeTo(result);
+				receiverId.writeTo(result);
+				partitionId.writeTo(result);
+
+				return result;
+			}
+			catch (Throwable t) {
+				if (result != null) {
+					result.release();
+				}
+
+				throw new IOException(t);
+			}
+		}
+
+		@Override
+		public void readFrom(ByteBuf buffer) {
+			// TODO Directly deserialize fromNetty's buffer
+			int length = buffer.readInt();
+			ByteBuffer serializedEvent = ByteBuffer.allocate(length);
+
+			buffer.readBytes(serializedEvent);
+			serializedEvent.flip();
+
+			event = (TaskEvent) EventSerializer.fromSerializedEvent(serializedEvent, getClass().getClassLoader());
+
+			executionId = ExecutionAttemptID.fromByteBuf(buffer);
+			receiverId = InputChannelID.fromByteBuf(buffer);
+			partitionId = IntermediateResultPartitionID.fromByteBuf(buffer);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static class ByteBufDataInputView implements DataInputView {
+
+		private final ByteBufInputStream inputView;
+
+		public ByteBufDataInputView(ByteBuf buffer) {
+			this.inputView = new ByteBufInputStream(buffer);
+		}
+
+		@Override
+		public void skipBytesToRead(int numBytes) throws IOException {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public int read(byte[] b, int off, int len) throws IOException {
+			return inputView.read(b, off, len);
+		}
+
+		@Override
+		public int read(byte[] b) throws IOException {
+			return inputView.read(b);
+		}
+
+		@Override
+		public void readFully(byte[] b) throws IOException {
+			inputView.readFully(b);
+		}
+
+		@Override
+		public void readFully(byte[] b, int off, int len) throws IOException {
+			inputView.readFully(b, off, len);
+		}
+
+		@Override
+		public int skipBytes(int n) throws IOException {
+			return inputView.skipBytes(n);
+		}
+
+		@Override
+		public boolean readBoolean() throws IOException {
+			return inputView.readBoolean();
+		}
+
+		@Override
+		public byte readByte() throws IOException {
+			return inputView.readByte();
+		}
+
+		@Override
+		public int readUnsignedByte() throws IOException {
+			return inputView.readUnsignedByte();
+		}
+
+		@Override
+		public short readShort() throws IOException {
+			return inputView.readShort();
+		}
+
+		@Override
+		public int readUnsignedShort() throws IOException {
+			return inputView.readUnsignedShort();
+		}
+
+		@Override
+		public char readChar() throws IOException {
+			return inputView.readChar();
+		}
+
+		@Override
+		public int readInt() throws IOException {
+			return inputView.readInt();
+		}
+
+		@Override
+		public long readLong() throws IOException {
+			return inputView.readLong();
+		}
+
+		@Override
+		public float readFloat() throws IOException {
+			return inputView.readFloat();
+		}
+
+		@Override
+		public double readDouble() throws IOException {
+			return inputView.readDouble();
+		}
+
+		@Override
+		public String readLine() throws IOException {
+			return inputView.readLine();
+		}
+
+		@Override
+		public String readUTF() throws IOException {
+			return inputView.readUTF();
+		}
+	}
+
+	private static class ByteBufDataOutputView implements DataOutputView {
+
+		private final ByteBufOutputStream outputView;
+
+		public ByteBufDataOutputView(ByteBuf buffer) {
+			this.outputView = new ByteBufOutputStream(buffer);
+		}
+
+		@Override
+		public void skipBytesToWrite(int numBytes) throws IOException {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public void write(DataInputView source, int numBytes) throws IOException {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public void write(int b) throws IOException {
+			outputView.write(b);
+		}
+
+		@Override
+		public void write(byte[] b) throws IOException {
+			outputView.write(b);
+		}
+
+		@Override
+		public void write(byte[] b, int off, int len) throws IOException {
+			outputView.write(b, off, len);
+		}
+
+		@Override
+		public void writeBoolean(boolean v) throws IOException {
+			outputView.writeBoolean(v);
+		}
+
+		@Override
+		public void writeByte(int v) throws IOException {
+			outputView.writeByte(v);
+		}
+
+		@Override
+		public void writeShort(int v) throws IOException {
+			outputView.writeShort(v);
+		}
+
+		@Override
+		public void writeChar(int v) throws IOException {
+			outputView.writeChar(v);
+		}
+
+		@Override
+		public void writeInt(int v) throws IOException {
+			outputView.writeInt(v);
+		}
+
+		@Override
+		public void writeLong(long v) throws IOException {
+			outputView.writeLong(v);
+		}
+
+		@Override
+		public void writeFloat(float v) throws IOException {
+			outputView.writeFloat(v);
+		}
+
+		@Override
+		public void writeDouble(double v) throws IOException {
+			outputView.writeDouble(v);
+		}
+
+		@Override
+		public void writeBytes(String s) throws IOException {
+			outputView.writeBytes(s);
+		}
+
+		@Override
+		public void writeChars(String s) throws IOException {
+			outputView.writeChars(s);
+		}
+
+		@Override
+		public void writeUTF(String s) throws IOException {
+			outputView.writeUTF(s);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java
new file mode 100644
index 0000000..7cc3ec6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import io.netty.channel.ChannelPipeline;
+
+public interface NettyProtocol {
+
+	void setServerChannelPipeline(ChannelPipeline channelPipeline);
+
+	void setClientChannelPipeline(ChannelPipeline channelPipeline);
+
+}


Mime
View raw message