flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gyf...@apache.org
Subject [09/19] flink git commit: [streaming] Major internal renaming and restructure
Date Wed, 15 Apr 2015 09:38:50 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java
deleted file mode 100644
index 3c8824b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.io;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-
-public class IndexedMutableReader<T extends IOReadableWritable> extends
-		StreamingMutableRecordReader<T> {
-
-	InputGate reader;
-
-	public IndexedMutableReader(InputGate reader) {
-		super(reader);
-		this.reader = reader;
-	}
-
-	public int getNumberOfInputChannels() {
-		return reader.getNumberOfInputChannels();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedReaderIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedReaderIterator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedReaderIterator.java
deleted file mode 100644
index 18cdd4e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedReaderIterator.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.io;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.operators.util.ReaderIterator;
-import org.apache.flink.runtime.plugable.DeserializationDelegate;
-
-public class IndexedReaderIterator<T> extends ReaderIterator<T> {
-
-	public IndexedReaderIterator(
-			IndexedMutableReader<DeserializationDelegate<T>> reader,
-			TypeSerializer<T> serializer) {
-
-		super(reader, serializer);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/InputGateFactory.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/InputGateFactory.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/InputGateFactory.java
deleted file mode 100644
index 3e6edb9..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/InputGateFactory.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.io;
-
-import java.util.Collection;
-
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
-
-public class InputGateFactory {
-
-	public static InputGate createInputGate(Collection<InputGate> inputGates) {
-		return createInputGate(inputGates.toArray(new InputGate[inputGates.size()]));
-	}
-
-	public static InputGate createInputGate(InputGate[] inputGates) {
-		if (inputGates.length <= 0) {
-			throw new RuntimeException("No such input gate.");
-		}
-
-		if (inputGates.length < 2) {
-			return inputGates[0];
-		} else {
-			return new UnionInputGate(inputGates);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/RecordWriterFactory.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/RecordWriterFactory.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/RecordWriterFactory.java
deleted file mode 100644
index e859225..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/RecordWriterFactory.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.io;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class RecordWriterFactory {
-	private static final Logger LOG = LoggerFactory.getLogger(RecordWriterFactory.class);
-
-	public static <OUT extends IOReadableWritable> RecordWriter<OUT> createRecordWriter(ResultPartitionWriter bufferWriter, ChannelSelector<OUT> channelSelector, long bufferTimeout) {
-
-		RecordWriter<OUT> output;
-
-		if (bufferTimeout >= 0) {
-			output = new StreamRecordWriter<OUT>(bufferWriter, channelSelector, bufferTimeout);
-
-			if (LOG.isTraceEnabled()) {
-				LOG.trace("StreamRecordWriter initiated with {} bufferTimeout.", bufferTimeout);
-			}
-		} else {
-			output = new RecordWriter<OUT>(bufferWriter, channelSelector);
-
-			if (LOG.isTraceEnabled()) {
-				LOG.trace("RecordWriter initiated.");
-			}
-		}
-
-		return output;
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillReader.java
deleted file mode 100644
index ce16b8c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillReader.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.io;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.channels.FileChannel;
-
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
-
-public class SpillReader {
-
-	private FileChannel spillingChannel;
-	private File spillFile;
-
-	/**
-	 * Reads the next buffer from the spilled file.
-	 */
-	public Buffer readNextBuffer(int bufferSize) throws IOException {
-		try {
-			Buffer buffer = new Buffer(new MemorySegment(new byte[bufferSize]),
-					new BufferRecycler() {
-
-						@Override
-						public void recycle(MemorySegment memorySegment) {
-							memorySegment.free();
-						}
-					});
-
-			spillingChannel.read(buffer.getMemorySegment().wrap(0, bufferSize));
-
-			return buffer;
-		} catch (Exception e) {
-			close();
-			throw new IOException(e);
-		}
-	}
-
-	@SuppressWarnings("resource")
-	public void setSpillFile(File nextSpillFile) throws IOException {
-		// We can close and delete the file now
-		close();
-		if (spillFile != null) {
-			spillFile.delete();
-		}
-		this.spillFile = nextSpillFile;
-		this.spillingChannel = new RandomAccessFile(spillFile, "rw").getChannel();
-	}
-
-	public File getSpillFile() {
-		return spillFile;
-	}
-
-	public void close() throws IOException {
-		if (this.spillingChannel != null && this.spillingChannel.isOpen()) {
-			this.spillingChannel.close();
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillingBufferOrEvent.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillingBufferOrEvent.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillingBufferOrEvent.java
deleted file mode 100644
index ef3410e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillingBufferOrEvent.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.io;
-
-import java.io.IOException;
-
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-
-public class SpillingBufferOrEvent {
-
-	private BufferOrEvent boe;
-	private boolean isSpilled = false;
-
-	private SpillReader spillReader;
-
-	private int channelIndex;
-	private int bufferSize;
-
-	public SpillingBufferOrEvent(BufferOrEvent boe, BufferSpiller spiller, SpillReader reader)
-			throws IOException {
-
-		this.boe = boe;
-		this.channelIndex = boe.getChannelIndex();
-		this.spillReader = reader;
-
-		if (boe.isBuffer()) {
-			this.bufferSize = boe.getBuffer().getSize();
-			spiller.spill(boe.getBuffer());
-			this.boe = null;
-			this.isSpilled = true;
-		}
-	}
-
-	/**
-	 * If the buffer wasn't spilled simply returns the instance from the field,
-	 * otherwise reads it from the spill reader
-	 */
-	public BufferOrEvent getBufferOrEvent() throws IOException {
-		if (isSpilled) {
-			boe = new BufferOrEvent(spillReader.readNextBuffer(bufferSize), channelIndex);
-			this.isSpilled = false;
-			return boe;
-		} else {
-			return boe;
-		}
-	}
-
-	public boolean isSpilled() {
-		return isSpilled;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java
deleted file mode 100644
index a1fdc09..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.io;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector;
-
-import java.io.IOException;
-
-public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> {
-
-	private long timeout;
-
-	private OutputFlusher outputFlusher;
-
-	public StreamRecordWriter(ResultPartitionWriter writer) {
-		this(writer, new RoundRobinChannelSelector<T>(), 1000);
-	}
-
-	public StreamRecordWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector) {
-		this(writer, channelSelector, 1000);
-	}
-
-	public StreamRecordWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector,
-			long timeout) {
-		super(writer, channelSelector);
-
-		this.timeout = timeout;
-		this.outputFlusher = new OutputFlusher();
-
-		outputFlusher.start();
-	}
-
-	public void close() {
-		try {
-			if (outputFlusher != null) {
-				outputFlusher.terminate();
-				outputFlusher.join();
-			}
-
-			flush();
-		} catch (IOException e) {
-			throw new RuntimeException(e);
-		} catch (InterruptedException e) {
-			// Do nothing here
-		}
-	}
-
-	private class OutputFlusher extends Thread {
-		private volatile boolean running = true;
-
-		public void terminate() {
-			running = false;
-		}
-
-		@Override
-		public void run() {
-			while (running) {
-				try {
-					flush();
-					Thread.sleep(timeout);
-				} catch (InterruptedException e) {
-					// Do nothing here
-				} catch (IOException e) {
-					throw new RuntimeException(e);
-				}
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java
deleted file mode 100644
index 8e939c6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.io;
-
-import java.io.IOException;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.event.task.AbstractEvent;
-import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
-import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
-import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
-import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
-import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.streaming.api.streamvertex.StreamingSuperstep;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A record-oriented reader.
- * <p>
- * This abstract base class is used by both the mutable and immutable record
- * readers.
- * 
- * @param <T>
- *            The type of the record that can be read with this record reader.
- */
-public abstract class StreamingAbstractRecordReader<T extends IOReadableWritable> extends
-		AbstractReader implements ReaderBase, StreamingReader {
-
-	@SuppressWarnings("unused")
-	private static final Logger LOG = LoggerFactory.getLogger(StreamingAbstractRecordReader.class);
-
-	private final RecordDeserializer<T>[] recordDeserializers;
-
-	private RecordDeserializer<T> currentRecordDeserializer;
-
-	private boolean isFinished;
-
-	private final BarrierBuffer barrierBuffer;
-
-	@SuppressWarnings("unchecked")
-	protected StreamingAbstractRecordReader(InputGate inputGate) {
-		super(inputGate);
-		barrierBuffer = new BarrierBuffer(inputGate, this);
-
-		// Initialize one deserializer per input channel
-		this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate
-				.getNumberOfInputChannels()];
-		for (int i = 0; i < recordDeserializers.length; i++) {
-			recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<T>();
-		}
-	}
-
-	protected boolean getNextRecord(T target) throws IOException, InterruptedException {
-		if (isFinished) {
-			return false;
-		}
-
-		while (true) {
-			if (currentRecordDeserializer != null) {
-				DeserializationResult result = currentRecordDeserializer.getNextRecord(target);
-
-				if (result.isBufferConsumed()) {
-					currentRecordDeserializer.getCurrentBuffer().recycle();
-					currentRecordDeserializer = null;
-				}
-
-				if (result.isFullRecord()) {
-					return true;
-				}
-			}
-
-			final BufferOrEvent bufferOrEvent = barrierBuffer.getNextNonBlocked();
-
-			if (bufferOrEvent.isBuffer()) {
-				currentRecordDeserializer = recordDeserializers[bufferOrEvent.getChannelIndex()];
-				currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
-			} else {
-				// Event received
-				final AbstractEvent event = bufferOrEvent.getEvent();
-
-				if (event instanceof StreamingSuperstep) {
-					barrierBuffer.processSuperstep(bufferOrEvent);
-				} else {
-					if (handleEvent(event)) {
-						if (inputGate.isFinished()) {
-							if (!barrierBuffer.isEmpty()) {
-								throw new RuntimeException(
-										"BarrierBuffer should be empty at this point");
-							}
-							isFinished = true;
-							return false;
-						} else if (hasReachedEndOfSuperstep()) {
-							return false;
-						} // else: More data is coming...
-					}
-				}
-			}
-		}
-	}
-
-	public void clearBuffers() {
-		for (RecordDeserializer<?> deserializer : recordDeserializers) {
-			Buffer buffer = deserializer.getCurrentBuffer();
-			if (buffer != null && !buffer.isRecycled()) {
-				buffer.recycle();
-			}
-		}
-	}
-
-	public void cleanup() throws IOException {
-		barrierBuffer.cleanup();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingMutableRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingMutableRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingMutableRecordReader.java
deleted file mode 100644
index e868879..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingMutableRecordReader.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.io;
-
-import java.io.IOException;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.api.reader.MutableReader;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-
-public class StreamingMutableRecordReader<T extends IOReadableWritable> extends
-		StreamingAbstractRecordReader<T> implements MutableReader<T> {
-
-	public StreamingMutableRecordReader(InputGate inputGate) {
-		super(inputGate);
-	}
-
-	@Override
-	public boolean next(final T target) throws IOException, InterruptedException {
-		return getNextRecord(target);
-	}
-
-	@Override
-	public void clearBuffers() {
-		super.clearBuffers();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingReader.java
deleted file mode 100644
index 74b986a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingReader.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.io;
-
-import java.io.IOException;
-
-public interface StreamingReader {
-
-	public void cleanup() throws IOException;
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/BroadcastPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/BroadcastPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/BroadcastPartitioner.java
deleted file mode 100644
index 3813c8a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/BroadcastPartitioner.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.partitioner;
-
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-
-/**
- * Partitioner that selects all the output channels.
- *
- * @param <T>
- *            Type of the Tuple
- */
-public class BroadcastPartitioner<T> extends StreamPartitioner<T> {
-	private static final long serialVersionUID = 1L;
-
-	int[] returnArray;
-	boolean set;
-	int setNumber;
-
-	public BroadcastPartitioner() {
-		super(PartitioningStrategy.BROADCAST);
-	}
-
-	@Override
-	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
-			int numberOfOutputChannels) {
-		if (set && setNumber == numberOfOutputChannels) {
-			return returnArray;
-		} else {
-			this.returnArray = new int[numberOfOutputChannels];
-			for (int i = 0; i < numberOfOutputChannels; i++) {
-				returnArray[i] = i;
-			}
-			set = true;
-			setNumber = numberOfOutputChannels;
-			return returnArray;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/DistributePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/DistributePartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/DistributePartitioner.java
deleted file mode 100644
index 7299397..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/DistributePartitioner.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.partitioner;
-
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-
-/**
- * Partitioner that distributes the data equally by cycling through the output
- * channels.
- * 
- * @param <T>
- *            Type of the Tuple
- */
-public class DistributePartitioner<T> extends StreamPartitioner<T> {
-	private static final long serialVersionUID = 1L;
-
-	private int[] returnArray = new int[] {-1};
-
-	public DistributePartitioner(boolean forward) {
-		super(forward ? PartitioningStrategy.FORWARD : PartitioningStrategy.DISTRIBUTE);
-	}
-
-	@Override
-	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
-			int numberOfOutputChannels) {
-		this.returnArray[0] = (this.returnArray[0] + 1) % numberOfOutputChannels;
-
-		return this.returnArray;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/FieldsPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/FieldsPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/FieldsPartitioner.java
deleted file mode 100644
index 6bc036e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/FieldsPartitioner.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.partitioner;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-
-/**
- * Partitioner that selects the same (one) channel for two Tuples having a
- * specified fields equal.
- * 
- * @param <T>
- *            Type of the Tuple
- */
-public class FieldsPartitioner<T> extends StreamPartitioner<T> {
-	private static final long serialVersionUID = 1L;
-
-	private int[] returnArray = new int[1];;
-	KeySelector<T, ?> keySelector;
-
-	public FieldsPartitioner(KeySelector<T, ?> keySelector) {
-		super(PartitioningStrategy.GROUPBY);
-		this.keySelector = keySelector;
-	}
-
-	@Override
-	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
-			int numberOfOutputChannels) {
-		returnArray[0] = Math.abs(record.getInstance().getKey(keySelector).hashCode()
-				% numberOfOutputChannels);
-
-		return returnArray;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/GlobalPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/GlobalPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/GlobalPartitioner.java
deleted file mode 100644
index c73ca71..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/GlobalPartitioner.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.partitioner;
-
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-
-//Group to the partitioner with the lowest id
-public class GlobalPartitioner<T> extends StreamPartitioner<T> {
-	private static final long serialVersionUID = 1L;
-
-	private int[] returnArray = new int[] { 0 };
-
-	public GlobalPartitioner() {
-		super(PartitioningStrategy.GLOBAL);
-	}
-
-	@Override
-	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
-			int numberOfOutputChannels) {
-		return returnArray;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/ShufflePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/ShufflePartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/ShufflePartitioner.java
deleted file mode 100644
index 318de3f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/ShufflePartitioner.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.partitioner;
-
-import java.util.Random;
-
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-
-/**
- * Partitioner that distributes the data equally by selecting one output channel
- * randomly.
- * 
- * @param <T>
- *            Type of the Tuple
- */
-public class ShufflePartitioner<T> extends StreamPartitioner<T> {
-	private static final long serialVersionUID = 1L;
-
-	private Random random = new Random();
-
-	private int[] returnArray = new int[1];
-
-	public ShufflePartitioner() {
-		super(PartitioningStrategy.SHUFFLE);
-	}
-
-	@Override
-	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
-			int numberOfOutputChannels) {
-		returnArray[0] = random.nextInt(numberOfOutputChannels);
-		return returnArray;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/StreamPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/StreamPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/StreamPartitioner.java
deleted file mode 100644
index 19a8dba..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/StreamPartitioner.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.partitioner;
-
-import java.io.Serializable;
-
-import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-
-public abstract class StreamPartitioner<T> implements
-		ChannelSelector<SerializationDelegate<StreamRecord<T>>>, Serializable {
-
-	public enum PartitioningStrategy {
-
-		FORWARD, DISTRIBUTE, SHUFFLE, BROADCAST, GLOBAL, GROUPBY;
-
-	}
-
-	private static final long serialVersionUID = 1L;
-	private PartitioningStrategy strategy;
-
-	public StreamPartitioner(PartitioningStrategy strategy) {
-		this.strategy = strategy;
-	}
-
-	public PartitioningStrategy getStrategy() {
-		return strategy;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
new file mode 100644
index 0000000..bc153f9
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.Set;
+
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.streaming.runtime.tasks.StreamingSuperstep;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class encapsulating the functionality that is necessary to sync inputs on
+ * superstep barriers. Once a barrier is received from an input channel, we
+ * should not process further buffers from that channel until we received the
+ * barrier from all other channels as well. To avoid back-pressuring the
+ * readers, we buffer up the new data received from the blocked channels until
+ * the blocks are released.
+ * 
+ */
+public class BarrierBuffer {
+
+	private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class);
+
+	private Queue<SpillingBufferOrEvent> nonprocessed = new LinkedList<SpillingBufferOrEvent>();
+	private Queue<SpillingBufferOrEvent> blockedNonprocessed = new LinkedList<SpillingBufferOrEvent>();
+
+	private Set<Integer> blockedChannels = new HashSet<Integer>();
+	private int totalNumberOfInputChannels;
+
+	private StreamingSuperstep currentSuperstep;
+	private boolean superstepStarted;
+
+	private AbstractReader reader;
+
+	private InputGate inputGate;
+
+	private SpillReader spillReader;
+	private BufferSpiller bufferSpiller;
+
+	private boolean inputFinished = false;
+
+	private BufferOrEvent endOfStreamEvent = null;
+
+	public BarrierBuffer(InputGate inputGate, AbstractReader reader) {
+		this.inputGate = inputGate;
+		totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
+		this.reader = reader;
+		try {
+			this.bufferSpiller = new BufferSpiller();
+			this.spillReader = new SpillReader();
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		}
+
+	}
+
+	/**
+	 * Starts the next superstep in the buffer
+	 * 
+	 * @param superstep
+	 *            The next superstep
+	 */
+	protected void startSuperstep(StreamingSuperstep superstep) {
+		this.currentSuperstep = superstep;
+		this.superstepStarted = true;
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Superstep started with id: " + superstep.getId());
+		}
+	}
+
+	/**
+	 * Get the next non-blocked non-processed BufferOrEvent. Returns null if
+	 * not available.
+	 * 
+	 * @throws IOException
+	 */
+	protected BufferOrEvent getNonProcessed() throws IOException {
+		SpillingBufferOrEvent nextNonprocessed;
+
+		while ((nextNonprocessed = nonprocessed.poll()) != null) {
+			BufferOrEvent boe = nextNonprocessed.getBufferOrEvent();
+			if (isBlocked(boe.getChannelIndex())) {
+				blockedNonprocessed.add(new SpillingBufferOrEvent(boe, bufferSpiller, spillReader));
+			} else {
+				return boe;
+			}
+		}
+
+		return null;
+	}
+
+	/**
+	 * Checks whether a given channel index is blocked for this inputgate
+	 * 
+	 * @param channelIndex
+	 *            The channel index to check
+	 */
+	protected boolean isBlocked(int channelIndex) {
+		return blockedChannels.contains(channelIndex);
+	}
+
+	/**
+	 * Checks whether all channels are blocked meaning that barriers are
+	 * received from all channels
+	 */
+	protected boolean isAllBlocked() {
+		return blockedChannels.size() == totalNumberOfInputChannels;
+	}
+
+	/**
+	 * Returns the next non-blocked BufferOrEvent. This is a blocking operation.
+	 */
+	public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {
+		// If there are non-processed buffers from the previously blocked ones,
+		// we get the next
+		BufferOrEvent bufferOrEvent = getNonProcessed();
+
+		if (bufferOrEvent != null) {
+			return bufferOrEvent;
+		} else if (blockedNonprocessed.isEmpty() && inputFinished) {
+			return endOfStreamEvent;
+		} else {
+			// If no non-processed, get new from input
+			while (true) {
+				if (!inputFinished) {
+					// We read the next buffer from the inputgate
+					bufferOrEvent = inputGate.getNextBufferOrEvent();
+
+					if (!bufferOrEvent.isBuffer()
+							&& bufferOrEvent.getEvent() instanceof EndOfPartitionEvent) {
+						if (inputGate.isFinished()) {
+							// store the event for later if the channel is
+							// closed
+							endOfStreamEvent = bufferOrEvent;
+							inputFinished = true;
+						}
+
+					} else {
+						if (isBlocked(bufferOrEvent.getChannelIndex())) {
+							// If channel blocked we just store it
+							blockedNonprocessed.add(new SpillingBufferOrEvent(bufferOrEvent,
+									bufferSpiller, spillReader));
+						} else {
+							return bufferOrEvent;
+						}
+					}
+				} else {
+					actOnAllBlocked();
+					return getNextNonBlocked();
+				}
+			}
+		}
+	}
+
+	/**
+	 * Blocks the given channel index, from which a barrier has been received.
+	 * 
+	 * @param channelIndex
+	 *            The channel index to block.
+	 */
+	protected void blockChannel(int channelIndex) {
+		if (!blockedChannels.contains(channelIndex)) {
+			blockedChannels.add(channelIndex);
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Channel blocked with index: " + channelIndex);
+			}
+			if (isAllBlocked()) {
+				actOnAllBlocked();
+			}
+
+		} else {
+			throw new RuntimeException("Tried to block an already blocked channel");
+		}
+	}
+
+	/**
+	 * Releases the blocks on all channels.
+	 * 
+	 * @throws RuntimeException if unprocessed buffers remain or the spill file cannot be switched
+	 */
+	protected void releaseBlocks() {
+		if (!nonprocessed.isEmpty()) {
+			// sanity check
+			throw new RuntimeException("Error in barrier buffer logic");
+		}
+		nonprocessed = blockedNonprocessed;
+		blockedNonprocessed = new LinkedList<SpillingBufferOrEvent>();
+
+		try {
+			spillReader.setSpillFile(bufferSpiller.getSpillFile());
+			bufferSpiller.resetSpillFile();
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		}
+
+		blockedChannels.clear();
+		superstepStarted = false;
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("All barriers received, blocks released");
+		}
+	}
+
+	/**
+	 * Method that is executed once the barrier has been received from all
+	 * channels.
+	 */
+	protected void actOnAllBlocked() {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Publishing barrier to the vertex");
+		}
+
+		if (currentSuperstep != null) {
+			reader.publish(currentSuperstep);
+		}
+
+		releaseBlocks();
+	}
+
+	/**
+	 * Processes a streaming superstep event
+	 * 
+	 * @param bufferOrEvent
+	 *            The BufferOrEvent containing the event
+	 */
+	public void processSuperstep(BufferOrEvent bufferOrEvent) {
+		StreamingSuperstep superstep = (StreamingSuperstep) bufferOrEvent.getEvent();
+		if (!superstepStarted) {
+			startSuperstep(superstep);
+		}
+		blockChannel(bufferOrEvent.getChannelIndex());
+	}
+
+	public void cleanup() throws IOException {
+		bufferSpiller.close();
+		File spillfile1 = bufferSpiller.getSpillFile();
+		if (spillfile1 != null) {
+			spillfile1.delete();
+		}
+
+		spillReader.close();
+		File spillfile2 = spillReader.getSpillFile();
+		if (spillfile2 != null) {
+			spillfile2.delete();
+		}
+	}
+
+	public String toString() {
+		return nonprocessed.toString() + blockedNonprocessed.toString();
+	}
+
+	public boolean isEmpty() {
+		return nonprocessed.isEmpty() && blockedNonprocessed.isEmpty();
+	}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java
new file mode 100644
index 0000000..3905558
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.flink.runtime.iterative.concurrent.Broker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+@SuppressWarnings("rawtypes")
+public class BlockingQueueBroker extends Broker<BlockingQueue<StreamRecord>> {
+	/**
+	 * Singleton instance
+	 */
+	private static final BlockingQueueBroker INSTANCE = new BlockingQueueBroker();
+
+	private BlockingQueueBroker() {
+	}
+
+	/**
+	 * retrieve singleton instance
+	 */
+	public static Broker<BlockingQueue<StreamRecord>> instance() {
+		return INSTANCE;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
new file mode 100644
index 0000000..0d57d05
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.util.Random;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.util.StringUtils;
+
+public class BufferSpiller {
+
+	protected static Random rnd = new Random();
+
+	private File spillFile;
+	protected FileChannel spillingChannel;
+	private String tempDir;
+
+	public BufferSpiller() throws IOException {
+		String tempDirString = GlobalConfiguration.getString(
+				ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
+				ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH);
+		String[] tempDirs = tempDirString.split(",|" + File.pathSeparator);
+
+		tempDir = tempDirs[rnd.nextInt(tempDirs.length)];
+
+		createSpillingChannel();
+	}
+
+	/**
+	 * Dumps the contents of the buffer to disk and recycles the buffer.
+	 */
+	public void spill(Buffer buffer) throws IOException {
+		try {
+			spillingChannel.write(buffer.getNioBuffer());
+			buffer.recycle();
+		} catch (IOException e) {
+			close();
+			throw new IOException(e);
+		}
+
+	}
+
+	@SuppressWarnings("resource")
+	private void createSpillingChannel() throws IOException {
+		this.spillFile = new File(tempDir, randomString(rnd) + ".buffer");
+		this.spillingChannel = new RandomAccessFile(spillFile, "rw").getChannel();
+	}
+
+	private static String randomString(Random random) {
+		final byte[] bytes = new byte[20];
+		random.nextBytes(bytes);
+		return StringUtils.byteToHexString(bytes);
+	}
+
+	public void close() throws IOException {
+		if (spillingChannel != null && spillingChannel.isOpen()) {
+			spillingChannel.close();
+		}
+	}
+
+	public void resetSpillFile() throws IOException {
+		close();
+		createSpillingChannel();
+	}
+
+	public File getSpillFile() {
+		return spillFile;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoReaderIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoReaderIterator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoReaderIterator.java
new file mode 100644
index 0000000..4358810
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoReaderIterator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import java.io.IOException;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.plugable.DeserializationDelegate;
+import org.apache.flink.runtime.plugable.ReusingDeserializationDelegate;
+
+/**
+ * A CoReaderIterator wraps a {@link CoRecordReader} producing records of two
+ * input types.
+ */
+public class CoReaderIterator<T1, T2> {
+
+	private final CoRecordReader<DeserializationDelegate<T1>, DeserializationDelegate<T2>> reader; // the
+																									// source
+
+	protected final ReusingDeserializationDelegate<T1> delegate1;
+	protected final ReusingDeserializationDelegate<T2> delegate2;
+
+	public CoReaderIterator(
+			CoRecordReader<DeserializationDelegate<T1>, DeserializationDelegate<T2>> reader,
+			TypeSerializer<T1> serializer1, TypeSerializer<T2> serializer2) {
+		this.reader = reader;
+		this.delegate1 = new ReusingDeserializationDelegate<T1>(serializer1);
+		this.delegate2 = new ReusingDeserializationDelegate<T2>(serializer2);
+	}
+
+	public int next(T1 target1, T2 target2) throws IOException {
+		this.delegate1.setInstance(target1);
+		this.delegate2.setInstance(target2);
+
+		try {
+			return this.reader.getNextRecord(this.delegate1, this.delegate2);
+
+		} catch (InterruptedException e) {
+			throw new IOException("Reader interrupted.", e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoRecordReader.java
new file mode 100644
index 0000000..cfd27f7
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoRecordReader.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
+import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
+import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
+import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
+import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.streaming.runtime.tasks.StreamingSuperstep;
+
+/**
+ * A CoRecordReader wraps {@link MutableRecordReader}s of two different input
+ * types to read records effectively.
+ *
+ * <p>
+ * In this implementation the two inputs are given as {@link InputGate}s. Each
+ * gate gets one {@link AdaptiveSpanningRecordDeserializer} per input channel
+ * and its own barrier buffer. The reader registers itself as listener on both
+ * gates and uses the resulting notifications to pick the input to read next.
+ */
+@SuppressWarnings("rawtypes")
+public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadableWritable> extends
+		AbstractReader implements EventListener<InputGate>, StreamingReader {
+
+	// Gate of the first input (index 1, records of type T1)
+	private final InputGate bufferReader1;
+
+	// Gate of the second input (index 2, records of type T2)
+	private final InputGate bufferReader2;
+
+	// Queue of input indices (1 or 2); filled by the gate notifications and
+	// drained by getNextReaderIndexBlocking()
+	private final LinkedBlockingDeque<Integer> availableRecordReaders = new LinkedBlockingDeque<Integer>();
+
+	// Indices that were handed out in place of a blocked input; each entry
+	// cancels one later queue entry carrying the same index
+	private LinkedList<Integer> processed = new LinkedList<Integer>();
+
+	// One deserializer per input channel of the first gate
+	private AdaptiveSpanningRecordDeserializer[] reader1RecordDeserializers;
+
+	// Deserializer of the first input that currently holds an unconsumed
+	// buffer, or null if there is none
+	private RecordDeserializer<T1> reader1currentRecordDeserializer;
+
+	// One deserializer per input channel of the second gate
+	private AdaptiveSpanningRecordDeserializer[] reader2RecordDeserializers;
+
+	// Deserializer of the second input that currently holds an unconsumed
+	// buffer, or null if there is none
+	private RecordDeserializer<T2> reader2currentRecordDeserializer;
+
+	// 0 => none, 1 => reader (T1), 2 => reader (T2)
+	private int currentReaderIndex;
+
+	// Set once requestPartitionsOnce() has asked both gates for their partitions
+	private boolean hasRequestedPartitions;
+
+	// Barrier buffers through which the buffers and events of each gate are read
+	protected CoBarrierBuffer barrierBuffer1;
+	protected CoBarrierBuffer barrierBuffer2;
+
+	/**
+	 * Creates a reader over the two given gates. The superclass is handed a
+	 * {@link UnionInputGate} of both gates, while this class keeps reading
+	 * from each gate separately through its own barrier buffer.
+	 *
+	 * @param inputgate1
+	 *            gate of the first input
+	 * @param inputgate2
+	 *            gate of the second input
+	 */
+	public CoRecordReader(InputGate inputgate1, InputGate inputgate2) {
+		super(new UnionInputGate(inputgate1, inputgate2));
+
+		this.bufferReader1 = inputgate1;
+		this.bufferReader2 = inputgate2;
+
+		this.reader1RecordDeserializers = new AdaptiveSpanningRecordDeserializer[inputgate1
+				.getNumberOfInputChannels()];
+		this.reader2RecordDeserializers = new AdaptiveSpanningRecordDeserializer[inputgate2
+				.getNumberOfInputChannels()];
+
+		for (int i = 0; i < reader1RecordDeserializers.length; i++) {
+			reader1RecordDeserializers[i] = new AdaptiveSpanningRecordDeserializer<T1>();
+		}
+
+		for (int i = 0; i < reader2RecordDeserializers.length; i++) {
+			reader2RecordDeserializers[i] = new AdaptiveSpanningRecordDeserializer<T2>();
+		}
+
+		// Both gates report to onEvent(InputGate) of this reader
+		inputgate1.registerListener(this);
+		inputgate2.registerListener(this);
+
+		barrierBuffer1 = new CoBarrierBuffer(inputgate1, this);
+		barrierBuffer2 = new CoBarrierBuffer(inputgate2, this);
+
+		// Link the two buffers so that each can check and release the other
+		barrierBuffer1.setOtherBarrierBuffer(barrierBuffer2);
+		barrierBuffer2.setOtherBarrierBuffer(barrierBuffer1);
+	}
+
+	/**
+	 * Requests the partitions of both gates the first time it is called; later
+	 * calls do nothing.
+	 */
+	public void requestPartitionsOnce() throws IOException, InterruptedException {
+		if (!hasRequestedPartitions) {
+			bufferReader1.requestPartitions();
+			bufferReader2.requestPartitions();
+
+			hasRequestedPartitions = true;
+		}
+	}
+
+	/**
+	 * Reads the next record from one of the two inputs.
+	 *
+	 * @param target1
+	 *            object that receives a record of the first input
+	 * @param target2
+	 *            object that receives a record of the second input
+	 * @return 1 if a full record was read into {@code target1}, 2 if a full
+	 *         record was read into {@code target2}, 0 if no input is selected
+	 *         and both gates are finished
+	 */
+	@SuppressWarnings("unchecked")
+	protected int getNextRecord(T1 target1, T2 target2) throws IOException, InterruptedException {
+
+		requestPartitionsOnce();
+
+		while (true) {
+			// No input selected: stop if everything is finished, otherwise
+			// block until an input index can be picked
+			if (currentReaderIndex == 0) {
+				if ((bufferReader1.isFinished() && bufferReader2.isFinished())) {
+					return 0;
+				}
+
+				currentReaderIndex = getNextReaderIndexBlocking();
+
+			}
+
+			if (currentReaderIndex == 1) {
+				while (true) {
+					if (reader1currentRecordDeserializer != null) {
+						RecordDeserializer.DeserializationResult result = reader1currentRecordDeserializer
+								.getNextRecord(target1);
+
+						// Buffer used up: recycle it and reset the selection,
+						// which takes effect on the next call or after a break.
+						// NOTE(review): if the record is still incomplete, the
+						// loop goes on reading from input 1 although
+						// currentReaderIndex is already 0 - confirm intended.
+						if (result.isBufferConsumed()) {
+							reader1currentRecordDeserializer.getCurrentBuffer().recycle();
+							reader1currentRecordDeserializer = null;
+
+							currentReaderIndex = 0;
+						}
+
+						if (result.isFullRecord()) {
+							return 1;
+						}
+					} else {
+
+						final BufferOrEvent boe = barrierBuffer1.getNextNonBlocked();
+
+						if (boe.isBuffer()) {
+							// Hand the buffer to the deserializer of its channel
+							reader1currentRecordDeserializer = reader1RecordDeserializers[boe
+									.getChannelIndex()];
+							reader1currentRecordDeserializer.setNextBuffer(boe.getBuffer());
+						} else if (boe.getEvent() instanceof StreamingSuperstep) {
+							// Superstep events go to the barrier buffer; afterwards
+							// a new input is selected
+							barrierBuffer1.processSuperstep(boe);
+							currentReaderIndex = 0;
+
+							break;
+						} else if (handleEvent(boe.getEvent())) {
+							// Any other event goes to the inherited handler; a
+							// true result triggers a new input selection
+							currentReaderIndex = 0;
+
+							break;
+						}
+					}
+				}
+			} else if (currentReaderIndex == 2) {
+				// Same procedure as above, for the second input
+				while (true) {
+					if (reader2currentRecordDeserializer != null) {
+						RecordDeserializer.DeserializationResult result = reader2currentRecordDeserializer
+								.getNextRecord(target2);
+
+						if (result.isBufferConsumed()) {
+							reader2currentRecordDeserializer.getCurrentBuffer().recycle();
+							reader2currentRecordDeserializer = null;
+
+							currentReaderIndex = 0;
+						}
+
+						if (result.isFullRecord()) {
+							return 2;
+						}
+					} else {
+						final BufferOrEvent boe = barrierBuffer2.getNextNonBlocked();
+
+						if (boe.isBuffer()) {
+							reader2currentRecordDeserializer = reader2RecordDeserializers[boe
+									.getChannelIndex()];
+							reader2currentRecordDeserializer.setNextBuffer(boe.getBuffer());
+						} else if (boe.getEvent() instanceof StreamingSuperstep) {
+							barrierBuffer2.processSuperstep(boe);
+							currentReaderIndex = 0;
+
+							break;
+						} else if (handleEvent(boe.getEvent())) {
+							currentReaderIndex = 0;
+
+							break;
+						}
+					}
+				}
+			} else {
+				throw new IllegalStateException("Bug: unexpected current reader index.");
+			}
+		}
+	}
+
+	/**
+	 * Blocks until an input index is available and returns the index of the
+	 * input to read from. If the barrier buffer of the taken index reports
+	 * that it is all blocked, the index is put back to the front of the queue,
+	 * the other index is recorded in {@code processed} and returned instead.
+	 *
+	 * @return 1 or 2
+	 */
+	protected int getNextReaderIndexBlocking() throws InterruptedException {
+
+		Integer nextIndex = 0;
+
+		// Skip queue entries that were already served ahead of time, removing
+		// one matching entry from 'processed' for each skipped one
+		while (processed.contains(nextIndex = availableRecordReaders.take())) {
+			processed.remove(nextIndex);
+		}
+
+		if (nextIndex == 1) {
+			if (barrierBuffer1.isAllBlocked()) {
+				availableRecordReaders.addFirst(1);
+				processed.add(2);
+				return 2;
+			} else {
+				return 1;
+			}
+		} else {
+			if (barrierBuffer2.isAllBlocked()) {
+				availableRecordReaders.addFirst(2);
+				processed.add(1);
+				return 1;
+			} else {
+				return 2;
+			}
+
+		}
+
+	}
+
+	// ------------------------------------------------------------------------
+	// Data availability notifications
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Listener callback of the gates this reader registered on; forwards the
+	 * notifying gate to {@link #addToAvailable(InputGate)}.
+	 */
+	@Override
+	public void onEvent(InputGate bufferReader) {
+		addToAvailable(bufferReader);
+	}
+
+	/**
+	 * Enqueues index 1 or 2 depending on which of the two gates is passed in;
+	 * any other gate is ignored.
+	 */
+	protected void addToAvailable(InputGate bufferReader) {
+		if (bufferReader == bufferReader1) {
+			availableRecordReaders.add(1);
+		} else if (bufferReader == bufferReader2) {
+			availableRecordReaders.add(2);
+		}
+	}
+
+	/**
+	 * Recycles the current buffer of every deserializer of both inputs, unless
+	 * there is none or it has been recycled already.
+	 */
+	public void clearBuffers() {
+		for (RecordDeserializer<?> deserializer : reader1RecordDeserializers) {
+			Buffer buffer = deserializer.getCurrentBuffer();
+			if (buffer != null && !buffer.isRecycled()) {
+				buffer.recycle();
+			}
+		}
+		for (RecordDeserializer<?> deserializer : reader2RecordDeserializers) {
+			Buffer buffer = deserializer.getCurrentBuffer();
+			if (buffer != null && !buffer.isRecycled()) {
+				buffer.recycle();
+			}
+		}
+	}
+
+	/**
+	 * Barrier buffer of one input that knows the barrier buffer of the other
+	 * input.
+	 */
+	private class CoBarrierBuffer extends BarrierBuffer {
+
+		// Barrier buffer of the other input; set after construction
+		private CoBarrierBuffer otherBuffer;
+
+		public CoBarrierBuffer(InputGate inputGate, AbstractReader reader) {
+			super(inputGate, reader);
+		}
+
+		public void setOtherBarrierBuffer(CoBarrierBuffer other) {
+			this.otherBuffer = other;
+		}
+
+		/**
+		 * Runs the inherited action only if the other input is all blocked as
+		 * well, and then also releases the blocks of the other buffer.
+		 * Otherwise does nothing.
+		 */
+		@Override
+		protected void actOnAllBlocked() {
+			if (otherBuffer.isAllBlocked()) {
+				super.actOnAllBlocked();
+				otherBuffer.releaseBlocks();
+			}
+		}
+
+	}
+
+	/**
+	 * Cleans up both barrier buffers; the second one is cleaned up even if
+	 * cleaning up the first one throws.
+	 */
+	public void cleanup() throws IOException {
+		try {
+			barrierBuffer1.cleanup();
+		} finally {
+			barrierBuffer2.cleanup();
+		}
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/IndexedMutableReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/IndexedMutableReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/IndexedMutableReader.java
new file mode 100644
index 0000000..7f2a9c5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/IndexedMutableReader.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+
+public class IndexedMutableReader<T extends IOReadableWritable> extends
+		StreamingMutableRecordReader<T> {
+
+	InputGate reader;
+
+	public IndexedMutableReader(InputGate reader) {
+		super(reader);
+		this.reader = reader;
+	}
+
+	public int getNumberOfInputChannels() {
+		return reader.getNumberOfInputChannels();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/IndexedReaderIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/IndexedReaderIterator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/IndexedReaderIterator.java
new file mode 100644
index 0000000..2050e27
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/IndexedReaderIterator.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.operators.util.ReaderIterator;
+import org.apache.flink.runtime.plugable.DeserializationDelegate;
+
+/**
+ * A {@link ReaderIterator} whose source is restricted to an
+ * {@link IndexedMutableReader}. It adds no behavior of its own.
+ */
+public class IndexedReaderIterator<T> extends ReaderIterator<T> {
+
+	/**
+	 * @param reader
+	 *            the indexed reader that supplies the records
+	 * @param serializer
+	 *            the serializer handed on to the {@link ReaderIterator}
+	 */
+	public IndexedReaderIterator(IndexedMutableReader<DeserializationDelegate<T>> reader,
+			TypeSerializer<T> serializer) {
+		super(reader, serializer);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/InputGateFactory.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/InputGateFactory.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/InputGateFactory.java
new file mode 100644
index 0000000..7883251
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/InputGateFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import java.util.Collection;
+
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
+
+public class InputGateFactory {
+
+	public static InputGate createInputGate(Collection<InputGate> inputGates) {
+		return createInputGate(inputGates.toArray(new InputGate[inputGates.size()]));
+	}
+
+	public static InputGate createInputGate(InputGate[] inputGates) {
+		if (inputGates.length <= 0) {
+			throw new RuntimeException("No such input gate.");
+		}
+
+		if (inputGates.length < 2) {
+			return inputGates[0];
+		} else {
+			return new UnionInputGate(inputGates);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterFactory.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterFactory.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterFactory.java
new file mode 100644
index 0000000..3a7ba3e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Creates the {@link RecordWriter} matching a given buffer timeout.
+ */
+public class RecordWriterFactory {
+	private static final Logger LOG = LoggerFactory.getLogger(RecordWriterFactory.class);
+
+	/**
+	 * Creates a {@link StreamRecordWriter} for a non-negative buffer timeout
+	 * and a plain {@link RecordWriter} for a negative one.
+	 *
+	 * @param bufferWriter
+	 *            the partition writer the record writer emits to
+	 * @param channelSelector
+	 *            selects the output channels of each record
+	 * @param bufferTimeout
+	 *            timeout passed to the {@link StreamRecordWriter}; a negative
+	 *            value selects the plain {@link RecordWriter}
+	 * @return the created writer
+	 */
+	public static <OUT extends IOReadableWritable> RecordWriter<OUT> createRecordWriter(ResultPartitionWriter bufferWriter, ChannelSelector<OUT> channelSelector, long bufferTimeout) {
+
+		if (bufferTimeout < 0) {
+			RecordWriter<OUT> plainWriter = new RecordWriter<OUT>(bufferWriter, channelSelector);
+
+			if (LOG.isTraceEnabled()) {
+				LOG.trace("RecordWriter initiated.");
+			}
+			return plainWriter;
+		}
+
+		RecordWriter<OUT> timedWriter = new StreamRecordWriter<OUT>(bufferWriter, channelSelector,
+				bufferTimeout);
+
+		if (LOG.isTraceEnabled()) {
+			LOG.trace("StreamRecordWriter initiated with {} bufferTimeout.", bufferTimeout);
+		}
+		return timedWriter;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/SpillReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/SpillReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/SpillReader.java
new file mode 100644
index 0000000..356b491
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/SpillReader.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+
+public class SpillReader {
+
+	private FileChannel spillingChannel;
+	private File spillFile;
+
+	/**
+	 * Reads the next buffer from the spilled file.
+	 */
+	public Buffer readNextBuffer(int bufferSize) throws IOException {
+		try {
+			Buffer buffer = new Buffer(new MemorySegment(new byte[bufferSize]),
+					new BufferRecycler() {
+
+						@Override
+						public void recycle(MemorySegment memorySegment) {
+							memorySegment.free();
+						}
+					});
+
+			spillingChannel.read(buffer.getMemorySegment().wrap(0, bufferSize));
+
+			return buffer;
+		} catch (Exception e) {
+			close();
+			throw new IOException(e);
+		}
+	}
+
+	@SuppressWarnings("resource")
+	public void setSpillFile(File nextSpillFile) throws IOException {
+		// We can close and delete the file now
+		close();
+		if (spillFile != null) {
+			spillFile.delete();
+		}
+		this.spillFile = nextSpillFile;
+		this.spillingChannel = new RandomAccessFile(spillFile, "rw").getChannel();
+	}
+
+	public File getSpillFile() {
+		return spillFile;
+	}
+
+	public void close() throws IOException {
+		if (this.spillingChannel != null && this.spillingChannel.isOpen()) {
+			this.spillingChannel.close();
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEvent.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEvent.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEvent.java
new file mode 100644
index 0000000..368e373
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEvent.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import java.io.IOException;
+
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+
+/**
+ * Holder for a {@link BufferOrEvent} that spills buffers on creation and
+ * reads them back on first access. Events are kept in memory.
+ */
+public class SpillingBufferOrEvent {
+
+	/** The held element; null while a buffer is spilled. */
+	private BufferOrEvent boe;
+	private boolean isSpilled = false;
+
+	private SpillReader spillReader;
+
+	private int channelIndex;
+	private int bufferSize;
+
+	/**
+	 * Stores the given element. If it carries a buffer, the buffer is passed
+	 * to the spiller and only its size and channel index are remembered.
+	 *
+	 * @param boe
+	 *            the element to hold
+	 * @param spiller
+	 *            receives the buffer if the element is a buffer
+	 * @param reader
+	 *            used later to read a spilled buffer back
+	 */
+	public SpillingBufferOrEvent(BufferOrEvent boe, BufferSpiller spiller, SpillReader reader)
+			throws IOException {
+
+		this.spillReader = reader;
+		this.channelIndex = boe.getChannelIndex();
+
+		if (!boe.isBuffer()) {
+			// Events are not spilled
+			this.boe = boe;
+			return;
+		}
+
+		this.bufferSize = boe.getBuffer().getSize();
+		spiller.spill(boe.getBuffer());
+		this.boe = null;
+		this.isSpilled = true;
+	}
+
+	/**
+	 * Returns the held element. A spilled buffer is read from the spill reader
+	 * on the first call and kept in memory afterwards.
+	 */
+	public BufferOrEvent getBufferOrEvent() throws IOException {
+		if (!isSpilled) {
+			return boe;
+		}
+
+		boe = new BufferOrEvent(spillReader.readNextBuffer(bufferSize), channelIndex);
+		isSpilled = false;
+		return boe;
+	}
+
+	/**
+	 * @return true while the buffer is spilled and has not been read back
+	 */
+	public boolean isSpilled() {
+		return isSpilled;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
new file mode 100644
index 0000000..eee30b6
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector;
+
+import java.io.IOException;
+
+/**
+ * A {@link RecordWriter} that is additionally flushed by a background thread,
+ * which calls {@code flush()} and then sleeps for the configured timeout, in a
+ * loop, until the writer is closed.
+ */
+public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> {
+
+	/** Sleep time of the flusher thread between two flushes, in milliseconds. */
+	private final long timeout;
+
+	private final OutputFlusher outputFlusher;
+
+	/**
+	 * Creates a writer with a {@link RoundRobinChannelSelector} and a timeout
+	 * of 1000 milliseconds.
+	 */
+	public StreamRecordWriter(ResultPartitionWriter writer) {
+		this(writer, new RoundRobinChannelSelector<T>(), 1000);
+	}
+
+	/**
+	 * Creates a writer with the given channel selector and a timeout of 1000
+	 * milliseconds.
+	 */
+	public StreamRecordWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector) {
+		this(writer, channelSelector, 1000);
+	}
+
+	/**
+	 * Creates the writer and starts its flusher thread.
+	 *
+	 * @param writer
+	 *            the partition writer to emit to
+	 * @param channelSelector
+	 *            selects the output channels of each record
+	 * @param timeout
+	 *            time in milliseconds the flusher sleeps between two flushes;
+	 *            with 0 the flusher flushes back-to-back
+	 */
+	public StreamRecordWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector,
+			long timeout) {
+		super(writer, channelSelector);
+
+		this.timeout = timeout;
+		this.outputFlusher = new OutputFlusher();
+
+		outputFlusher.start();
+	}
+
+	/**
+	 * Stops the flusher thread, waits for it to finish and flushes one last
+	 * time. If the calling thread is interrupted while waiting, the final
+	 * flush is skipped and the interrupt status of the caller is restored.
+	 *
+	 * @throws RuntimeException
+	 *             if the final flush fails with an {@link IOException}
+	 */
+	public void close() {
+		try {
+			outputFlusher.terminate();
+			outputFlusher.join();
+
+			flush();
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		} catch (InterruptedException e) {
+			// Keep the interruption visible to the caller instead of losing it
+			Thread.currentThread().interrupt();
+		}
+	}
+
+	/**
+	 * Thread that flushes the enclosing writer and then sleeps for
+	 * {@code timeout} milliseconds, until it is terminated.
+	 */
+	private class OutputFlusher extends Thread {
+		private volatile boolean running = true;
+
+		/**
+		 * Asks the thread to stop and wakes it up if it is sleeping, so that a
+		 * caller joining on it does not have to wait for the rest of the
+		 * timeout.
+		 */
+		public void terminate() {
+			running = false;
+			interrupt();
+		}
+
+		@Override
+		public void run() {
+			while (running) {
+				try {
+					flush();
+					Thread.sleep(timeout);
+				} catch (InterruptedException e) {
+					// Woken up early; the loop condition decides whether to
+					// continue or to stop
+				} catch (IOException e) {
+					throw new RuntimeException(e);
+				}
+			}
+		}
+	}
+}


Mime
View raw message