flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [12/18] flink git commit: [FLINK-986] [FLINK-25] [Distributed runtime] Add initial support for intermediate results
Date Mon, 12 Jan 2015 08:16:20 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
new file mode 100644
index 0000000..1944489
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+
+/**
+ * A fixed size pool of {@link MemorySegment} instances for the network stack.
+ * <p>
+ * This class is thread-safe.
+ */
+public class NetworkBufferPool implements BufferPoolFactory {
+
+	private static final Logger LOG = LoggerFactory.getLogger(NetworkBufferPool.class);
+
+	private final int totalNumberOfMemorySegments;
+
+	private final int memorySegmentSize;
+
+	private final Queue<MemorySegment> availableMemorySegments;
+
+	private volatile boolean isDestroyed;
+
+	// ---- Managed buffer pools ----------------------------------------------
+
+	private final Object factoryLock = new Object();
+
+	private final Set<LocalBufferPool> managedBufferPools = new HashSet<LocalBufferPool>();
+
+	public final Set<LocalBufferPool> allBufferPools = new HashSet<LocalBufferPool>();
+
+	private int numTotalRequiredBuffers;
+
+	/**
+	 * Allocates all {@link MemorySegment} instances managed by this pool.
+	 */
+	public NetworkBufferPool(int numberOfSegmentsToAllocate, int segmentSize) {
+		this.totalNumberOfMemorySegments = numberOfSegmentsToAllocate;
+		this.memorySegmentSize = segmentSize;
+		this.availableMemorySegments = new ArrayBlockingQueue<MemorySegment>(numberOfSegmentsToAllocate);
+
+		try {
+			for (int i = 0; i < numberOfSegmentsToAllocate; i++) {
+				availableMemorySegments.add(new MemorySegment(new byte[segmentSize]));
+			}
+		}
+		catch (OutOfMemoryError err) {
+			int requiredMb = (numberOfSegmentsToAllocate * segmentSize) >> 20;
+			int allocatedMb = ((availableMemorySegments.size()) * segmentSize) >> 20;
+			int missingMb = requiredMb - allocatedMb;
+
+			throw new OutOfMemoryError("Could not allocate enough memory segments for GlobalBufferPool (required (Mb): " + requiredMb + ", allocated (Mb): " + allocatedMb + ", missing (Mb): " + missingMb + ").");
+		}
+
+		int allocatedMb = ((availableMemorySegments.size()) * segmentSize) >> 20;
+
+		LOG.info("Allocated {} MB for network buffer pool (number of memory segments: {}, bytes per segment: {}).", allocatedMb, availableMemorySegments.size(), segmentSize);
+	}
+
+	public MemorySegment requestMemorySegment() {
+		return availableMemorySegments.poll();
+	}
+
+	// This is not safe with regard to destroy calls, but it does not hurt, because destroy happens
+	// only once at clean up time (task manager shutdown).
+	public void recycle(MemorySegment segment) {
+		availableMemorySegments.add(segment);
+	}
+
+	public void destroy() {
+		synchronized (factoryLock) {
+			isDestroyed = true;
+
+			MemorySegment segment;
+			while ((segment = availableMemorySegments.poll()) != null) {
+				segment.free();
+			}
+		}
+	}
+
+	public boolean isDestroyed() {
+		return isDestroyed;
+	}
+
+	public int getMemorySegmentSize() {
+		return memorySegmentSize;
+	}
+
+	public int getTotalNumberOfMemorySegments() {
+		return totalNumberOfMemorySegments;
+	}
+
+	public int getNumberOfAvailableMemorySegments() {
+		return availableMemorySegments.size();
+	}
+
+	public int getNumberOfRegisteredBufferPools() {
+		synchronized (factoryLock) {
+			return allBufferPools.size();
+		}
+	}
+
+	public int countBuffers() {
+		int buffers = 0;
+
+		synchronized (factoryLock) {
+			for (BufferPool bp : allBufferPools) {
+				buffers += bp.getNumBuffers();
+			}
+		}
+
+		return buffers;
+	}
+
+	// ------------------------------------------------------------------------
+	// BufferPoolFactory
+	// ------------------------------------------------------------------------
+
+	@Override
+	public BufferPool createBufferPool(int numRequiredBuffers, boolean isFixedSize) throws IOException {
+		// It is necessary to use a separate lock from the one used for buffer
+		// requests to ensure deadlock freedom for failure cases.
+		synchronized (factoryLock) {
+			if (isDestroyed) {
+				throw new IllegalStateException("Network buffer pool has already been destroyed.");
+			}
+
+			// Ensure that the number of required buffers can be satisfied.
+			// With dynamic memory management this should become obsolete.
+			if (numTotalRequiredBuffers + numRequiredBuffers > totalNumberOfMemorySegments) {
+				throw new IOException(String.format("Insufficient number of network buffers: required %d, but only %d of %d available.", numRequiredBuffers, totalNumberOfMemorySegments - numTotalRequiredBuffers, totalNumberOfMemorySegments));
+			}
+
+			this.numTotalRequiredBuffers += numRequiredBuffers;
+
+			// We are good to go, create a new buffer pool and redistribute
+			// non-fixed size buffers.
+			LocalBufferPool localBufferPool = new LocalBufferPool(this, numRequiredBuffers);
+
+			// The fixed size pools get their share of buffers and don't change
+			// it during their lifetime.
+			if (!isFixedSize) {
+				managedBufferPools.add(localBufferPool);
+			}
+
+			allBufferPools.add(localBufferPool);
+
+			redistributeBuffers();
+
+			return localBufferPool;
+		}
+	}
+
+	@Override
+	public void destroyBufferPool(BufferPool bufferPool) throws IOException {
+		synchronized (factoryLock) {
+			if (allBufferPools.remove(bufferPool)) {
+				managedBufferPools.remove(bufferPool);
+
+				numTotalRequiredBuffers -= bufferPool.getNumberOfRequiredMemorySegments();
+
+				redistributeBuffers();
+			}
+		}
+	}
+
+	// Must be called from synchronized block
+	private void redistributeBuffers() throws IOException {
+		int numManagedBufferPools = managedBufferPools.size();
+
+		if (numManagedBufferPools == 0) {
+			return; // necessary to avoid div by zero when no managed pools
+		}
+
+		// All buffers, which are not among the required ones
+		int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;
+
+		// Available excess (not required) buffers per pool
+		int numExcessBuffersPerPool = numAvailableMemorySegment / numManagedBufferPools;
+
+		// Distribute leftover buffers in round robin fashion
+		int numLeftoverBuffers = numAvailableMemorySegment % numManagedBufferPools;
+
+		int bufferPoolIndex = 0;
+
+		for (LocalBufferPool bufferPool : managedBufferPools) {
+			int leftoverBuffers = bufferPoolIndex++ < numLeftoverBuffers ? 1 : 0;
+
+			bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments() + numExcessBuffersPerPool + leftoverBuffers);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/bufferprovider/BufferAvailabilityListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/bufferprovider/BufferAvailabilityListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/bufferprovider/BufferAvailabilityListener.java
deleted file mode 100644
index b3d69f1..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/bufferprovider/BufferAvailabilityListener.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.bufferprovider;
-
-import org.apache.flink.runtime.io.network.Buffer;
-
-/**
- * This interface must be implemented to receive an asynchronous callback from
- * a {@link BufferProvider} as soon as a buffer has become available again.
- */
-public interface BufferAvailabilityListener {
-
-	/**
-	 * Returns a Buffer to the listener.
-	 * <p/>
-	 * Note: the listener has to adjust the size of the returned Buffer to the
-	 * requested size manually via {@link Buffer#limitSize(int)}.
-	 */
-	void bufferAvailable(Buffer buffer) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/bufferprovider/BufferProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/bufferprovider/BufferProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/bufferprovider/BufferProvider.java
deleted file mode 100644
index 2dd602f..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/bufferprovider/BufferProvider.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.bufferprovider;
-
-import java.io.IOException;
-
-import org.apache.flink.runtime.io.network.Buffer;
-
-public interface BufferProvider {
-
-	/**
-	 * Requests a buffer with a minimum size of <code>minBufferSize</code>. The method returns immediately, even if the
-	 * request could not be fulfilled.
-	 *
-	 * @param minBufferSize minimum size of the requested buffer (in bytes)
-	 * @return buffer with at least the requested size or <code>null</code> if no such buffer is currently available
-	 * @throws IOException
-	 */
-	Buffer requestBuffer(int minBufferSize) throws IOException;
-
-	/**
-	 * Requests a buffer with a minimum size of <code>minBufferSize</code>. The method blocks until the request has
-	 * been fulfilled or {@link #reportAsynchronousEvent()} has been called.
-	 *
-	 * @param minBufferSize minimum size of the requested buffer (in bytes)
-	 * @return buffer with at least the requested size
-	 * @throws IOException
-	 * @throws InterruptedException
-	 */
-	Buffer requestBufferBlocking(int minBufferSize) throws IOException, InterruptedException;
-
-	/**
-	 * Returns the size of buffers (in bytes) available at this buffer provider.
-	 * 
-	 * @return size of buffers (in bytes) available at this buffer provider
-	 */
-	int getBufferSize();
-
-	/**
-	 * Reports an asynchronous event and interrupts each blocking method of this buffer provider in order to allow the
-	 * blocked thread to respond to the event.
-	 */
-	void reportAsynchronousEvent();
-
-	/**
-	 * Registers the given {@link BufferAvailabilityListener} with an empty buffer pool.
-	 * <p>
-	 * The registration only succeeds, if the buffer pool is empty and has not been destroyed yet.
-	 * <p>
-	 * The registered listener will receive a notification when at least one buffer has become available again. After
-	 * the notification, the listener will be unregistered.
-	 *
-	 * @param listener the listener to be registered
-	 * @return <code>true</code> if the registration has been successful; <code>false</code> if the registration
-	 *         failed, because the buffer pool was not empty or has already been destroyed
-	 */
-	BufferAvailabilityRegistration registerBufferAvailabilityListener(BufferAvailabilityListener listener);
-
-	public enum BufferAvailabilityRegistration {
-		SUCCEEDED_REGISTERED(),
-		FAILED_BUFFER_AVAILABLE(),
-		FAILED_BUFFER_POOL_DESTROYED()
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/bufferprovider/BufferProviderBroker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/bufferprovider/BufferProviderBroker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/bufferprovider/BufferProviderBroker.java
deleted file mode 100644
index 2f2c3b4..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/bufferprovider/BufferProviderBroker.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.bufferprovider;
-
-import java.io.IOException;
-
-import org.apache.flink.runtime.io.network.channels.ChannelID;
-import org.apache.flink.runtime.jobgraph.JobID;
-
-public interface BufferProviderBroker {
-
-	BufferProvider getBufferProvider(JobID jobID, ChannelID sourceChannelID) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/bufferprovider/DiscardBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/bufferprovider/DiscardBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/bufferprovider/DiscardBufferPool.java
deleted file mode 100644
index e166187..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/bufferprovider/DiscardBufferPool.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.bufferprovider;
-
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.network.Buffer;
-import org.apache.flink.runtime.io.network.BufferRecycler;
-
-public final class DiscardBufferPool implements BufferProvider, BufferRecycler {
-	
-	@Override
-	public Buffer requestBuffer(int minBufferSize) {
-		return null;
-	}
-
-	@Override
-	public Buffer requestBufferBlocking(int minBufferSize) {
-		return null;
-	}
-
-	@Override
-	public int getBufferSize() {
-		return 0;
-	}
-
-	@Override
-	public void reportAsynchronousEvent() {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public BufferAvailabilityRegistration registerBufferAvailabilityListener(BufferAvailabilityListener listener) {
-		return BufferAvailabilityRegistration.FAILED_BUFFER_POOL_DESTROYED;
-	}
-
-	@Override
-	public void recycle(MemorySegment buffer) {
-		throw new UnsupportedOperationException();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/bufferprovider/GlobalBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/bufferprovider/GlobalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/bufferprovider/GlobalBufferPool.java
deleted file mode 100644
index b1d7adf..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/bufferprovider/GlobalBufferPool.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.bufferprovider;
-
-import java.util.Queue;
-import java.util.concurrent.ArrayBlockingQueue;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.core.memory.MemorySegment;
-
-/**
- * A global buffer pool for the network stack.
- * <p>
- * All buffers used by the network stack come from this pool. Requests to this pool are mediated by instances of
- * {@link LocalBufferPool}.
- * <p>
- * The size and number of buffers can be configured via the global system config.
- */
-public final class GlobalBufferPool {
-
-	private final static Logger LOG = LoggerFactory.getLogger(GlobalBufferPool.class);
-
-	// -----------------------------------------------------------------------------------------------------------------
-
-	/** Total number of buffers */
-	private final int numBuffers;
-
-	/** Size of each buffer (in bytes) */
-	private final int bufferSize;
-
-	/** The available buffers */
-	private final Queue<MemorySegment> buffers;
-
-	private boolean isDestroyed;
-
-	// -----------------------------------------------------------------------------------------------------------------
-
-	public GlobalBufferPool(int numBuffers, int bufferSize) {
-		this.numBuffers = numBuffers;
-		this.bufferSize = bufferSize;
-
-		buffers = new ArrayBlockingQueue<MemorySegment>(numBuffers);
-
-		final int mb = 1 << 20;
-		final int memRequiredMb = (numBuffers * bufferSize) / mb;
-
-		for (int i = 0; i < numBuffers; i++) {
-			try {
-				byte[] buf = new byte[bufferSize];
-				buffers.add(new MemorySegment(buf));
-			} catch (OutOfMemoryError err) {
-				int memAllocatedMb = ((i + 1) * bufferSize) / mb;
-
-				String msg = String.format("Tried to allocate %d buffers of size %d bytes each (total: %d MB) " +
-						"and ran out of memory after %d buffers (%d MB).",
-						numBuffers, bufferSize, memRequiredMb, i + 1, memAllocatedMb);
-				throw new OutOfMemoryError(msg);
-			}
-		}
-
-		LOG.info(String.format("Allocated %d buffers of size %d bytes each (total: %d MB).",
-				numBuffers, bufferSize, memRequiredMb));
-	}
-
-	// -----------------------------------------------------------------------------------------------------------------
-
-	/**
-	 * Requests a buffer <strong>from</strong> the pool.
-	 *
-	 * @return buffer from pool or <code>null</code>, if no buffer available
-	 */
-	public MemorySegment requestBuffer() {
-		return this.buffers.poll();
-	}
-
-	/**
-	 * Returns a buffer <em>to</em> the pool.
-	 *
-	 * @param buffer the buffer to be returned
-	 */
-	public void returnBuffer(MemorySegment buffer) {
-		this.buffers.add(buffer);
-	}
-
-	/**
-	 * Returns the size of buffers (in bytes).
-	 *
-	 * @return size of buffers (in bytes)
-	 */
-	public int getBufferSize() {
-		return this.bufferSize;
-	}
-
-	/**
-	 * Returns the total number of managed buffers.
-	 * 
-	 * @return total number of managed buffers
-	 */
-	public int numBuffers() {
-		return this.numBuffers;
-	}
-
-	/**
-	 * Returns the number of currently available buffers.
-	 * 
-	 * @return currently available number of buffers
-	 */
-	public int numAvailableBuffers() {
-		return this.buffers.size();
-	}
-
-	public synchronized void destroy() {
-		if (!this.isDestroyed) {
-			// mark as shutdown and release memory
-			this.isDestroyed = true;
-
-			for (MemorySegment buffer : this.buffers) {
-				buffer.free();
-			}
-
-			this.buffers.clear();
-		}
-	}
-	
-	public boolean isDestroyed() {
-		return isDestroyed;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/bufferprovider/LocalBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/bufferprovider/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/bufferprovider/LocalBufferPool.java
deleted file mode 100644
index 2bac543..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/bufferprovider/LocalBufferPool.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.bufferprovider;
-
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.Queue;
-
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.network.Buffer;
-import org.apache.flink.runtime.io.network.BufferRecycler;
-
-/**
- * A buffer pool used to manage a designated number of buffers from a {@link GlobalBufferPool}.
- * <p>
- * A local buffer pool mediates buffer requests to the global buffer pool to ensure dead-lock free operation of the
- * network stack by limiting the number of designated buffers per local buffer pool. It also implements the default
- * mechanism for buffer recycling, which ensures that every buffer is ultimately returned to the global buffer pool.
- */
-public final class LocalBufferPool implements BufferProvider {
-
-	private static final class LocalBufferPoolRecycler implements BufferRecycler {
-
-		private final LocalBufferPool bufferPool;
-
-		private LocalBufferPoolRecycler(LocalBufferPool bufferPool) {
-			this.bufferPool = bufferPool;
-		}
-
-		@Override
-		public void recycle(MemorySegment buffer) {
-			this.bufferPool.recycleBuffer(buffer);
-		}
-	}
-
-	// -----------------------------------------------------------------------------------------------------------------
-
-	/** Time (ms) to wait before retry for blocking buffer requests */
-	private static final int WAIT_TIME = 100;
-
-	/** Global buffer pool to request buffers from */
-	private final GlobalBufferPool globalBufferPool;
-
-	/** Buffers managed by this local buffer pool */
-	private final Queue<MemorySegment> buffers = new ArrayDeque<MemorySegment>();
-
-	/** The recycler via which to return buffers to this local buffer pool */
-	private final LocalBufferPoolRecycler recycler;
-
-	/** Queue of buffer availability listeners */
-	private final Queue<BufferAvailabilityListener> listeners = new ArrayDeque<BufferAvailabilityListener>();
-
-	/** Size of each buffer in this pool (in bytes) */
-	private final int bufferSize;
-
-	/** Number of buffers assigned to this local buffer pool */
-	private int numDesignatedBuffers;
-
-	/** Number of buffers requested from the global buffer pool */
-	private int numRequestedBuffers;
-
-	/** Flag to indicate whether an asynchronous event has been reported */
-	private boolean hasAsyncEventOccurred;
-
-	/** Flag to indicate whether this local buffer pool has been destroyed */
-	private boolean isDestroyed;
-
-	// -----------------------------------------------------------------------------------------------------------------
-
-	public LocalBufferPool(GlobalBufferPool globalBufferPool, int numDesignatedBuffers) {
-		this.globalBufferPool = globalBufferPool;
-		this.bufferSize = globalBufferPool.getBufferSize();
-		this.numDesignatedBuffers = numDesignatedBuffers;
-
-		this.recycler = new LocalBufferPoolRecycler(this);
-	}
-	// -----------------------------------------------------------------------------------------------------------------
-
-	@Override
-	public Buffer requestBuffer(int minBufferSize) throws IOException {
-		try {
-			return requestBuffer(minBufferSize, false);
-		} catch (InterruptedException e) {
-			throw new IOException("Unexpected InterruptedException while non-blocking buffer request.");
-		}
-	}
-
-	@Override
-	public Buffer requestBufferBlocking(int minBufferSize) throws IOException, InterruptedException {
-		return requestBuffer(minBufferSize, true);
-	}
-
-	/**
-	 * Requests a buffer from this local buffer pool.
-	 * <p>
-	 * A non-blocking call to this method will only return a buffer, if one is available in the local pool after
-	 * having returned excess buffers. Otherwise, it will return <code>null</code>.
-	 * <p>
-	 * A blocking call will request a new buffer from the global buffer and block until one is available or an
-	 * asynchronous event has been reported via {@link #reportAsynchronousEvent()}.
-	 *
-	 * @param minBufferSize minimum size of requested buffer (in bytes)
-	 * @param isBlocking flag to indicate whether to block until buffer is available
-	 * @return buffer from the global buffer pool or <code>null</code>, if no buffer available
-	 * @throws IOException
-	 * @throws InterruptedException
-	 */
-	private Buffer requestBuffer(int minBufferSize, boolean isBlocking) throws IOException, InterruptedException {
-		if (minBufferSize > this.bufferSize) {
-			throw new IllegalArgumentException(String.format("Too large buffer requested (requested %d, maximum %d).",
-					minBufferSize, this.bufferSize));
-		}
-
-		while (true) {
-			boolean isAsyncRequest = false;
-
-			synchronized (this.buffers) {
-				// Return excess buffers to global buffer pool
-				while (this.numRequestedBuffers > this.numDesignatedBuffers) {
-					final MemorySegment buffer = this.buffers.poll();
-					if (buffer == null) {
-						break;
-					}
-
-					this.globalBufferPool.returnBuffer(buffer);
-					this.numRequestedBuffers--;
-				}
-
-				// Request buffers from global buffer pool
-				while (this.buffers.isEmpty()) {
-					if (this.numRequestedBuffers < this.numDesignatedBuffers) {
-						final MemorySegment buffer = this.globalBufferPool.requestBuffer();
-
-						if (buffer != null) {
-							this.buffers.add(buffer);
-
-							this.numRequestedBuffers++;
-							continue;
-						}
-					}
-
-					if (this.hasAsyncEventOccurred && isBlocking) {
-						this.hasAsyncEventOccurred = false;
-						isAsyncRequest = true;
-						break;
-					}
-
-					if (isBlocking) {
-						this.buffers.wait(WAIT_TIME);
-					} else {
-						return null;
-					}
-				}
-
-				if (!isAsyncRequest) {
-					return new Buffer(this.buffers.poll(), minBufferSize, this.recycler);
-				}
-			}
-		}
-	}
-
-	@Override
-	public int getBufferSize() {
-		return this.bufferSize;
-	}
-
-	@Override
-	public void reportAsynchronousEvent() {
-		synchronized (this.buffers) {
-			this.hasAsyncEventOccurred = true;
-			this.buffers.notify();
-		}
-	}
-
-	@Override
-	public BufferAvailabilityRegistration registerBufferAvailabilityListener(BufferAvailabilityListener listener) {
-		synchronized (this.buffers) {
-			if (!this.buffers.isEmpty()) {
-				return BufferAvailabilityRegistration.FAILED_BUFFER_AVAILABLE;
-			}
-
-			if (this.isDestroyed) {
-				return BufferAvailabilityRegistration.FAILED_BUFFER_POOL_DESTROYED;
-			}
-
-			this.listeners.add(listener);
-		}
-
-		return BufferAvailabilityRegistration.SUCCEEDED_REGISTERED;
-	}
-
-	/**
-	 * Sets the designated number of buffers for this local buffer pool and returns excess buffers to the global buffer
-	 * pool.
-	 * <p>
-	 * The designated number of buffers determines how many buffers this buffer pool is allowed to manage. New buffers
-	 * can only be requested, if the requested number of buffers is less than the designated number. If possible, excess
-	 * buffers will be returned to the global buffer pool.
-	 *
-	 * @param numDesignatedBuffers number of buffers designated for this local buffer pool
-	 */
-	public void setNumDesignatedBuffers(int numDesignatedBuffers) {
-		synchronized (this.buffers) {
-			this.numDesignatedBuffers = numDesignatedBuffers;
-
-			// Return excess buffers to global buffer pool
-			while (this.numRequestedBuffers > this.numDesignatedBuffers) {
-				if (this.buffers.isEmpty()) {
-					break;
-				}
-
-				this.globalBufferPool.returnBuffer(this.buffers.poll());
-				this.numRequestedBuffers--;
-			}
-
-			this.buffers.notify();
-		}
-	}
-
-	/**
-	 * Returns the number of buffers available in the local buffer pool.
-	 *
-	 * @return number of available buffers
-	 */
-	public int numAvailableBuffers() {
-		synchronized (this.buffers) {
-			return this.buffers.size();
-		}
-	}
-
-	/**
-	 * Returns the number of buffers, which have been requested from the global buffer pool.
-	 *
-	 * @return number of buffers requested from the global buffer pool
-	 */
-	public int numRequestedBuffers() {
-		synchronized (this.buffers) {
-			return this.numRequestedBuffers;
-		}
-	}
-
-	/**
-	 * Returns the designated number of buffers for this local buffer pool.
-	 *
-	 * @return number of designated buffers for this buffer pool
-	 */
-	public int numDesignatedBuffers() {
-		synchronized (this.buffers) {
-			return this.numDesignatedBuffers;
-		}
-	}
-
-	/**
-	 * Destroys this local buffer pool and immediately returns all available buffers to the global buffer pool.
-	 * <p>
-	 * Buffers, which have been requested from this local buffer pool via <code>requestBuffer</code> cannot be returned
-	 * immediately and will be returned when the respective buffer is recycled (see {@link #recycleBuffer(MemorySegment)}).
-	 */
-	public void destroy() {
-		synchronized (this.buffers) {
-			if (this.isDestroyed) {
-				return;
-			}
-
-			this.isDestroyed = true;
-
-			// return all buffers
-			while (!this.buffers.isEmpty()) {
-				this.globalBufferPool.returnBuffer(this.buffers.poll());
-				this.numRequestedBuffers--;
-			}
-		}
-	}
-
-	/**
-	 * Returns a buffer to the buffer pool and notifies listeners about the availability of a new buffer.
-	 *
-	 * @param buffer buffer to return to the buffer pool
-	 */
-	private void recycleBuffer(MemorySegment buffer) {
-		synchronized (this.buffers) {
-			if (this.isDestroyed) {
-				this.globalBufferPool.returnBuffer(buffer);
-				this.numRequestedBuffers--;
-			} else {
-				// if the number of designated buffers changed in the meantime, make sure
-				// to return the buffer to the global buffer pool
-				if (this.numRequestedBuffers > this.numDesignatedBuffers) {
-					this.globalBufferPool.returnBuffer(buffer);
-					this.numRequestedBuffers--;
-
-				} else if (!this.listeners.isEmpty()) {
-					Buffer availableBuffer = new Buffer(buffer, buffer.size(), this.recycler);
-					try {
-						this.listeners.poll().bufferAvailable(availableBuffer);
-					} catch (Exception e) {
-						this.buffers.add(buffer);
-						this.buffers.notify();
-					}
-
-				} else {
-					this.buffers.add(buffer);
-					this.buffers.notify();
-				}
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/bufferprovider/LocalBufferPoolOwner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/bufferprovider/LocalBufferPoolOwner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/bufferprovider/LocalBufferPoolOwner.java
deleted file mode 100644
index 11c10a5..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/bufferprovider/LocalBufferPoolOwner.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.bufferprovider;
-
-/**
- * A local buffer pool owner is an object which initially retrieves its buffers from the {@link GlobalBufferPool} and
- * manages its fraction of the overall buffer pool locally by means of a {@link LocalBufferPool}.
- * 
- */
-public interface LocalBufferPoolOwner {
-
-	/**
-	 * Returns the number of byte-buffered channels that will retrieve their buffers from the local buffer pool.
-	 * 
-	 * @return the number of byte-buffered channels that will retrieve their buffers from the local buffer pool
-	 */
-	int getNumberOfChannels();
-
-	/**
-	 * Sets the designated number of buffers the local buffer pool owner is allowed to fetch from the global buffer pool
-	 * and manage locally by means of the {@link LocalBufferPool}.
-	 * 
-	 * @param numBuffers
-	 *        the numBuffers the local buffer pool owner is allowed to fetch from the global buffer pool
-	 */
-	void setDesignatedNumberOfBuffers(int numBuffers);
-
-	/**
-	 * Clears the local buffer pool and returns all buffers to the global buffer pool.
-	 */
-	void clearLocalBufferPool();
-
-	void registerGlobalBufferPool(GlobalBufferPool globalBufferPool);
-
-	/**
-	 * Logs the current status of the local buffer pool. This method is intended mainly for debugging purposes.
-	 */
-	void logBufferUtilization();
-
-	/**
-	 * Reports an asynchronous event. Calling this method interrupts each blocking method of the buffer pool owner and
-	 * allows the blocked thread to respond to the event.
-	 */
-	void reportAsynchronousEvent();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/BufferOrEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/BufferOrEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/BufferOrEvent.java
deleted file mode 100644
index 1e891ae..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/BufferOrEvent.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.channels;
-
-import org.apache.flink.runtime.event.task.AbstractEvent;
-import org.apache.flink.runtime.io.network.Buffer;
-
-/**
- * Either type for {@link org.apache.flink.runtime.io.network.Buffer} and {@link AbstractEvent}.
- */
-public class BufferOrEvent {
-	
-	private final Buffer buffer;
-	
-	private final AbstractEvent event;
-	
-	public BufferOrEvent(Buffer buffer) {
-		this.buffer = buffer;
-		this.event = null;
-	}
-	
-	public BufferOrEvent(AbstractEvent event) {
-		this.buffer = null;
-		this.event = event;
-	}
-	
-	public boolean isBuffer() {
-		return this.buffer != null;
-	}
-	
-	public boolean isEvent() {
-		return this.event != null;
-	}
-	
-	public Buffer getBuffer() {
-		return this.buffer;
-	}
-	
-	public AbstractEvent getEvent() {
-		return this.event;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/Channel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/Channel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/Channel.java
deleted file mode 100644
index 463ddfd..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/Channel.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.channels;
-
-import java.io.IOException;
-
-import org.apache.flink.runtime.event.task.AbstractEvent;
-import org.apache.flink.runtime.io.network.Envelope;
-import org.apache.flink.runtime.io.network.EnvelopeDispatcher;
-import org.apache.flink.runtime.jobgraph.JobID;
-
-/**
- * The base class for channel objects.
- * <p>
- * Every channel has an index (at the corresponding gate), ID, and type. The connected channel is given by the ID of
- * destination channel.
- */
-public abstract class Channel {
-
-	private final ChannelID id;
-
-	private final ChannelID connectedId;
-
-	private final int index;
-
-	private final ChannelType type;
-
-	protected EnvelopeDispatcher envelopeDispatcher;
-
-	/**
-	 * Auxiliary constructor for channels
-	 * 
-	 * @param index the index of the channel in either the output or input gate
-	 * @param id the ID of the channel
-	 * @param connectedId the ID of the channel this channel is connected to
-	 */
-	protected Channel(int index, ChannelID id, ChannelID connectedId, ChannelType type) {
-		this.index = index;
-		this.id = id;
-		this.connectedId = connectedId;
-		this.type = type;
-	}
-
-	public int getIndex() {
-		return this.index;
-	}
-
-	public ChannelID getID() {
-		return this.id;
-	}
-
-	public ChannelID getConnectedId() {
-		return this.connectedId;
-	}
-
-	public ChannelType getChannelType() {
-		return this.type;
-	}
-
-	/**
-	 * Registers an EnvelopeDispatcher with this channel at runtime.
-	 *
-	 * @param envelopeDispatcher the envelope dispatcher to use for data transfers
-	 */
-	public void registerEnvelopeDispatcher(EnvelopeDispatcher envelopeDispatcher) {
-		this.envelopeDispatcher = envelopeDispatcher;
-	}
-
-	// -----------------------------------------------------------------------------------------------------------------
-
-	public abstract JobID getJobID();
-
-	public abstract boolean isInputChannel();
-
-	public abstract boolean isClosed() throws IOException, InterruptedException;
-
-	public abstract void transferEvent(AbstractEvent event) throws IOException, InterruptedException;
-
-	public abstract void queueEnvelope(Envelope envelope);
-
-	// nothing to do for buffer oriented runtime => TODO remove with pending changes for input side
-	public abstract void releaseAllResources();
-
-	// nothing to do for buffer oriented runtime => TODO remove with pending changes for input side
-	public abstract void destroy();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/ChannelCloseEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/ChannelCloseEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/ChannelCloseEvent.java
deleted file mode 100644
index 937817b..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/ChannelCloseEvent.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.channels;
-
-import java.io.IOException;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.event.task.AbstractEvent;
-
-public final class ChannelCloseEvent extends AbstractEvent {
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		// Nothing to do here
-	}
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		// Nothing to do here
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/ChannelID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/ChannelID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/ChannelID.java
deleted file mode 100644
index f07a678..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/ChannelID.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.channels;
-
-import java.nio.ByteBuffer;
-
-import org.apache.flink.runtime.AbstractID;
-
-public class ChannelID extends AbstractID {
-
-	private static final long serialVersionUID = 1L;
-	
-	public ChannelID() {
-		super();
-	}
-
-	public ChannelID(long lowerPart, long upperPart) {
-		super(lowerPart, upperPart);
-	}
-
-	public ChannelID(AbstractID id) {
-		super(id);
-	}
-
-	public static ChannelID fromByteBuffer(ByteBuffer buf) {
-		long lower = buf.getLong();
-		long upper = buf.getLong();
-		return new ChannelID(lower, upper);
-	}
-
-	public static ChannelID fromByteBuffer(ByteBuffer buf, int offset) {
-		long lower = buf.getLong(offset);
-		long upper = buf.getLong(offset + 8);
-		return new ChannelID(lower, upper);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/ChannelType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/ChannelType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/ChannelType.java
deleted file mode 100644
index 854b8a9..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/ChannelType.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.channels;
-
-/**
- * An enumeration for the channel types.
- */
-public enum ChannelType {
-	
-	/** Network channels */
-	NETWORK,
-
-	/** In-memory channels */
-	IN_MEMORY
-}
-
-
-
-
-

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/EndOfSuperstepEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/EndOfSuperstepEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/EndOfSuperstepEvent.java
deleted file mode 100644
index 9f02583..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/EndOfSuperstepEvent.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.channels;
-
-import java.io.IOException;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.event.task.AbstractEvent;
-
-/**
- * Marks the end of a superstep of one particular iteration head
- */
-public class EndOfSuperstepEvent extends AbstractEvent {
-	
-	public static final EndOfSuperstepEvent INSTANCE = new EndOfSuperstepEvent();
-
-	@Override
-	public void write(DataOutputView out) throws IOException {}
-
-	@Override
-	public void read(DataInputView in) throws IOException {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/InputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/InputChannel.java
deleted file mode 100644
index 80181be..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/InputChannel.java
+++ /dev/null
@@ -1,488 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.channels;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.event.task.AbstractEvent;
-import org.apache.flink.runtime.event.task.AbstractTaskEvent;
-import org.apache.flink.runtime.io.network.Buffer;
-import org.apache.flink.runtime.io.network.Envelope;
-import org.apache.flink.runtime.io.network.bufferprovider.BufferAvailabilityListener;
-import org.apache.flink.runtime.io.network.bufferprovider.BufferProvider;
-import org.apache.flink.runtime.io.network.gates.InputChannelResult;
-import org.apache.flink.runtime.io.network.gates.InputGate;
-import org.apache.flink.runtime.io.network.serialization.AdaptiveSpanningRecordDeserializer;
-import org.apache.flink.runtime.io.network.serialization.RecordDeserializer;
-import org.apache.flink.runtime.io.network.serialization.RecordDeserializer.DeserializationResult;
-import org.apache.flink.runtime.jobgraph.JobID;
-
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Queue;
-
-/**
- * InputChannel is an abstract base class to all different kinds of concrete
- * input channels that can be used. Input channels are always parameterized to
- * a specific type that can be transported through the channel.
-
- * @param <T> The Type of the record that can be transported through the channel.
- */
-public class InputChannel<T extends IOReadableWritable> extends Channel implements BufferProvider {
-
-	private final InputGate<T> inputGate;
-
-	/**
-	 * The log object used to report warnings and errors.
-	 */
-	private static final Logger LOG = LoggerFactory.getLogger(InputChannel.class);
-
-	/**
-	 * The deserializer used to deserialize records.
-	 */
-	private final RecordDeserializer<T> deserializer;
-
-	/**
-	 * Buffer for the uncompressed (raw) data.
-	 */
-	private Buffer dataBuffer;
-
-	private AbstractTaskEvent currentEvent;
-
-	/**
-	 * The exception observed in this channel while processing the buffers. Checked and thrown
-	 * per-buffer.
-	 */
-	private volatile IOException ioException;
-
-	/**
-	 * Stores the number of bytes read through this input channel since its instantiation.
-	 */
-	private long amountOfDataTransmitted;
-
-	private volatile boolean weClosedChannel;
-	
-	private volatile boolean senderClosedChannel;
-
-	// -------------------------------------------------------------------------------------------
-
-	private int lastReceivedEnvelope = -1;
-
-	private boolean destroyCalled = false;
-
-	// ----------------------
-
-	private Queue<Envelope> queuedEnvelopes = new ArrayDeque<Envelope>();
-
-	private Iterator<AbstractEvent> pendingEvents;
-
-	/**
-	 * Constructs an input channel with a given input gate associated.
-	 * 
-	 * @param inputGate
-	 *        the input gate this channel is connected to
-	 * @param channelIndex
-	 *        the index of the channel in the input gate
-	 * @param channelID
-	 *        the ID of the channel
-	 * @param connectedChannelID
-	 *        the ID of the channel this channel is connected to
-	 */
-	public InputChannel(final InputGate<T> inputGate, final int channelIndex, final ChannelID channelID,
-						final ChannelID connectedChannelID, ChannelType type) {
-		super(channelIndex, channelID, connectedChannelID, type);
-		this.inputGate = inputGate;
-		this.deserializer = new AdaptiveSpanningRecordDeserializer<T>();
-	}
-
-	/**
-	 * Returns the input gate associated with the input channel.
-	 * 
-	 * @return the input gate associated with the input channel.
-	 */
-	public InputGate<T> getInputGate() {
-		return this.inputGate;
-	}
-
-	@Override
-	public boolean isInputChannel() {
-		return true;
-	}
-
-
-	@Override
-	public JobID getJobID() {
-		return this.inputGate.getJobID();
-	}
-
-
-	public InputChannelResult readRecord(T target) throws IOException {
-		if (this.dataBuffer == null) {
-			if (isClosed()) {
-				return InputChannelResult.END_OF_STREAM;
-			}
-
-			// get the next element we need to handle (buffer or event)
-			BufferOrEvent boe = getNextBufferOrEvent();
-
-			if (boe == null) {
-				throw new IllegalStateException("Input channel was queries for data even though none was announced available.");
-			}
-
-			// handle events
-			if (boe.isEvent())
-			{
-				// sanity check: an event may only come after a complete record.
-				if (this.deserializer.hasUnfinishedData()) {
-					throw new IllegalStateException("Channel received an event before completing the current partial record.");
-				}
-
-				AbstractEvent evt = boe.getEvent();
-				if (evt.getClass() == ChannelCloseEvent.class) {
-					this.senderClosedChannel = true;
-					try {
-						close();
-					} catch (InterruptedException e) {
-						throw new IOException(e);
-					}
-					return InputChannelResult.END_OF_STREAM;
-				}
-				else if (evt.getClass() == EndOfSuperstepEvent.class) {
-					return InputChannelResult.END_OF_SUPERSTEP;
-				}
-				else if (evt instanceof AbstractTaskEvent) {
-					this.currentEvent = (AbstractTaskEvent) evt;
-					return InputChannelResult.TASK_EVENT;
-				}
-				else {
-					LOG.error("Received unknown event: " + evt);
-					return InputChannelResult.NONE;
-				}
-			} else {
-				// buffer case
-				this.dataBuffer = boe.getBuffer();
-				this.deserializer.setNextMemorySegment(this.dataBuffer.getMemorySegment(), this.dataBuffer.size());
-			}
-		}
-
-		DeserializationResult deserializationResult = this.deserializer.getNextRecord(target);
-
-		if (deserializationResult.isBufferConsumed()) {
-			releasedConsumedReadBuffer(this.dataBuffer);
-			this.dataBuffer = null;
-		}
-
-		if (deserializationResult == DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER) {
-			return InputChannelResult.INTERMEDIATE_RECORD_FROM_BUFFER;
-		} else if (deserializationResult == DeserializationResult.LAST_RECORD_FROM_BUFFER) {
-			return InputChannelResult.LAST_RECORD_FROM_BUFFER;
-		} else if (deserializationResult == DeserializationResult.PARTIAL_RECORD) {
-			return InputChannelResult.NONE;
-		} else {
-			throw new IllegalStateException();
-		}
-	}
-
-	@Override
-	public ChannelType getChannelType() {
-		return null;
-	}
-
-	@Override
-	public boolean isClosed() throws IOException{
-		if (this.ioException != null) {
-			throw new IOException("An error occurred in the channel: " + this.ioException.getMessage(), this.ioException);
-		} else {
-			return this.weClosedChannel && this.senderClosedChannel;
-		}
-	}
-
-	public void close() throws IOException, InterruptedException {
-		
-		if (weClosedChannel) {
-			return;
-		}
-		weClosedChannel = true;
-
-		this.deserializer.clear();
-		if (this.dataBuffer != null) {
-			releasedConsumedReadBuffer(this.dataBuffer);
-			this.dataBuffer = null;
-		}
-
-		// This code fragment makes sure the isClosed method works in case the channel input has not been fully consumed
-		while (!this.senderClosedChannel)
-		{
-			BufferOrEvent next = getNextBufferOrEvent();
-			if (next != null) {
-				if (next.isEvent()) {
-					if (next.getEvent() instanceof ChannelCloseEvent) {
-						this.senderClosedChannel = true;
-					}
-				} else {
-					releasedConsumedReadBuffer(next.getBuffer());
-				}
-			} else {
-				Thread.sleep(200);
-			}
-		}
-
-		// Send close event to indicate the input channel has successfully
-		// processed all data it is interested in.
-		transferEventToOutputChannel(new ChannelCloseEvent());
-	}
-
-
-	private void releasedConsumedReadBuffer(Buffer buffer) {
-		this.amountOfDataTransmitted += buffer.size();
-		buffer.recycleBuffer();
-	}
-
-
-	public void notifyGateThatInputIsAvailable() {
-		this.getInputGate().notifyRecordIsAvailable(getIndex());
-	}
-
-
-	@Override
-	public void transferEvent(AbstractEvent event) throws IOException, InterruptedException {
-		transferEventToOutputChannel(event);
-	}
-
-
-	public void reportIOException(IOException ioe) {
-		this.ioException = ioe;
-	}
-
-
-	@Override
-	public void releaseAllResources() {
-		this.senderClosedChannel = true;
-		this.deserializer.clear();
-		
-		Buffer buf = this.dataBuffer;
-		if (buf != null) {
-			buf.recycleBuffer();
-			dataBuffer = null;
-		}
-
-		// The buffers are recycled by the input channel wrapper
-	}
-
-	/**
-	 * Notify the channel that a data unit has been consumed.
-	 */
-	public void notifyDataUnitConsumed() {
-		this.getInputGate().notifyDataUnitConsumed(getIndex());
-	}
-
-	public AbstractTaskEvent getCurrentEvent() {
-		AbstractTaskEvent e = this.currentEvent;
-		this.currentEvent = null;
-		return e;
-	}
-
-	// InputChannelContext
-
-	@Override
-	public void queueEnvelope(Envelope envelope) {
-		// The sequence number of the envelope to be queued
-		final int sequenceNumber = envelope.getSequenceNumber();
-
-		synchronized (this.queuedEnvelopes) {
-
-			if (this.destroyCalled) {
-				final Buffer buffer = envelope.getBuffer();
-				if (buffer != null) {
-					buffer.recycleBuffer();
-				}
-				return;
-			}
-
-			final int expectedSequenceNumber = this.lastReceivedEnvelope + 1;
-			if (sequenceNumber != expectedSequenceNumber) {
-				// This is a problem, now we are actually missing some data
-				reportIOException(new IOException("Expected data packet " + expectedSequenceNumber + " but received " + sequenceNumber));
-
-				// notify that something (an exception) is available
-				notifyGateThatInputIsAvailable();
-
-				if (LOG.isErrorEnabled()) {
-					LOG.error("Input channel " + this.toString() + " expected envelope " + expectedSequenceNumber
-							+ " but received " + sequenceNumber);
-				}
-
-				// rescue the buffer
-				final Buffer buffer = envelope.getBuffer();
-				if (buffer != null) {
-					buffer.recycleBuffer();
-				}
-			} else {
-
-				this.queuedEnvelopes.add(envelope);
-				this.lastReceivedEnvelope = sequenceNumber;
-
-				// Notify the channel about the new data. notify as much as there is (buffer plus once per event)
-				if (envelope.getBuffer() != null) {
-					notifyGateThatInputIsAvailable();
-				}
-
-				List<? extends AbstractEvent> events = envelope.deserializeEvents();
-
-				if (events != null) {
-					for (int i = 0; i < events.size(); i++) {
-						notifyGateThatInputIsAvailable();
-					}
-				}
-			}
-		}
-	}
-
-	@Override
-	public void destroy() {
-		final Queue<Buffer> buffersToRecycle = new ArrayDeque<Buffer>();
-
-		synchronized (this.queuedEnvelopes) {
-			this.destroyCalled = true;
-
-			while (!this.queuedEnvelopes.isEmpty()) {
-				final Envelope envelope = this.queuedEnvelopes.poll();
-				if (envelope.getBuffer() != null) {
-					buffersToRecycle.add(envelope.getBuffer());
-				}
-			}
-		}
-
-		while (!buffersToRecycle.isEmpty()) {
-			buffersToRecycle.poll().recycleBuffer();
-		}
-	}
-
-	public void logQueuedEnvelopes() {
-		int numberOfQueuedEnvelopes = 0;
-		int numberOfQueuedEnvelopesWithMemoryBuffers = 0;
-		int numberOfQueuedEnvelopesWithFileBuffers = 0;
-
-		synchronized (this.queuedEnvelopes) {
-
-			final Iterator<Envelope> it = this.queuedEnvelopes.iterator();
-			while (it.hasNext()) {
-
-				final Envelope envelope = it.next();
-				++numberOfQueuedEnvelopes;
-				final Buffer buffer = envelope.getBuffer();
-				if (buffer == null) {
-					continue;
-				}
-
-				++numberOfQueuedEnvelopesWithMemoryBuffers;
-			}
-		}
-
-		System.out.println("\t\t" + this.toString() + ": " + numberOfQueuedEnvelopes + " ("
-				+ numberOfQueuedEnvelopesWithMemoryBuffers + ", " + numberOfQueuedEnvelopesWithFileBuffers + ")");
-
-	}
-
-	@Override
-	public Buffer requestBuffer(int minBufferSize) throws IOException {
-		return this.inputGate.requestBuffer(minBufferSize);
-	}
-
-	@Override
-	public Buffer requestBufferBlocking(int minBufferSize) throws IOException, InterruptedException {
-		return this.inputGate.requestBufferBlocking(minBufferSize);
-	}
-
-	@Override
-	public int getBufferSize() {
-		return this.inputGate.getBufferSize();
-	}
-
-	@Override
-	public void reportAsynchronousEvent() {
-		this.inputGate.reportAsynchronousEvent();
-	}
-
-	@Override
-	public BufferAvailabilityRegistration registerBufferAvailabilityListener(BufferAvailabilityListener listener) {
-		return this.inputGate.registerBufferAvailabilityListener(listener);
-	}
-
-	// ChannelBroker
-
-	public BufferOrEvent getNextBufferOrEvent() throws IOException {
-		// return pending events first
-		if (this.pendingEvents != null) {
-			// if the field is not null, it must always have a next value!
-			BufferOrEvent next = new BufferOrEvent(this.pendingEvents.next());
-			if (!this.pendingEvents.hasNext()) {
-				this.pendingEvents = null;
-			}
-			return next;
-		}
-
-		// if no events are pending, get the next buffer
-		Envelope nextEnvelope;
-		synchronized (this.queuedEnvelopes) {
-			if (this.queuedEnvelopes.isEmpty()) {
-				return null;
-			}
-			nextEnvelope = this.queuedEnvelopes.poll();
-		}
-
-		// schedule events as pending, because events come always after the buffer!
-		@SuppressWarnings("unchecked")
-		List<AbstractEvent> events = (List<AbstractEvent>) nextEnvelope.deserializeEvents();
-		Iterator<AbstractEvent> eventsIt = events.iterator();
-		if (eventsIt.hasNext()) {
-			this.pendingEvents = eventsIt;
-		}
-
-		// get the buffer, if there is one
-		if (nextEnvelope.getBuffer() != null) {
-			return new BufferOrEvent(nextEnvelope.getBuffer());
-		}
-		else if (this.pendingEvents != null) {
-			// if the field is not null, it must always have a next value!
-			BufferOrEvent next = new BufferOrEvent(this.pendingEvents.next());
-			if (!this.pendingEvents.hasNext()) {
-				this.pendingEvents = null;
-			}
-
-			return next;
-		}
-		else {
-			// no buffer and no events, this should be an error
-			throw new IOException("Received an envelope with neither data nor events.");
-		}
-	}
-
-	public void transferEventToOutputChannel(AbstractEvent event) throws IOException, InterruptedException {
-		Envelope ephemeralEnvelope = new Envelope(0, getJobID(), getID());
-		ephemeralEnvelope.serializeEventList(Arrays.asList(event));
-
-		this.envelopeDispatcher.dispatchFromInputChannel(ephemeralEnvelope);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/OutputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/OutputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/OutputChannel.java
deleted file mode 100644
index 3647ce4..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/OutputChannel.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.channels;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.runtime.event.task.AbstractEvent;
-import org.apache.flink.runtime.event.task.AbstractTaskEvent;
-import org.apache.flink.runtime.io.network.Buffer;
-import org.apache.flink.runtime.io.network.Envelope;
-import org.apache.flink.runtime.io.network.gates.OutputGate;
-import org.apache.flink.runtime.jobgraph.JobID;
-
-import java.io.IOException;
-import java.util.Arrays;
-
-
-public class OutputChannel extends Channel {
-
-	private static final Logger LOG = LoggerFactory.getLogger(OutputChannel.class);
-
-	private final Object closeLock = new Object();
-	
-	private final OutputGate outputGate;
-
-	private boolean senderCloseRequested;
-
-	private boolean receiverCloseRequested;
-
-	private int currentSeqNum;
-
-	// -----------------------------------------------------------------------------------------------------------------
-
-	/**
-	 * Creates a new output channel object.
-	 *
-	 * @param outputGate the output gate this channel is connected to
-	 * @param index the index of the channel in the output gate
-	 * @param id the ID of the channel
-	 * @param connectedId the ID of the channel this channel is connected to
-	 * @param type the type of this channel
-	 */
-	public OutputChannel(OutputGate outputGate, int index, ChannelID id, ChannelID connectedId, ChannelType type) {
-		super(index, id, connectedId, type);
-
-		this.outputGate = outputGate;
-	}
-
-	// -----------------------------------------------------------------------------------------------------------------
-	//                                           Data processing
-	// -----------------------------------------------------------------------------------------------------------------
-
-	public void sendBuffer(Buffer buffer) throws IOException, InterruptedException {
-		checkStatus();
-
-		// discard empty buffers
-		if (buffer.size() == 0) {
-			buffer.recycleBuffer();
-			return;
-		}
-
-		Envelope envelope = createNextEnvelope();
-		envelope.setBuffer(buffer);
-		this.envelopeDispatcher.dispatchFromOutputChannel(envelope);
-	}
-
-	public void sendEvent(AbstractEvent event) throws IOException, InterruptedException {
-		checkStatus();
-
-		Envelope envelope = createNextEnvelope();
-		envelope.serializeEventList(Arrays.asList(event));
-		this.envelopeDispatcher.dispatchFromOutputChannel(envelope);
-	}
-
-	public void sendBufferAndEvent(Buffer buffer, AbstractEvent event) throws IOException, InterruptedException {
-		checkStatus();
-
-		Envelope envelope = createNextEnvelope();
-		envelope.setBuffer(buffer);
-		envelope.serializeEventList(Arrays.asList(event));
-		this.envelopeDispatcher.dispatchFromOutputChannel(envelope);
-	}
-
-	// -----------------------------------------------------------------------------------------------------------------
-	//                                          Event processing
-	// -----------------------------------------------------------------------------------------------------------------
-
-	@Override
-	public void queueEnvelope(Envelope envelope) {
-		if (envelope.hasBuffer()) {
-			throw new IllegalStateException("Envelope for OutputChannel has Buffer attached.");
-		}
-
-		for (AbstractEvent event : envelope.deserializeEvents()) {
-			if (event.getClass() == ChannelCloseEvent.class) {
-				synchronized (this.closeLock) {
-					this.receiverCloseRequested = true;
-					this.closeLock.notifyAll();
-				}
-			} 
-			else if (event instanceof AbstractTaskEvent) {
-				if (LOG.isDebugEnabled()) {
-					LOG.debug("OutputChannel received task event: " + event);
-				}
-				
-				this.outputGate.deliverEvent((AbstractTaskEvent) event);
-			}
-			else {
-				throw new RuntimeException("OutputChannel received an event that is neither close nor task event.");
-			}
-		}
-	}
-
-	// -----------------------------------------------------------------------------------------------------------------
-	//                                              Shutdown
-	// -----------------------------------------------------------------------------------------------------------------
-
-	public void requestClose() throws IOException, InterruptedException {
-		if (this.senderCloseRequested) {
-			return;
-		}
-
-		this.senderCloseRequested = true;
-
-		Envelope envelope = createNextEnvelope();
-		envelope.serializeEventList(Arrays.asList(new ChannelCloseEvent()));
-		this.envelopeDispatcher.dispatchFromOutputChannel(envelope);
-	}
-
-	@Override
-	public boolean isClosed() {
-		return this.senderCloseRequested && this.receiverCloseRequested;
-	}
-	
-	public void waitForChannelToBeClosed() throws InterruptedException {
-		synchronized (this.closeLock) {
-			while (!this.receiverCloseRequested) {
-				this.closeLock.wait(1000);
-			}
-		}
-	}
-
-	// -----------------------------------------------------------------------------------------------------------------
-
-	@Override
-	public boolean isInputChannel() {
-		return false;
-	}
-
-	@Override
-	public JobID getJobID() {
-		return this.outputGate.getJobID();
-	}
-	
-	private void checkStatus() throws IOException {
-		if (this.senderCloseRequested) {
-			throw new IllegalStateException(String.format("Channel %s already requested to be closed", getID()));
-		}
-		if (this.receiverCloseRequested) {
-			throw new ReceiverAlreadyClosedException();
-		}
-	}
-
-	private Envelope createNextEnvelope() {
-		return new Envelope(this.currentSeqNum++, getJobID(), getID());
-	}
-
-	@Override
-	public void transferEvent(AbstractEvent event) throws IOException, InterruptedException {
-		// TODO remove with pending changes for input side
-	}
-
-	@Override
-	public void releaseAllResources() {
-		// nothing to do for buffer oriented runtime => TODO remove with pending changes for input side
-	}
-
-	@Override
-	public void destroy() {
-		// nothing to do for buffer oriented runtime => TODO remove with pending changes for input side
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/ReceiverAlreadyClosedException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/ReceiverAlreadyClosedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/ReceiverAlreadyClosedException.java
deleted file mode 100644
index 4600216..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/ReceiverAlreadyClosedException.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.channels;
-
-import java.io.IOException;
-
-
-public class ReceiverAlreadyClosedException extends IOException {
-	private static final long serialVersionUID = 1L;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/gates/Gate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/gates/Gate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/gates/Gate.java
deleted file mode 100644
index 982ba74..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/gates/Gate.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.gates;
-
-import java.io.IOException;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.event.task.AbstractEvent;
-import org.apache.flink.runtime.event.task.AbstractTaskEvent;
-import org.apache.flink.runtime.event.task.EventListener;
-import org.apache.flink.runtime.event.task.EventNotificationManager;
-import org.apache.flink.runtime.io.network.channels.ChannelType;
-import org.apache.flink.runtime.jobgraph.JobID;
-
-/**
- * In Nephele a gate represents the connection between a user program and the processing framework. A gate
- * must be connected to exactly one record reader/writer and to at least one channel. The <code>Gate</code> class itself
- * is abstract. A gate automatically created for every record reader/writer in the user program. A gate can only be used
- * to transport one specific type of records.
- * <p>
- * This class in general is not thread-safe.
- * 
- * @param <T>
- *        the record type to be transported from this gate
- *
- *  TODO refactor with changes to input side
- */
-public abstract class Gate<T extends IOReadableWritable> {
-
-	/**
-	 * The ID of the job this gate belongs to.
-	 */
-	private final JobID jobID;
-
-	/**
-	 * The ID of this gate.
-	 */
-	private final GateID gateID;
-
-	/**
-	 * The index of the gate in the list of available input/output gates.
-	 */
-	private final int index;
-
-	/**
-	 * The event notification manager used to dispatch events.
-	 */
-	private final EventNotificationManager eventNotificationManager = new EventNotificationManager();
-
-	/**
-	 * The type of input/output channels connected to this gate.
-	 */
-	private ChannelType channelType = ChannelType.NETWORK;
-
-	/**
-	 * Constructs a new abstract gate
-	 * 
-	 * @param jobID
-	 *        the ID of the job this gate belongs to
-	 * @param gateID
-	 *        the ID of this gate
-	 * @param index
-	 *        the index of the gate in the list of available input/output gates.
-	 */
-	protected Gate(final JobID jobID, final GateID gateID, final int index) {
-		this.jobID = jobID;
-		this.gateID = gateID;
-		this.index = index;
-	}
-
-	public final int getIndex() {
-		return this.index;
-	}
-
-	/**
-	 * Returns the event notification manager used to dispatch events.
-	 * 
-	 * @return the event notification manager used to dispatch events
-	 */
-	protected final EventNotificationManager getEventNotificationManager() {
-		return this.eventNotificationManager;
-	}
-
-	public String toString() {
-
-		return "Gate " + this.index;
-	}
-
-	public final void subscribeToEvent(EventListener eventListener, Class<? extends AbstractTaskEvent> eventType) {
-
-		this.eventNotificationManager.subscribeToEvent(eventListener, eventType);
-	}
-
-	public final void unsubscribeFromEvent(final EventListener eventListener,
-			final Class<? extends AbstractTaskEvent> eventType) {
-
-		this.eventNotificationManager.unsubscribeFromEvent(eventListener, eventType);
-	}
-
-	public final void deliverEvent(final AbstractTaskEvent event) {
-
-		this.eventNotificationManager.deliverEvent((AbstractTaskEvent) event);
-	}
-
-	public final void setChannelType(final ChannelType channelType) {
-
-		this.channelType = channelType;
-	}
-
-	public final ChannelType getChannelType() {
-
-		return this.channelType;
-	}
-
-	public JobID getJobID() {
-
-		return this.jobID;
-	}
-
-	public GateID getGateID() {
-
-		return this.gateID;
-	}
-
-	// FROM GATE INTERFACE
-
-	/**
-	 * Publishes an event.
-	 *
-	 * @param event
-	 *        the event to be published
-	 * @throws IOException
-	 *         thrown if an error occurs while transmitting the event
-	 * @throws InterruptedException
-	 *         thrown if the thread is interrupted while waiting for the event to be published
-	 */
-	abstract public void publishEvent(AbstractEvent event) throws IOException, InterruptedException;
-
-	/**
-	 * Releases the allocated resources (particularly buffer) of all channels attached to this gate. This method
-	 * should only be called after the respected task has stopped running.
-	 */
-	abstract public void releaseAllChannelResources();
-
-	/**
-	 * Checks if the gate is closed. The gate is closed if all this associated channels are closed.
-	 *
-	 * @return <code>true</code> if the gate is closed, <code>false</code> otherwise
-	 * @throws IOException
-	 *         thrown if any error occurred while closing the gate
-	 * @throws InterruptedException
-	 *         thrown if the gate is interrupted while waiting for this operation to complete
-	 */
-	abstract public boolean isClosed() throws IOException, InterruptedException;
-
-	/**
-	 * Checks if the considered gate is an input gate.
-	 *
-	 * @return <code>true</code> if the considered gate is an input gate, <code>false</code> if it is an output gate
-	 */
-	abstract public boolean isInputGate();
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/gates/GateID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/gates/GateID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/gates/GateID.java
deleted file mode 100644
index 3ecb41b..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/gates/GateID.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.gates;
-
-import org.apache.flink.runtime.AbstractID;
-
-/**
- * A class for statistically unique gate IDs.
- */
-public final class GateID extends AbstractID {
-	private static final long serialVersionUID = 1L;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/gates/InputChannelResult.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/gates/InputChannelResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/gates/InputChannelResult.java
deleted file mode 100644
index 54c330b..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/gates/InputChannelResult.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.gates;
-
-public enum InputChannelResult {
-
-	NONE,
-	INTERMEDIATE_RECORD_FROM_BUFFER,
-	LAST_RECORD_FROM_BUFFER,
-	END_OF_SUPERSTEP,
-	TASK_EVENT,
-	END_OF_STREAM;
-}


Mime
View raw message