flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gyf...@apache.org
Subject [08/19] flink git commit: [streaming] Major internal renaming and restructure
Date Wed, 15 Apr 2015 09:38:49 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingAbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingAbstractRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingAbstractRecordReader.java
new file mode 100644
index 0000000..491dc06
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingAbstractRecordReader.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import java.io.IOException;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
+import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
+import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
+import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
+import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.streaming.runtime.tasks.StreamingSuperstep;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A record-oriented reader for the streaming runtime.
+ * <p>
+ * This abstract base class is used by both the mutable and immutable record
+ * readers. Incoming buffers are routed through a {@link BarrierBuffer} so that
+ * data arriving behind a {@link StreamingSuperstep} barrier is held back until
+ * the barrier has been processed.
+ * 
+ * @param <T>
+ *            The type of the record that can be read with this record reader.
+ */
+public abstract class StreamingAbstractRecordReader<T extends IOReadableWritable> extends
+		AbstractReader implements ReaderBase, StreamingReader {
+
+	@SuppressWarnings("unused")
+	private static final Logger LOG = LoggerFactory.getLogger(StreamingAbstractRecordReader.class);
+
+	// One deserializer per input channel: a record may span several buffers,
+	// so each channel's spanning state must be tracked separately.
+	private final RecordDeserializer<T>[] recordDeserializers;
+
+	// The deserializer whose buffer is currently being consumed, or null if
+	// the next buffer still has to be fetched from the barrier buffer.
+	private RecordDeserializer<T> currentRecordDeserializer;
+
+	// Set once the input gate is finished and all buffered data is drained.
+	private boolean isFinished;
+
+	// Holds back buffers that arrive behind a superstep barrier.
+	private final BarrierBuffer barrierBuffer;
+
+	@SuppressWarnings("unchecked")
+	protected StreamingAbstractRecordReader(InputGate inputGate) {
+		super(inputGate);
+		barrierBuffer = new BarrierBuffer(inputGate, this);
+
+		// Initialize one deserializer per input channel
+		this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate
+				.getNumberOfInputChannels()];
+		for (int i = 0; i < recordDeserializers.length; i++) {
+			recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<T>();
+		}
+	}
+
+	/**
+	 * Reads the next complete record into the given (reusable) target object.
+	 *
+	 * @param target
+	 *            The object the record is deserialized into.
+	 * @return true if a full record was read; false if the input is exhausted
+	 *         or the end of a superstep has been reached.
+	 * @throws IOException
+	 *             Thrown if reading from the input gate fails.
+	 * @throws InterruptedException
+	 *             Thrown if the thread is interrupted while waiting for data.
+	 */
+	protected boolean getNextRecord(T target) throws IOException, InterruptedException {
+		if (isFinished) {
+			return false;
+		}
+
+		while (true) {
+			if (currentRecordDeserializer != null) {
+				DeserializationResult result = currentRecordDeserializer.getNextRecord(target);
+
+				// Buffer fully consumed: recycle it before fetching the next.
+				if (result.isBufferConsumed()) {
+					currentRecordDeserializer.getCurrentBuffer().recycle();
+					currentRecordDeserializer = null;
+				}
+
+				if (result.isFullRecord()) {
+					return true;
+				}
+			}
+
+			// Pull the next buffer or event that is not blocked by a barrier.
+			final BufferOrEvent bufferOrEvent = barrierBuffer.getNextNonBlocked();
+
+			if (bufferOrEvent.isBuffer()) {
+				currentRecordDeserializer = recordDeserializers[bufferOrEvent.getChannelIndex()];
+				currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
+			} else {
+				// Event received
+				final AbstractEvent event = bufferOrEvent.getEvent();
+
+				if (event instanceof StreamingSuperstep) {
+					barrierBuffer.processSuperstep(bufferOrEvent);
+				} else {
+					if (handleEvent(event)) {
+						if (inputGate.isFinished()) {
+							if (!barrierBuffer.isEmpty()) {
+								throw new RuntimeException(
+										"BarrierBuffer should be empty at this point");
+							}
+							isFinished = true;
+							return false;
+						} else if (hasReachedEndOfSuperstep()) {
+							return false;
+						} // else: More data is coming...
+					}
+				}
+			}
+		}
+	}
+
+	/**
+	 * Recycles any buffers still held by the per-channel deserializers.
+	 */
+	public void clearBuffers() {
+		for (RecordDeserializer<?> deserializer : recordDeserializers) {
+			Buffer buffer = deserializer.getCurrentBuffer();
+			if (buffer != null && !buffer.isRecycled()) {
+				buffer.recycle();
+			}
+		}
+	}
+
+	/**
+	 * Releases the resources held by the barrier buffer.
+	 */
+	public void cleanup() throws IOException {
+		barrierBuffer.cleanup();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingMutableRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingMutableRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingMutableRecordReader.java
new file mode 100644
index 0000000..ad74004
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingMutableRecordReader.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import java.io.IOException;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.io.network.api.reader.MutableReader;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+
+public class StreamingMutableRecordReader<T extends IOReadableWritable> extends
+		StreamingAbstractRecordReader<T> implements MutableReader<T> {
+
+	public StreamingMutableRecordReader(InputGate inputGate) {
+		super(inputGate);
+	}
+
+	@Override
+	public boolean next(final T target) throws IOException, InterruptedException {
+		return getNextRecord(target);
+	}
+
+	@Override
+	public void clearBuffers() {
+		super.clearBuffers();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingReader.java
new file mode 100644
index 0000000..9eb9337
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingReader.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import java.io.IOException;
+
/**
 * Interface for streaming readers that hold resources (e.g. buffered data)
 * which must be released explicitly when the reader is no longer used.
 */
public interface StreamingReader {

	/**
	 * Releases any resources held by this reader.
	 *
	 * @throws IOException
	 *             Thrown if releasing the resources fails.
	 */
	void cleanup() throws IOException;

}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitioner.java
new file mode 100644
index 0000000..f51a04f
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitioner.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.partitioner;
+
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/**
+ * Partitioner that selects all the output channels.
+ *
+ * @param <T>
+ *            Type of the Tuple
+ */
+public class BroadcastPartitioner<T> extends StreamPartitioner<T> {
+	private static final long serialVersionUID = 1L;
+
+	int[] returnArray;
+	boolean set;
+	int setNumber;
+
+	public BroadcastPartitioner() {
+		super(PartitioningStrategy.BROADCAST);
+	}
+
+	@Override
+	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
+			int numberOfOutputChannels) {
+		if (set && setNumber == numberOfOutputChannels) {
+			return returnArray;
+		} else {
+			this.returnArray = new int[numberOfOutputChannels];
+			for (int i = 0; i < numberOfOutputChannels; i++) {
+				returnArray[i] = i;
+			}
+			set = true;
+			setNumber = numberOfOutputChannels;
+			return returnArray;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitioner.java
new file mode 100644
index 0000000..3110da9
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitioner.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.partitioner;
+
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/**
+ * Partitioner that distributes the data equally by cycling through the output
+ * channels.
+ * 
+ * @param <T>
+ *            Type of the Tuple
+ */
+public class DistributePartitioner<T> extends StreamPartitioner<T> {
+	private static final long serialVersionUID = 1L;
+
+	private int[] returnArray = new int[] {-1};
+
+	public DistributePartitioner(boolean forward) {
+		super(forward ? PartitioningStrategy.FORWARD : PartitioningStrategy.DISTRIBUTE);
+	}
+
+	@Override
+	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
+			int numberOfOutputChannels) {
+		this.returnArray[0] = (this.returnArray[0] + 1) % numberOfOutputChannels;
+
+		return this.returnArray;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitioner.java
new file mode 100644
index 0000000..f44bd12
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitioner.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.partitioner;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/**
+ * Partitioner that selects the same (one) channel for two Tuples having a
+ * specified fields equal.
+ * 
+ * @param <T>
+ *            Type of the Tuple
+ */
+public class FieldsPartitioner<T> extends StreamPartitioner<T> {
+	private static final long serialVersionUID = 1L;
+
+	private int[] returnArray = new int[1];;
+	KeySelector<T, ?> keySelector;
+
+	public FieldsPartitioner(KeySelector<T, ?> keySelector) {
+		super(PartitioningStrategy.GROUPBY);
+		this.keySelector = keySelector;
+	}
+
+	@Override
+	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
+			int numberOfOutputChannels) {
+		returnArray[0] = Math.abs(record.getInstance().getKey(keySelector).hashCode()
+				% numberOfOutputChannels);
+
+		return returnArray;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitioner.java
new file mode 100644
index 0000000..46b290b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitioner.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.partitioner;
+
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+//Group to the partitioner with the lowest id
+public class GlobalPartitioner<T> extends StreamPartitioner<T> {
+	private static final long serialVersionUID = 1L;
+
+	private int[] returnArray = new int[] { 0 };
+
+	public GlobalPartitioner() {
+		super(PartitioningStrategy.GLOBAL);
+	}
+
+	@Override
+	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
+			int numberOfOutputChannels) {
+		return returnArray;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitioner.java
new file mode 100644
index 0000000..ba50113
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitioner.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.partitioner;
+
+import java.util.Random;
+
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/**
+ * Partitioner that distributes the data equally by selecting one output channel
+ * randomly.
+ * 
+ * @param <T>
+ *            Type of the Tuple
+ */
+public class ShufflePartitioner<T> extends StreamPartitioner<T> {
+	private static final long serialVersionUID = 1L;
+
+	private Random random = new Random();
+
+	private int[] returnArray = new int[1];
+
+	public ShufflePartitioner() {
+		super(PartitioningStrategy.SHUFFLE);
+	}
+
+	@Override
+	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
+			int numberOfOutputChannels) {
+		returnArray[0] = random.nextInt(numberOfOutputChannels);
+		return returnArray;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
new file mode 100644
index 0000000..2699c3f
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.partitioner;
+
+import java.io.Serializable;
+
+import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+public abstract class StreamPartitioner<T> implements
+		ChannelSelector<SerializationDelegate<StreamRecord<T>>>, Serializable {
+
+	public enum PartitioningStrategy {
+
+		FORWARD, DISTRIBUTE, SHUFFLE, BROADCAST, GLOBAL, GROUPBY;
+
+	}
+
+	private static final long serialVersionUID = 1L;
+	private PartitioningStrategy strategy;
+
+	public StreamPartitioner(PartitioningStrategy strategy) {
+		this.strategy = strategy;
+	}
+
+	public PartitioningStrategy getStrategy() {
+		return strategy;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
new file mode 100644
index 0000000..66a64b3
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.streamrecord;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+
+/**
+ * Object for wrapping a tuple or other object with ID used for sending records
+ * between streaming task in Apache Flink stream processing.
+ */
+public class StreamRecord<T> implements Serializable {
+	private static final long serialVersionUID = 1L;
+
+	private T streamObject;
+	public boolean isTuple;
+
+	/**
+	 * Creates an empty StreamRecord
+	 */
+	public StreamRecord() {
+	}
+
+	/**
+	 * Gets the wrapped object from the StreamRecord
+	 * 
+	 * @return The object wrapped
+	 */
+	public T getObject() {
+		return streamObject;
+	}
+
+	/**
+	 * Gets the field of the contained object at the given position. If a tuple
+	 * is wrapped then the getField method is invoked. If the StreamRecord
+	 * contains and object of Basic types only position 0 could be returned.
+	 * 
+	 * @param pos
+	 *            Position of the field to get.
+	 * @return Returns the object contained in the position.
+	 */
+	public Object getField(int pos) {
+		if (isTuple) {
+			return ((Tuple) streamObject).getField(pos);
+		} else {
+			if (pos == 0) {
+				return streamObject;
+			} else {
+				throw new IndexOutOfBoundsException();
+			}
+		}
+	}
+
+	/**
+	 * Extracts key for the stored object using the keySelector provided.
+	 * 
+	 * @param keySelector
+	 *            KeySelector for extracting the key
+	 * @return The extracted key
+	 */
+	public <R> R getKey(KeySelector<T, R> keySelector) {
+		try {
+			return keySelector.getKey(streamObject);
+		} catch (Exception e) {
+			throw new RuntimeException("Failed to extract key: " + e.getMessage());
+		}
+	}
+
+	/**
+	 * Sets the object stored
+	 * 
+	 * @param object
+	 *            Object to set
+	 * @return Returns the StreamRecord object
+	 */
+	public StreamRecord<T> setObject(T object) {
+		this.streamObject = object;
+		return this;
+	}
+
+	@Override
+	public String toString() {
+		return streamObject.toString();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
new file mode 100644
index 0000000..4499499
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.streamrecord;
+
+import java.io.IOException;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord<T>> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final TypeSerializer<T> typeSerializer;
+	private final boolean isTuple;
+
+	public StreamRecordSerializer(TypeInformation<T> typeInfo, ExecutionConfig executionConfig) {
+		this.typeSerializer = typeInfo.createSerializer(executionConfig);
+		this.isTuple = typeInfo.isTupleType();
+	}
+
+	public TypeSerializer<T> getObjectSerializer() {
+		return typeSerializer;
+	}
+
+	@Override
+	public boolean isImmutableType() {
+		return false;
+	}
+
+	@Override
+	public StreamRecordSerializer<T> duplicate() {
+		return this;
+	}
+
+	@Override
+	public StreamRecord<T> createInstance() {
+		try {
+			StreamRecord<T> t = new StreamRecord<T>();
+			t.isTuple = isTuple;
+			t.setObject(typeSerializer.createInstance());
+			return t;
+		} catch (Exception e) {
+			throw new RuntimeException("Cannot instantiate StreamRecord.", e);
+		}
+	}
+	
+	@Override
+	public StreamRecord<T> copy(StreamRecord<T> from) {
+		StreamRecord<T> rec = new StreamRecord<T>();
+		rec.isTuple = from.isTuple;
+		rec.setObject(typeSerializer.copy(from.getObject()));
+		return rec;
+	}
+
+	@Override
+	public StreamRecord<T> copy(StreamRecord<T> from, StreamRecord<T> reuse) {
+		reuse.isTuple = from.isTuple;
+		reuse.setObject(typeSerializer.copy(from.getObject(), reuse.getObject()));
+		return reuse;
+	}
+
+	@Override
+	public int getLength() {
+		return -1;
+	}
+
+	@Override
+	public void serialize(StreamRecord<T> value, DataOutputView target) throws IOException {
+		typeSerializer.serialize(value.getObject(), target);
+	}
+	
+	@Override
+	public StreamRecord<T> deserialize(DataInputView source) throws IOException {
+		StreamRecord<T> record = new StreamRecord<T>();
+		record.isTuple = this.isTuple;
+		record.setObject(typeSerializer.deserialize(source));
+		return record;
+	}
+
+	@Override
+	public StreamRecord<T> deserialize(StreamRecord<T> reuse, DataInputView source) throws IOException {
+		reuse.setObject(typeSerializer.deserialize(reuse.getObject(), source));
+		return reuse;
+	}
+
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		// Needs to be implemented
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/CoStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/CoStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/CoStreamTask.java
new file mode 100644
index 0000000..b059efc
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/CoStreamTask.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.plugable.DeserializationDelegate;
+import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.runtime.io.CoReaderIterator;
+import org.apache.flink.streaming.runtime.io.CoRecordReader;
+import org.apache.flink.streaming.runtime.io.IndexedReaderIterator;
+import org.apache.flink.streaming.runtime.io.InputGateFactory;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.apache.flink.util.MutableObjectIterator;
+
+/**
+ * StreamTask implementation for operators that consume two input streams.
+ * Incoming physical edges are grouped by their type number (1 or 2), each
+ * group is unioned into a single {@link InputGate}, and both gates are
+ * wrapped in a {@link CoRecordReader} that is exposed to the operator
+ * through a {@link CoReaderIterator}.
+ */
+public class CoStreamTask<IN1, IN2, OUT> extends StreamTask<IN1, OUT> {
+
+	// Deserializers for the two (possibly differently typed) inputs,
+	// read from the task configuration in setDeserializers().
+	protected StreamRecordSerializer<IN1> inputDeserializer1 = null;
+	protected StreamRecordSerializer<IN2> inputDeserializer2 = null;
+
+	// NOTE(review): these iterators are never assigned anywhere in this
+	// class, so getInput() below returns null unless something outside
+	// the visible code sets them — verify against callers.
+	MutableObjectIterator<StreamRecord<IN1>> inputIter1;
+	MutableObjectIterator<StreamRecord<IN2>> inputIter2;
+
+	// Combined reader over both input gates and the iterator view on it.
+	CoRecordReader<DeserializationDelegate<StreamRecord<IN1>>, DeserializationDelegate<StreamRecord<IN2>>> coReader;
+	CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> coIter;
+
+	// NOTE(review): a static field written from the instance constructor;
+	// concurrent instantiation of CoStreamTasks would race on this counter.
+	private static int numTasks;
+
+	public CoStreamTask() {
+		numTasks = newTask();
+		instanceID = numTasks;
+	}
+
+	// Loads both input serializers from the stream configuration.
+	private void setDeserializers() {
+		inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
+		inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
+	}
+
+	// Wires up outputs first, then builds the combined two-input iterator.
+	@Override
+	public void setInputsOutputs() {
+		outputHandler = new OutputHandler<OUT>(this);
+
+		setConfigInputs();
+
+		coIter = new CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>>(coReader,
+				inputDeserializer1, inputDeserializer2);
+	}
+
+	// Releases writer buffers and the reader's buffers/resources.
+	@Override
+	public void clearBuffers() throws IOException {
+		outputHandler.clearWriters();
+		coReader.clearBuffers();
+		coReader.cleanup();
+	}
+
+	/**
+	 * Partitions this task's input gates into two groups by the type number
+	 * of the corresponding physical in-edge (1 or 2), unions each group into
+	 * one gate, and creates the CoRecordReader over the two unioned gates.
+	 */
+	protected void setConfigInputs() throws StreamTaskException {
+		setDeserializers();
+
+		int numberOfInputs = configuration.getNumberOfInputs();
+
+		ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
+		ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
+
+		List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);
+
+		// The i-th input gate corresponds to the i-th physical in-edge.
+		for (int i = 0; i < numberOfInputs; i++) {
+			int inputType = inEdges.get(i).getTypeNumber();
+			InputGate reader = getEnvironment().getInputGate(i);
+			switch (inputType) {
+				case 1:
+					inputList1.add(reader);
+					break;
+				case 2:
+					inputList2.add(reader);
+					break;
+				default:
+					throw new RuntimeException("Invalid input type number: " + inputType);
+			}
+		}
+
+		final InputGate reader1 = InputGateFactory.createInputGate(inputList1);
+		final InputGate reader2 = InputGateFactory.createInputGate(inputList2);
+
+		coReader = new CoRecordReader<DeserializationDelegate<StreamRecord<IN1>>, DeserializationDelegate<StreamRecord<IN2>>>(
+				reader1, reader2);
+	}
+
+	// NOTE(review): returns inputIter1/inputIter2, which are never assigned
+	// in this class (see field declarations above) — confirm intended use.
+	@SuppressWarnings("unchecked")
+	@Override
+	public <X> MutableObjectIterator<X> getInput(int index) {
+		switch (index) {
+			case 0:
+				return (MutableObjectIterator<X>) inputIter1;
+			case 1:
+				return (MutableObjectIterator<X>) inputIter2;
+			default:
+				throw new IllegalArgumentException("CoStreamVertex has only 2 inputs");
+		}
+	}
+
+	// Indexed iteration is not available for two-input (connected) tasks.
+	@Override
+	public <X> IndexedReaderIterator<X> getIndexedInput(int index) {
+		throw new UnsupportedOperationException("Currently unsupported for connected streams");
+	}
+
+	// Returns the serializer of the first (index 0) or second (index 1) input.
+	@SuppressWarnings("unchecked")
+	@Override
+	public <X> StreamRecordSerializer<X> getInputSerializer(int index) {
+		switch (index) {
+			case 0:
+				return (StreamRecordSerializer<X>) inputDeserializer1;
+			case 1:
+				return (StreamRecordSerializer<X>) inputDeserializer2;
+			default:
+				throw new IllegalArgumentException("CoStreamVertex has only 2 inputs");
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public <X, Y> CoReaderIterator<X, Y> getCoReader() {
+		return (CoReaderIterator<X, Y>) coIter;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/InputHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/InputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/InputHandler.java
new file mode 100644
index 0000000..8648b8c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/InputHandler.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import java.io.IOException;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.io.network.api.reader.MutableReader;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.plugable.DeserializationDelegate;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.runtime.io.IndexedMutableReader;
+import org.apache.flink.streaming.runtime.io.IndexedReaderIterator;
+import org.apache.flink.streaming.runtime.io.InputGateFactory;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+
+/**
+ * Encapsulates the input side of a single-input {@link StreamTask}: it
+ * unions all of the task's input gates into one reader, registers the
+ * task's superstep (barrier) listener on it, and exposes the input as an
+ * {@link IndexedReaderIterator} of StreamRecords.
+ */
+public class InputHandler<IN> {
+	private StreamRecordSerializer<IN> inputSerializer = null;
+	private IndexedReaderIterator<StreamRecord<IN>> inputIter;
+	private IndexedMutableReader<DeserializationDelegate<StreamRecord<IN>>> inputs;
+
+	private StreamTask<IN, ?> streamVertex;
+	private StreamConfig configuration;
+
+	/**
+	 * Builds the reader/iterator for the given task; any failure during
+	 * setup is wrapped in a StreamTaskException.
+	 */
+	public InputHandler(StreamTask<IN, ?> streamComponent) {
+		this.streamVertex = streamComponent;
+		this.configuration = new StreamConfig(streamComponent.getTaskConfiguration());
+		try {
+			setConfigInputs();
+		} catch (Exception e) {
+			throw new StreamTaskException("Cannot register inputs for "
+					+ getClass().getSimpleName(), e);
+		}
+
+	}
+
+	// Creates the unioned input gate, the reader, and the record iterator.
+	// If the task has no inputs, all three stay null (see getters below).
+	protected void setConfigInputs() throws StreamTaskException {
+		inputSerializer = configuration.getTypeSerializerIn1(streamVertex.userClassLoader);
+
+		int numberOfInputs = configuration.getNumberOfInputs();
+
+		if (numberOfInputs > 0) {
+			InputGate inputGate = InputGateFactory.createInputGate(streamVertex.getEnvironment().getAllInputGates());
+			inputs = new IndexedMutableReader<DeserializationDelegate<StreamRecord<IN>>>(inputGate);
+
+			// Barriers (StreamingSuperstep events) are routed to the task's
+			// superstep listener for checkpoint alignment.
+			inputs.registerTaskEventListener(streamVertex.getSuperstepListener(),
+					StreamingSuperstep.class);
+
+			inputIter = new IndexedReaderIterator<StreamRecord<IN>>(inputs, inputSerializer);
+		}
+	}
+
+	// Static helper that wraps an existing reader into an indexed record
+	// iterator for the given serializer (no use site visible in this file).
+	protected static <T> IndexedReaderIterator<StreamRecord<T>> staticCreateInputIterator(
+			MutableReader<?> inputReader, TypeSerializer<StreamRecord<T>> serializer) {
+
+		// generic data type serialization
+		@SuppressWarnings("unchecked")
+		IndexedMutableReader<DeserializationDelegate<StreamRecord<T>>> reader = (IndexedMutableReader<DeserializationDelegate<StreamRecord<T>>>) inputReader;
+		final IndexedReaderIterator<StreamRecord<T>> iter = new IndexedReaderIterator<StreamRecord<T>>(
+				reader, serializer);
+		return iter;
+	}
+
+	public StreamRecordSerializer<IN> getInputSerializer() {
+		return inputSerializer;
+	}
+
+	// May return null when the task has no inputs (see setConfigInputs).
+	public IndexedReaderIterator<StreamRecord<IN>> getInputIter() {
+		return inputIter;
+	}
+
+	// Releases reader buffers; safe to call for input-less tasks.
+	public void clearReaders() throws IOException {
+		if (inputs != null) {
+			inputs.clearBuffers();
+			inputs.cleanup();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
new file mode 100644
index 0000000..c579c3a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.api.collector.CollectorWrapper;
+import org.apache.flink.streaming.api.collector.StreamOutput;
+import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.api.operators.ChainableStreamOperator;
+import org.apache.flink.streaming.runtime.io.RecordWriterFactory;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Encapsulates the output side of a {@link StreamTask}: it creates one
+ * {@link StreamOutput} per outgoing physical edge, recursively builds the
+ * collector chain for operator chaining, and offers barrier broadcasting
+ * and flush/clear lifecycle hooks.
+ */
+public class OutputHandler<OUT> {
+	private static final Logger LOG = LoggerFactory.getLogger(OutputHandler.class);
+
+	private StreamTask<?, OUT> vertex;
+	private StreamConfig configuration;
+	private ClassLoader cl;
+	// The collector handed to the first operator of the chain.
+	private Collector<OUT> outerCollector;
+
+	// All chained operators set up by createChainedCollector, in creation order.
+	public List<ChainableStreamOperator<?, ?>> chainedOperators;
+
+	// One network output per outgoing physical edge.
+	private Map<StreamEdge, StreamOutput<?>> outputMap;
+
+	// Configs of all tasks chained at this vertex, keyed by vertex id.
+	private Map<Integer, StreamConfig> chainedConfigs;
+	private List<StreamEdge> outEdgesInOrder;
+
+	public OutputHandler(StreamTask<?, OUT> vertex) {
+
+		// Initialize some fields
+		this.vertex = vertex;
+		this.configuration = new StreamConfig(vertex.getTaskConfiguration());
+		this.chainedOperators = new ArrayList<ChainableStreamOperator<?, ?>>();
+		this.outputMap = new HashMap<StreamEdge, StreamOutput<?>>();
+		this.cl = vertex.getUserCodeClassLoader();
+
+		// We read the chained configs, and the order of record writer
+		// registrations by output name
+		this.chainedConfigs = configuration.getTransitiveChainedTaskConfigs(cl);
+		this.chainedConfigs.put(configuration.getVertexID(), configuration);
+
+		this.outEdgesInOrder = configuration.getOutEdgesInOrder(cl);
+
+		// We iterate through all the out edges from this job vertex and create
+		// a stream output
+		// NOTE(review): indexOf inside the loop makes this O(n^2) in the edge
+		// count; an indexed for-loop would avoid the repeated scan.
+		for (StreamEdge outEdge : outEdgesInOrder) {
+			StreamOutput<?> streamOutput = createStreamOutput(
+					outEdge,
+					outEdge.getTargetID(),
+					chainedConfigs.get(outEdge.getSourceID()),
+					outEdgesInOrder.indexOf(outEdge));
+			outputMap.put(outEdge, streamOutput);
+		}
+
+		// We create the outer collector that will be passed to the first task
+		// in the chain
+		this.outerCollector = createChainedCollector(configuration);
+	}
+
+	// Broadcasts a checkpoint barrier with the given id on every network output.
+	public void broadcastBarrier(long id) throws IOException, InterruptedException {
+		StreamingSuperstep barrier = new StreamingSuperstep(id);
+		for (StreamOutput<?> streamOutput : outputMap.values()) {
+			streamOutput.broadcastEvent(barrier);
+		}
+	}
+
+	public Collection<StreamOutput<?>> getOutputs() {
+		return outputMap.values();
+	}
+
+	/**
+	 * This method builds up a nested collector which encapsulates all the
+	 * chained operators and their network output. The result of this recursive
+	 * call will be passed as collector to the first operator in the chain.
+	 *
+	 * @param chainedTaskConfig
+	 * 		The configuration of the starting operator of the chain, we
+	 * 		use this parameter to recursively build the whole chain
+	 * @return Returns the collector for the chain starting from the given
+	 * config
+	 */
+	@SuppressWarnings({"unchecked", "rawtypes"})
+	private Collector<OUT> createChainedCollector(StreamConfig chainedTaskConfig) {
+
+
+		// We create a wrapper that will encapsulate the chained operators and
+		// network outputs
+
+		OutputSelectorWrapper<OUT> outputSelectorWrapper = chainedTaskConfig.getOutputSelectorWrapper(cl);
+		CollectorWrapper<OUT> wrapper = new CollectorWrapper<OUT>(outputSelectorWrapper);
+
+		// Create collectors for the network outputs
+		for (StreamEdge outputEdge : chainedTaskConfig.getNonChainedOutputs(cl)) {
+			Collector<?> outCollector = outputMap.get(outputEdge);
+
+			wrapper.addCollector(outCollector, outputEdge);
+		}
+
+		// Create collectors for the chained outputs (recursive step)
+		for (StreamEdge outputEdge : chainedTaskConfig.getChainedOutputs(cl)) {
+			Integer output = outputEdge.getTargetID();
+
+			Collector<?> outCollector = createChainedCollector(chainedConfigs.get(output));
+
+			wrapper.addCollector(outCollector, outputEdge);
+		}
+
+		if (chainedTaskConfig.isChainStart()) {
+			// The current task is the first chained task at this vertex so we
+			// return the wrapper
+			return wrapper;
+		} else {
+			// The current task is a part of the chain so we get the chainable
+			// operator which will be returned and set it up using the wrapper
+			ChainableStreamOperator chainableOperator = chainedTaskConfig.getStreamOperator(vertex
+					.getUserCodeClassLoader());
+			chainableOperator.setup(wrapper,
+					chainedTaskConfig.getTypeSerializerIn1(vertex.getUserCodeClassLoader()));
+
+			chainedOperators.add(chainableOperator);
+			return chainableOperator;
+		}
+
+	}
+
+	public Collector<OUT> getCollector() {
+		return outerCollector;
+	}
+
+	/**
+	 * We create the StreamOutput for the specific output given by the id, and
+	 * the configuration of its source task
+	 *
+	 * @param outputVertex
+	 * 		Name of the output to which the streamoutput will be set up
+	 * 		(NOTE(review): this parameter is not used in the method body)
+	 * @param upStreamConfig
+	 * 		The config of upStream task
+	 * @return The created StreamOutput
+	 */
+	private <T> StreamOutput<T> createStreamOutput(StreamEdge edge, Integer outputVertex,
+			StreamConfig upStreamConfig, int outputIndex) {
+
+		StreamRecordSerializer<T> outSerializer = upStreamConfig
+				.getTypeSerializerOut1(vertex.userClassLoader);
+		SerializationDelegate<StreamRecord<T>> outSerializationDelegate = null;
+
+		// A null serializer means there is nothing to serialize on this edge.
+		if (outSerializer != null) {
+			outSerializationDelegate = new SerializationDelegate<StreamRecord<T>>(outSerializer);
+			outSerializationDelegate.setInstance(outSerializer.createInstance());
+		}
+
+		@SuppressWarnings("unchecked")
+		StreamPartitioner<T> outputPartitioner = (StreamPartitioner<T>) edge.getPartitioner();
+
+		ResultPartitionWriter bufferWriter = vertex.getEnvironment().getWriter(outputIndex);
+
+		RecordWriter<SerializationDelegate<StreamRecord<T>>> output =
+				RecordWriterFactory.createRecordWriter(bufferWriter, outputPartitioner, upStreamConfig.getBufferTimeout());
+
+		StreamOutput<T> streamOutput = new StreamOutput<T>(output, vertex.instanceID,
+				outSerializationDelegate);
+
+		if (LOG.isTraceEnabled()) {
+			LOG.trace("Partitioner set: {} with {} outputs for {}", outputPartitioner.getClass()
+					.getSimpleName(), outputIndex, vertex.getClass().getSimpleName());
+		}
+
+		return streamOutput;
+	}
+
+	// Closes every network output, flushing buffered records.
+	public void flushOutputs() throws IOException, InterruptedException {
+		for (StreamOutput<?> streamOutput : getOutputs()) {
+			streamOutput.close();
+		}
+	}
+
+	// Discards buffered records on every network output.
+	public void clearWriters() {
+		for (StreamOutput<?> output : outputMap.values()) {
+			output.clearBuffers();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
new file mode 100644
index 0000000..8362d79
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import java.util.Collection;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.streaming.api.collector.StreamOutput;
+import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Source-side task of an iteration: it registers an in-memory blocking
+ * queue with the {@link BlockingQueueBroker} (shared with the matching
+ * {@link StreamIterationTail}) and re-emits every record taken from that
+ * queue to its outputs, feeding records back into the loop.
+ */
+public class StreamIterationHead<OUT> extends StreamTask<OUT, OUT> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(StreamIterationHead.class);
+
+	private Collection<StreamOutput<?>> outputs;
+
+	// NOTE(review): static counter written from the instance constructor;
+	// concurrent instantiation would race on this field.
+	private static int numSources;
+	private Integer iterationId;
+	// Capacity-1 feedback channel filled by the iteration tail.
+	@SuppressWarnings("rawtypes")
+	private BlockingQueue<StreamRecord> dataChannel;
+	// Max time to wait for a feedback record; 0 or less means wait forever.
+	private long iterationWaitTime;
+	private boolean shouldWait;
+
+	@SuppressWarnings("rawtypes")
+	public StreamIterationHead() {
+		numSources = newTask();
+		instanceID = numSources;
+		dataChannel = new ArrayBlockingQueue<StreamRecord>(1);
+	}
+
+	@Override
+	public void setInputsOutputs() {
+		outputHandler = new OutputHandler<OUT>(this);
+		outputs = outputHandler.getOutputs();
+
+		iterationId = configuration.getIterationId();
+		iterationWaitTime = configuration.getIterationWaitTime();
+		shouldWait = iterationWaitTime > 0;
+
+		// Publish the feedback queue under "<iterationId>-<subtaskIndex>" so
+		// the matching iteration tail subtask can look it up.
+		try {
+			BlockingQueueBroker.instance().handIn(iterationId.toString()+"-" 
+					+getEnvironment().getIndexInSubtaskGroup(), dataChannel);
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public void invoke() throws Exception {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Iteration source {} invoked with instance id {}", getName(), getInstanceID());
+		}
+
+		try {
+			StreamRecord<OUT> nextRecord;
+
+			while (true) {
+				if (shouldWait) {
+					// Timed poll: a null result (timeout) terminates the loop.
+					nextRecord = dataChannel.poll(iterationWaitTime, TimeUnit.MILLISECONDS);
+				} else {
+					// NOTE(review): take() never returns null, so without a
+					// wait time this loop only ends via an exception.
+					nextRecord = dataChannel.take();
+				}
+				if (nextRecord == null) {
+					break;
+				}
+				// Forward the feedback record to all outputs of this head.
+				for (StreamOutput<?> output : outputs) {
+					((StreamOutput<OUT>) output).collect(nextRecord.getObject());
+				}
+			}
+
+		} catch (Exception e) {
+			if (LOG.isErrorEnabled()) {
+				LOG.error("Iteration source failed due to: {}", StringUtils.stringifyException(e));
+			}
+			throw e;
+		} finally {
+			// Cleanup
+			outputHandler.flushOutputs();
+			clearBuffers();
+		}
+
+	}
+
+	// Iteration heads have no user operator to set.
+	@Override
+	protected void setOperator() {
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
new file mode 100644
index 0000000..d3d62f3
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StreamIterationTail<IN> extends StreamTask<IN, IN> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(StreamIterationTail.class);
+
+	private InputHandler<IN> inputHandler;
+
+	private Integer iterationId;
+	@SuppressWarnings("rawtypes")
+	private BlockingQueue<StreamRecord> dataChannel;
+	private long iterationWaitTime;
+	private boolean shouldWait;
+
+	public StreamIterationTail() {
+	}
+
+	@Override
+	public void setInputsOutputs() {
+		try {
+			inputHandler = new InputHandler<IN>(this);
+
+			iterationId = configuration.getIterationId();
+			iterationWaitTime = configuration.getIterationWaitTime();
+			shouldWait = iterationWaitTime > 0;
+			dataChannel = BlockingQueueBroker.instance().get(iterationId.toString()+"-"
+					+getEnvironment().getIndexInSubtaskGroup());
+		} catch (Exception e) {
+			throw new StreamTaskException(String.format(
+					"Cannot register inputs of StreamIterationSink %s", iterationId), e);
+		}
+	}
+
+	@Override
+	public void invoke() throws Exception {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Iteration sink {} invoked", getName());
+		}
+
+		try {
+			forwardRecords();
+
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Iteration sink {} invoke finished", getName());
+			}
+		} catch (Exception e) {
+			if (LOG.isErrorEnabled()) {
+				LOG.error("Iteration sink failed due to: {}", StringUtils.stringifyException(e));
+			}
+			throw e;
+		} finally {
+			// Cleanup
+			clearBuffers();
+		}
+	}
+
+	protected void forwardRecords() throws Exception {
+		StreamRecord<IN> reuse = inputHandler.getInputSerializer().createInstance();
+		while ((reuse = inputHandler.getInputIter().next(reuse)) != null) {
+			if (!pushToQueue(reuse)) {
+				break;
+			}
+			reuse = inputHandler.getInputSerializer().createInstance();
+		}
+	}
+
+	private boolean pushToQueue(StreamRecord<IN> record) throws InterruptedException {
+		try {
+			if (shouldWait) {
+				return dataChannel.offer(record, iterationWaitTime, TimeUnit.MILLISECONDS);
+			} else {
+				dataChannel.put(record);
+				return true;
+			}
+		} catch (InterruptedException e) {
+			if (LOG.isErrorEnabled()) {
+				LOG.error("Pushing back record at iteration %s failed due to: {}", iterationId,
+						StringUtils.stringifyException(e));
+				throw e;
+			}
+			return false;
+		}
+	}
+
+	@Override
+	protected void setOperator() {
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
new file mode 100644
index 0000000..82486e8
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.BarrierTransceiver;
+import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
+import org.apache.flink.runtime.messages.CheckpointingMessages;
+import org.apache.flink.runtime.state.LocalStateHandle;
+import org.apache.flink.runtime.state.OperatorState;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.ChainableStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.runtime.io.CoReaderIterator;
+import org.apache.flink.streaming.runtime.io.IndexedReaderIterator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.apache.flink.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import akka.actor.ActorRef;
+
+/**
+ * Base invokable for streaming task vertices. A StreamTask wires up the input
+ * and output handlers for its {@link StreamOperator}, drives the operator
+ * life cycle (open, run, close) during {@link #invoke()}, and takes part in
+ * the checkpoint barrier protocol as a {@link BarrierTransceiver}.
+ */
+public class StreamTask<IN, OUT> extends AbstractInvokable implements StreamTaskContext<OUT>,
+		BarrierTransceiver, OperatorStateCarrier {
+
+	private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);
+
+	protected StreamConfig configuration;
+	protected int instanceID;
+	// Process-wide counter for handing out unique instance IDs; only accessed
+	// from within the synchronized newTask().
+	private static int numVertices = 0;
+
+	private InputHandler<IN> inputHandler;
+	protected OutputHandler<OUT> outputHandler;
+	private StreamOperator<IN, OUT> streamOperator;
+	protected volatile boolean isRunning = false;
+
+	private StreamingRuntimeContext context;
+	private Map<String, OperatorState<?>> states;
+
+	protected ClassLoader userClassLoader;
+
+	private EventListener<TaskEvent> superstepListener;
+
+	public StreamTask() {
+		streamOperator = null;
+		// Assign the fresh ID directly. The former static "numTasks" field was
+		// overwritten on every construction and served no purpose.
+		instanceID = newTask();
+		superstepListener = new SuperstepEventListener();
+	}
+
+	/**
+	 * Hands out the next unique task instance ID. Synchronized because tasks
+	 * may be instantiated concurrently by different threads; an unsynchronized
+	 * increment could hand out duplicate IDs.
+	 */
+	protected static synchronized int newTask() {
+		numVertices++;
+		return numVertices;
+	}
+
+	@Override
+	public void registerInputOutput() {
+		initialize();
+		setInputsOutputs();
+		setOperator();
+	}
+
+	/** Sets up the configuration, the state map and the runtime context. */
+	protected void initialize() {
+		this.userClassLoader = getUserCodeClassLoader();
+		this.configuration = new StreamConfig(getTaskConfiguration());
+		this.states = new HashMap<String, OperatorState<?>>();
+		this.context = createRuntimeContext(getEnvironment().getTaskName(), this.states);
+	}
+
+	@Override
+	public void broadcastBarrierFromSource(long id) {
+		// Only called at input vertices
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Received barrier from jobmanager: " + id);
+		}
+		actOnBarrier(id);
+	}
+
+	/**
+	 * This method is called to confirm that a barrier has been fully processed.
+	 * It sends an acknowledgment to the jobmanager. In the current version if
+	 * there is user state it also checkpoints the state to the jobmanager.
+	 */
+	@Override
+	public void confirmBarrier(long barrierID) throws IOException {
+
+		if (configuration.getStateMonitoring() && !states.isEmpty()) {
+			getEnvironment().getJobManager().tell(
+					new CheckpointingMessages.StateBarrierAck(getEnvironment().getJobID(), getEnvironment()
+							.getJobVertexId(), context.getIndexOfThisSubtask(), barrierID,
+							new LocalStateHandle(states)), ActorRef.noSender());
+		} else {
+			getEnvironment().getJobManager().tell(
+					new CheckpointingMessages.BarrierAck(getEnvironment().getJobID(), getEnvironment().getJobVertexId(),
+							context.getIndexOfThisSubtask(), barrierID), ActorRef.noSender());
+		}
+
+	}
+
+	/** Creates the input and output handlers for this task. */
+	public void setInputsOutputs() {
+		inputHandler = new InputHandler<IN>(this);
+		outputHandler = new OutputHandler<OUT>(this);
+	}
+
+	/** Instantiates the operator from the configuration and wires it to this task. */
+	protected void setOperator() {
+		streamOperator = configuration.getStreamOperator(userClassLoader);
+		streamOperator.setup(this);
+	}
+
+	public String getName() {
+		return getEnvironment().getTaskName();
+	}
+
+	public int getInstanceID() {
+		return instanceID;
+	}
+
+	public StreamingRuntimeContext createRuntimeContext(String taskName,
+			Map<String, OperatorState<?>> states) {
+		Environment env = getEnvironment();
+		return new StreamingRuntimeContext(taskName, env, getUserCodeClassLoader(),
+				getExecutionConfig(), states);
+	}
+
+	/**
+	 * Runs the operator life cycle: open, run, close. On failure the operator
+	 * is closed best-effort and the exception rethrown; outputs are flushed
+	 * and buffers cleared in all cases.
+	 */
+	@Override
+	public void invoke() throws Exception {
+		this.isRunning = true;
+
+		boolean operatorOpen = false;
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Task {} invoked with instance id {}", getName(), getInstanceID());
+		}
+
+		try {
+			streamOperator.setRuntimeContext(context);
+
+			operatorOpen = true;
+			openOperator();
+
+			streamOperator.run();
+
+			closeOperator();
+			operatorOpen = false;
+
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Task {} invoke finished instance id {}", getName(), getInstanceID());
+			}
+
+		} catch (Exception e) {
+
+			if (operatorOpen) {
+				try {
+					closeOperator();
+				} catch (Throwable t) {
+					// Do not mask the original failure, but do not swallow the
+					// secondary one silently either (was an empty catch block).
+					LOG.warn("Exception while closing operator after a failure: {}",
+							StringUtils.stringifyException(t));
+				}
+			}
+
+			if (LOG.isErrorEnabled()) {
+				LOG.error("StreamOperator failed due to: {}", StringUtils.stringifyException(e));
+			}
+			throw e;
+		} finally {
+			this.isRunning = false;
+			// Cleanup: clear the buffers even if flushing the outputs fails
+			// (previously a flush failure skipped clearBuffers()).
+			try {
+				outputHandler.flushOutputs();
+			} finally {
+				clearBuffers();
+			}
+		}
+
+	}
+
+	/** Opens the head operator and all chained operators, in chain order. */
+	protected void openOperator() throws Exception {
+		streamOperator.open(getTaskConfiguration());
+
+		for (ChainableStreamOperator<?, ?> operator : outputHandler.chainedOperators) {
+			operator.setRuntimeContext(context);
+			operator.open(getTaskConfiguration());
+		}
+	}
+
+	/** Closes the head operator and all chained operators, in chain order. */
+	protected void closeOperator() throws Exception {
+		streamOperator.close();
+
+		for (ChainableStreamOperator<?, ?> operator : outputHandler.chainedOperators) {
+			operator.close();
+		}
+	}
+
+	/** Releases reader and writer resources; safe to call before full setup. */
+	protected void clearBuffers() throws IOException {
+		if (outputHandler != null) {
+			outputHandler.clearWriters();
+		}
+		if (inputHandler != null) {
+			inputHandler.clearReaders();
+		}
+	}
+
+	@Override
+	public void cancel() {
+		if (streamOperator != null) {
+			streamOperator.cancel();
+		}
+	}
+
+	@Override
+	public StreamConfig getConfig() {
+		return configuration;
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public <X> MutableObjectIterator<X> getInput(int index) {
+		if (index == 0) {
+			return (MutableObjectIterator<X>) inputHandler.getInputIter();
+		} else {
+			throw new IllegalArgumentException("There is only 1 input");
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public <X> IndexedReaderIterator<X> getIndexedInput(int index) {
+		if (index == 0) {
+			return (IndexedReaderIterator<X>) inputHandler.getInputIter();
+		} else {
+			throw new IllegalArgumentException("There is only 1 input");
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public <X> StreamRecordSerializer<X> getInputSerializer(int index) {
+		if (index == 0) {
+			return (StreamRecordSerializer<X>) inputHandler.getInputSerializer();
+		} else {
+			throw new IllegalArgumentException("There is only 1 input");
+		}
+	}
+
+	@Override
+	public Collector<OUT> getOutputCollector() {
+		return outputHandler.getCollector();
+	}
+
+	@Override
+	public <X, Y> CoReaderIterator<X, Y> getCoReader() {
+		throw new IllegalArgumentException("CoReader not available");
+	}
+
+	public EventListener<TaskEvent> getSuperstepListener() {
+		return this.superstepListener;
+	}
+
+	/**
+	 * Method to be called when a barrier is received from all the input
+	 * channels. It broadcasts the barrier to the output operators, checkpoints
+	 * the state and sends an ack.
+	 *
+	 * @param id
+	 *            The id of the superstep barrier.
+	 */
+	private synchronized void actOnBarrier(long id) {
+		if (isRunning) {
+			try {
+				outputHandler.broadcastBarrier(id);
+				confirmBarrier(id);
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("Superstep " + id + " processed: " + StreamTask.this);
+				}
+			} catch (Exception e) {
+				// Only throw any exception if the vertex is still running
+				if (isRunning) {
+					throw new RuntimeException(e);
+				}
+			}
+		}
+	}
+
+	@Override
+	public String toString() {
+		return configuration.getOperatorName() + " (" + context.getIndexOfThisSubtask() + ")";
+	}
+
+	/**
+	 * Re-injects the user states into the map
+	 */
+	@Override
+	public void injectState(StateHandle stateHandle) {
+		this.states.putAll(stateHandle.getState(userClassLoader));
+	}
+
+	private class SuperstepEventListener implements EventListener<TaskEvent> {
+
+		@Override
+		public void onEvent(TaskEvent event) {
+			actOnBarrier(((StreamingSuperstep) event).getId());
+		}
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskContext.java
new file mode 100644
index 0000000..ba447d6
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskContext.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.runtime.io.CoReaderIterator;
+import org.apache.flink.streaming.runtime.io.IndexedReaderIterator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+/**
+ * Context through which an operator accesses the services of its enclosing
+ * stream task: configuration, inputs, serializers and the output collector.
+ *
+ * @param <OUT>
+ *            Type of the elements emitted through the output collector.
+ */
+public interface StreamTaskContext<OUT> {
+
+	/** Returns the stream configuration of the task. */
+	StreamConfig getConfig();
+
+	/** Returns the class loader for the user code of this task. */
+	ClassLoader getUserCodeClassLoader();
+
+	/** Returns the input iterator for the given input index (single-input tasks accept only index 0). */
+	<X> MutableObjectIterator<X> getInput(int index);
+
+	/** Returns the indexed reader iterator for the given input index. */
+	<X> IndexedReaderIterator<X> getIndexedInput(int index);
+
+	/** Returns the record serializer for the given input index. */
+	<X> StreamRecordSerializer<X> getInputSerializer(int index);
+
+	/** Returns the collector through which output records are emitted. */
+	Collector<OUT> getOutputCollector();
+
+	/** Returns the two-input reader iterator; only supported by co-tasks. */
+	<X, Y> CoReaderIterator<X, Y> getCoReader();
+
+	/** Returns the execution config of the job. */
+	ExecutionConfig getExecutionConfig();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskException.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskException.java
new file mode 100644
index 0000000..d93078b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskException.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+/**
+ * An exception that is thrown by the stream vertices when encountering an
+ * illegal condition.
+ */
+public class StreamTaskException extends RuntimeException {
+
+	/**
+	 * Serial version UID for serialization interoperability.
+	 */
+	private static final long serialVersionUID = 8392043527067472439L;
+
+	/**
+	 * Creates a stream task exception with no message and no cause.
+	 */
+	public StreamTaskException() {
+	}
+
+	/**
+	 * Creates a stream task exception with the given message and no cause.
+	 *
+	 * @param message
+	 *            The message for the exception.
+	 */
+	public StreamTaskException(String message) {
+		super(message);
+	}
+
+	/**
+	 * Creates a stream task exception with the given cause and no message.
+	 *
+	 * @param cause
+	 *            The <tt>Throwable</tt> that caused this exception.
+	 */
+	public StreamTaskException(Throwable cause) {
+		super(cause);
+	}
+
+	/**
+	 * Creates a stream task exception with the given message and cause.
+	 *
+	 * @param message
+	 *            The message for the exception.
+	 * @param cause
+	 *            The <tt>Throwable</tt> that caused this exception.
+	 */
+	public StreamTaskException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
new file mode 100644
index 0000000..eaf8a1d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import java.util.Map;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.runtime.state.OperatorState;
+
+/**
+ * Implementation of the {@link RuntimeContext}, created by runtime stream UDF
+ * operators. Additionally exposes the registry of named {@link OperatorState}
+ * instances used for checkpointing.
+ */
+public class StreamingRuntimeContext extends RuntimeUDFContext {
+
+	private final Environment env;
+	private final Map<String, OperatorState<?>> operatorStates;
+
+	public StreamingRuntimeContext(String name, Environment env, ClassLoader userCodeClassLoader,
+			ExecutionConfig executionConfig, Map<String, OperatorState<?>> operatorStates) {
+		super(name, env.getNumberOfSubtasks(), env.getIndexInSubtaskGroup(), userCodeClassLoader,
+				executionConfig, env.getCopyTask());
+		this.env = env;
+		this.operatorStates = operatorStates;
+	}
+
+	/**
+	 * Returns the operator state registered by the given name for the operator.
+	 *
+	 * @param name
+	 *            Name of the operator state to be returned.
+	 * @return The operator state.
+	 * @throws IllegalStateException
+	 *             If no state has been registered under the given name.
+	 */
+	public OperatorState<?> getState(String name) {
+		// IllegalStateException is a RuntimeException subclass, so callers that
+		// previously caught RuntimeException keep working.
+		if (operatorStates == null) {
+			throw new IllegalStateException("No state has been registered for this operator.");
+		}
+		OperatorState<?> state = operatorStates.get(name);
+		if (state == null) {
+			throw new IllegalStateException("No state has been registered for the name: " + name);
+		}
+		return state;
+	}
+
+	/**
+	 * Returns whether there is a state stored by the given name.
+	 */
+	public boolean containsState(String name) {
+		// Guard against a null registry for consistency with getState(), which
+		// already treats a null map as "nothing registered".
+		return operatorStates != null && operatorStates.containsKey(name);
+	}
+
+	/**
+	 * This is a beta feature.
+	 *
+	 * <p>Register an operator state for this operator by the given name. This
+	 * name can be used to retrieve the state during runtime using
+	 * {@link StreamingRuntimeContext#getState(String)}. To obtain the
+	 * {@link StreamingRuntimeContext} from the user-defined function use the
+	 * {@link RichFunction#getRuntimeContext()} method.
+	 *
+	 * @param name
+	 *            The name of the operator state.
+	 * @param state
+	 *            The state to be registered for this name.
+	 * @throws IllegalArgumentException
+	 *             If the state is null or the name is already taken.
+	 */
+	public void registerState(String name, OperatorState<?> state) {
+		// IllegalArgumentException subclasses RuntimeException, keeping callers
+		// that caught RuntimeException compatible.
+		if (state == null) {
+			throw new IllegalArgumentException("Cannot register null state");
+		}
+		if (operatorStates.containsKey(name)) {
+			throw new IllegalArgumentException("State is already registered");
+		}
+		operatorStates.put(name, state);
+	}
+
+	/**
+	 * Returns the input split provider associated with the operator.
+	 *
+	 * @return The input split provider.
+	 */
+	public InputSplitProvider getInputSplitProvider() {
+		return env.getInputSplitProvider();
+	}
+
+	/**
+	 * Returns the stub parameters associated with the {@link TaskConfig} of the
+	 * operator.
+	 *
+	 * @return The stub parameters.
+	 */
+	public Configuration getTaskStubParameters() {
+		return new TaskConfig(env.getTaskConfiguration()).getStubParameters();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingSuperstep.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingSuperstep.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingSuperstep.java
new file mode 100644
index 0000000..4b3419b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingSuperstep.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import java.io.IOException;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.event.task.TaskEvent;
+
+/**
+ * Task event marking a checkpoint barrier (superstep) flowing through the
+ * streaming topology, identified by a numeric id.
+ */
+public class StreamingSuperstep extends TaskEvent {
+
+	protected long id;
+
+	/** Default constructor required for event deserialization. */
+	public StreamingSuperstep() {
+
+	}
+
+	public StreamingSuperstep(long id) {
+		this.id = id;
+	}
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		out.writeLong(id);
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		id = in.readLong();
+	}
+
+	public long getId() {
+		return id;
+	}
+
+	@Override
+	public boolean equals(Object other) {
+		// instanceof already rejects null, so no separate null check is needed.
+		if (!(other instanceof StreamingSuperstep)) {
+			return false;
+		}
+		return ((StreamingSuperstep) other).id == this.id;
+	}
+
+	@Override
+	public int hashCode() {
+		// equals() was previously overridden without hashCode(), breaking the
+		// Object contract (e.g. when supersteps are used as hash map keys).
+		return (int) (id ^ (id >>> 32));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/CircularFifoList.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/CircularFifoList.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/CircularFifoList.java
deleted file mode 100644
index 02a628b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/CircularFifoList.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.state;
-
-import java.io.Serializable;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-
-/**
- * A simple class, that manages a circular queue with sliding interval. If the
- * queue if full and a new element is added, the elements that belong to the
- * first sliding interval are removed.
- */
-public class CircularFifoList<T> implements Serializable {
-	private static final long serialVersionUID = 1L;
-
-	private Queue<T> queue;
-	private Queue<Long> slideSizes;
-	private long counter;
-	private Iterable<T> iterable;
-
-	public CircularFifoList() {
-		this.queue = new LinkedList<T>();
-		this.slideSizes = new LinkedList<Long>();
-		this.counter = 0;
-		this.iterable = new ListIterable();
-	}
-
-	public void add(T element) {
-		queue.add(element);
-		counter++;
-	}
-
-	public void newSlide() {
-		slideSizes.add(counter);
-		counter = 0;
-	}
-
-	public void shiftWindow() {
-		shiftWindow(1);
-	}
-
-	public void shiftWindow(int numberOfSlides) {
-
-		if (numberOfSlides <= slideSizes.size()) {
-			for (int i = 0; i < numberOfSlides; i++) {
-				Long firstSlideSize = slideSizes.remove();
-
-				for (int j = 0; j < firstSlideSize; j++) {
-					queue.remove();
-				}
-			}
-		} else {
-			slideSizes.clear();
-			queue.clear();
-			counter = 0;
-		}
-
-	}
-	
-	@SuppressWarnings("unchecked")
-	public List<T> getElements(){
-		return (List<T>) queue;
-	}
-
-	public Iterator<T> getIterator() {
-		return queue.iterator();
-	}
-
-	public Iterable<T> getIterable() {
-		return iterable;
-	}
-
-	private class ListIterable implements Iterable<T>, Serializable {
-		
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Iterator<T> iterator() {
-			return getIterator();
-		}
-
-	}
-	
-	public boolean isEmpty() {
-		return queue.isEmpty();
-	}
-
-	@Override
-	public String toString() {
-		return queue.toString();
-	}
-
-	
-}


Mime
View raw message