flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [03/14] flink git commit: [FLINK-1638] [streaming] Vertex level fault tolerance and state monitor
Date Tue, 10 Mar 2015 14:00:03 GMT
[FLINK-1638] [streaming] Vertex level fault tolerance and state monitor


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5061edb8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5061edb8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5061edb8

Branch: refs/heads/master
Commit: 5061edb80df092a2f8719054b0d2bce8c670265c
Parents: b4e8350
Author: Gyula Fora <gyfora@apache.org>
Authored: Fri Feb 20 22:24:27 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Tue Mar 10 14:58:48 2015 +0100

----------------------------------------------------------------------
 .../runtime/event/task/StreamingSuperstep.java  |  51 +++++++
 .../io/network/api/reader/AbstractReader.java   |   4 +
 .../api/reader/AbstractRecordReader.java        |  58 +++++---
 .../io/network/api/reader/BarrierBuffer.java    | 143 +++++++++++++++++++
 .../jobgraph/tasks/BarrierTransceiver.java      |  27 ++++
 .../runtime/jobmanager/StreamStateMonitor.scala |  96 +++++++++++++
 .../flink/streaming/api/StreamConfig.java       |   9 ++
 .../api/StreamingJobGraphGenerator.java         |   4 +-
 .../streaming/api/collector/StreamOutput.java   |   4 +
 .../datastream/SingleOutputStreamOperator.java  |   4 +-
 .../api/invokable/StreamInvokable.java          |   2 +-
 .../api/streamvertex/InputHandler.java          |  24 ++--
 .../api/streamvertex/OutputHandler.java         |   8 ++
 .../api/streamvertex/StreamVertex.java          |  61 +++++++-
 .../flink/streaming/io/CoRecordReader.java      |   8 +-
 .../streaming/io/IndexedMutableReader.java      |   2 +-
 .../flink/streaming/util/MockContext.java       |   2 +-
 17 files changed, 464 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StreamingSuperstep.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StreamingSuperstep.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StreamingSuperstep.java
new file mode 100644
index 0000000..e35eb28
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StreamingSuperstep.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.event.task;
+
+import java.io.IOException;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+public class StreamingSuperstep extends TaskEvent {
+
+	protected long id;
+
+	public StreamingSuperstep() {
+
+	}
+
+	public StreamingSuperstep(long id) {
+		this.id = id;
+	}
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		out.writeLong(id);
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		id = in.readLong();
+	}
+
+	public long getId() {
+		return id;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java
index 1bfca84..96b6f99 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java
@@ -110,6 +110,10 @@ public abstract class AbstractReader implements ReaderBase {
 			throw new IOException("Error while handling event of type " + eventType + ": " + t.getMessage(),
t);
 		}
 	}
+	
+	public void publish(TaskEvent event){
+		taskEventHandler.publish(event);
+	}
 
 	// ------------------------------------------------------------------------
 	// Iterations

http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
index e70b6ee..cc36438 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
@@ -18,24 +18,37 @@
 
 package org.apache.flink.runtime.io.network.api.reader;
 
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.Set;
+
 import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.task.StreamingSuperstep;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.io.network.serialization.SpillingAdaptiveSpanningRecordDeserializer;
-
-import java.io.IOException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A record-oriented reader.
  * <p>
- * This abstract base class is used by both the mutable and immutable record readers.
- *
- * @param <T> The type of the record that can be read with this record reader.
+ * This abstract base class is used by both the mutable and immutable record
+ * readers.
+ * 
+ * @param <T>
+ *            The type of the record that can be read with this record reader.
  */
-abstract class AbstractRecordReader<T extends IOReadableWritable> extends AbstractReader
implements ReaderBase {
+abstract class AbstractRecordReader<T extends IOReadableWritable> extends AbstractReader
implements
+		ReaderBase {
+
+	private static final Logger LOG = LoggerFactory.getLogger(AbstractRecordReader.class);
 
 	private final RecordDeserializer<T>[] recordDeserializers;
 
@@ -43,11 +56,15 @@ abstract class AbstractRecordReader<T extends IOReadableWritable>
extends Abstra
 
 	private boolean isFinished;
 
+	private final BarrierBuffer barrierBuffer;
+
 	protected AbstractRecordReader(InputGate inputGate) {
 		super(inputGate);
+		barrierBuffer = new BarrierBuffer(inputGate, this);
 
 		// Initialize one deserializer per input channel
-		this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
+		this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate
+				.getNumberOfInputChannels()];
 		for (int i = 0; i < recordDeserializers.length; i++) {
 			recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<T>();
 		}
@@ -72,22 +89,27 @@ abstract class AbstractRecordReader<T extends IOReadableWritable>
extends Abstra
 				}
 			}
 
-			final BufferOrEvent bufferOrEvent = inputGate.getNextBufferOrEvent();
+			final BufferOrEvent bufferOrEvent = barrierBuffer.getNextNonBlocked();
 
 			if (bufferOrEvent.isBuffer()) {
 				currentRecordDeserializer = recordDeserializers[bufferOrEvent.getChannelIndex()];
 				currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
-			}
-			else if (handleEvent(bufferOrEvent.getEvent())) {
-				if (inputGate.isFinished()) {
-					isFinished = true;
-
-					return false;
+			} else {
+				// Event received
+				final AbstractEvent event = bufferOrEvent.getEvent();
+
+				if (event instanceof StreamingSuperstep) {
+					barrierBuffer.processSuperstep(bufferOrEvent);
+				} else {
+					if (handleEvent(event)) {
+						if (inputGate.isFinished()) {
+							isFinished = true;
+							return false;
+						} else if (hasReachedEndOfSuperstep()) {
+							return false;
+						} // else: More data is coming...
+					}
 				}
-				else if (hasReachedEndOfSuperstep()) {
-
-					return false;
-				} // else: More data is coming...
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BarrierBuffer.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BarrierBuffer.java
new file mode 100644
index 0000000..ee317cd
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BarrierBuffer.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.reader;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.Set;
+
+import org.apache.flink.runtime.event.task.StreamingSuperstep;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BarrierBuffer {
+
+	private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class);
+
+	private Queue<BufferOrEvent> bufferOrEvents = new LinkedList<BufferOrEvent>();
+	private Queue<BufferOrEvent> unprocessed = new LinkedList<BufferOrEvent>();
+
+	private Set<Integer> blockedChannels = new HashSet<Integer>();
+	private int totalNumberOfInputChannels;
+
+	private StreamingSuperstep currentSuperstep;
+	private boolean receivedSuperstep;
+
+	private boolean blockAll = false;
+
+	private AbstractReader reader;
+
+	private InputGate inputGate;
+
+	public BarrierBuffer(InputGate inputGate, AbstractReader reader) {
+		this.inputGate = inputGate;
+		totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
+		this.reader = reader;
+	}
+
+	private void startSuperstep(StreamingSuperstep superstep) {
+		this.currentSuperstep = superstep;
+		this.receivedSuperstep = true;
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Superstep started with id: " + superstep.getId());
+		}
+	}
+
+	private void store(BufferOrEvent bufferOrEvent) {
+		bufferOrEvents.add(bufferOrEvent);
+	}
+
+	private BufferOrEvent getNonProcessed() {
+		return unprocessed.poll();
+	}
+
+	private boolean isBlocked(int channelIndex) {
+		return blockAll || blockedChannels.contains(channelIndex);
+	}
+	
+	private boolean containsNonprocessed() {
+		return !unprocessed.isEmpty();
+	}
+
+	private boolean receivedSuperstep() {
+		return receivedSuperstep;
+	}
+
+	public BufferOrEvent getNextNonBlocked() throws IOException,
+			InterruptedException {
+		BufferOrEvent bufferOrEvent = null;
+
+		if (containsNonprocessed()) {
+			bufferOrEvent = getNonProcessed();
+		} else {
+			while (bufferOrEvent == null) {
+				BufferOrEvent nextBufferOrEvent = inputGate.getNextBufferOrEvent();
+				if (isBlocked(nextBufferOrEvent.getChannelIndex())) {
+					store(nextBufferOrEvent);
+				} else {
+					bufferOrEvent = nextBufferOrEvent;
+				}
+			}
+		}
+		return bufferOrEvent;
+	}
+
+	private void blockChannel(int channelIndex) {
+		if (!blockedChannels.contains(channelIndex)) {
+			blockedChannels.add(channelIndex);
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Channel blocked with index: " + channelIndex);
+			}
+			if (blockedChannels.size() == totalNumberOfInputChannels) {
+				reader.publish(currentSuperstep);
+				unprocessed.addAll(bufferOrEvents);
+				bufferOrEvents.clear();
+				blockedChannels.clear();
+				receivedSuperstep = false;
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("All barriers received, blocks released");
+				}
+			}
+
+		} else {
+			throw new RuntimeException("Tried to block an already blocked channel");
+		}
+	}
+
+	public String toString() {
+		return blockedChannels.toString();
+	}
+
+	public void processSuperstep(BufferOrEvent bufferOrEvent) {
+		int channelIndex = bufferOrEvent.getChannelIndex();
+		if (isBlocked(channelIndex)) {
+			store(bufferOrEvent);
+		} else {
+			StreamingSuperstep superstep = (StreamingSuperstep) bufferOrEvent.getEvent();
+			if (!receivedSuperstep()) {
+				startSuperstep(superstep);
+			}
+			blockChannel(channelIndex);
+		}
+	}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java
new file mode 100644
index 0000000..c56da62
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph.tasks;
+
+
+public interface BarrierTransceiver {
+
+	public void broadcastBarrier(long barrierID);
+	
+	public void confirmBarrier(long barrierID);
+	
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala
new file mode 100644
index 0000000..a37ddb5
--- /dev/null
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager
+
+import akka.actor._
+import org.apache.flink.runtime.ActorLogMessages
+import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph, ExecutionVertex}
+import org.apache.flink.runtime.jobgraph.{JobID, JobVertexID}
+
+import scala.collection.JavaConversions.mapAsScalaMap
+import scala.collection.immutable.TreeMap
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration.{FiniteDuration, _}
+
+
+object StreamStateMonitor {
+
+  def props(context: ActorContext, executionGraph: ExecutionGraph,
+            interval: FiniteDuration = 5 seconds): ActorRef = {
+
+    val vertices: Iterable[ExecutionVertex] = getExecutionVertices(executionGraph)
+    val monitor = context.system.actorOf(Props(new StreamStateMonitor(executionGraph,
+      vertices, vertices.map(x => ((x.getJobVertex.getJobVertexId, x.getParallelSubtaskIndex),
List.empty[Long])).toMap, interval, 0L, -1L)))
+    monitor ! InitBarrierScheduler
+    monitor
+  }
+
+  private def getExecutionVertices(executionGraph: ExecutionGraph): Iterable[ExecutionVertex]
= {
+    for ((_, execJobVertex) <- executionGraph.getAllVertices;
+         execVertex: ExecutionVertex <- execJobVertex.getTaskVertices)
+    yield execVertex
+  }
+}
+
+class StreamStateMonitor(val executionGraph: ExecutionGraph,
+                         val vertices: Iterable[ExecutionVertex], var acks: Map[(JobVertexID,
Int), List[Long]],
+                         val interval: FiniteDuration, var curId: Long, var ackId: Long)
+        extends Actor with ActorLogMessages with ActorLogging {
+
+  override def receiveWithLogMessages: Receive = {
+    case InitBarrierScheduler =>
+      context.system.scheduler.schedule(interval, interval, self, BarrierTimeout)
+      context.system.scheduler.schedule(2 * interval, 2 * interval, self, UpdateCurrentBarrier)
+      log.debug("[FT-MONITOR] Started Stream State Monitor for job {}{}",
+        executionGraph.getJobID, executionGraph.getJobName)
+    case BarrierTimeout =>
+      curId += 1
+      log.debug("[FT-MONITOR] Sending Barrier to vertices of Job " + executionGraph.getJobName)
+      vertices.filter(v => v.getJobVertex.getJobVertex.isInputVertex).foreach(vertex
+      => vertex.getCurrentAssignedResource.getInstance.getTaskManager
+                ! BarrierReq(vertex.getCurrentExecutionAttempt.getAttemptId, curId))
+    case BarrierAck(_, jobVertexID, instanceID, checkpointID) =>
+      acks.get(jobVertexID, instanceID) match {
+        case Some(acklist) =>
+          acks += (jobVertexID, instanceID) -> (checkpointID :: acklist)
+        case None =>
+      }
+      log.info(acks.toString)
+    case UpdateCurrentBarrier =>
+      val barrierCount = acks.values.foldLeft(TreeMap[Long, Int]().withDefaultValue(0))((dict,
myList)
+      => myList.foldLeft(dict)((dict2, elem) => dict2.updated(elem, dict2(elem) + 1)))
+      val keysToKeep = barrierCount.filter(_._2 == acks.size).keys
+      ackId = if (!keysToKeep.isEmpty) keysToKeep.max else ackId
+      acks.keys.foreach(x => acks = acks.updated(x, acks(x).filter(_ >= ackId)))
+      log.debug("[FT-MONITOR] Last global barrier is " + ackId)
+  }
+}
+
+case class BarrierTimeout()
+
+case class InitBarrierScheduler()
+
+case class UpdateCurrentBarrier()
+
+case class BarrierReq(attemptID: ExecutionAttemptID, checkpointID: Long)
+
+case class BarrierAck(jobID: JobID, jobVertexID: JobVertexID, instanceID: Int, checkpointID:
Long)
+
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index e1362c4..d464ef1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -48,6 +48,7 @@ public class StreamConfig implements Serializable {
 	private static final String OUTPUT_NAME = "outputName_";
 	private static final String PARTITIONER_OBJECT = "partitionerObject_";
 	private static final String VERTEX_NAME = "vertexID";
+	private static final String OPERATOR_NAME = "operatorName";
 	private static final String ITERATION_ID = "iteration-id";
 	private static final String OUTPUT_SELECTOR = "outputSelector";
 	private static final String DIRECTED_EMIT = "directedEmit";
@@ -87,6 +88,14 @@ public class StreamConfig implements Serializable {
 		return config.getInteger(VERTEX_NAME, -1);
 	}
 
+	public void setOperatorName(String name) {
+		config.setString(OPERATOR_NAME, name);
+	}
+
+	public String getOperatorName() {
+		return config.getString(OPERATOR_NAME, "Missing");
+	}
+
 	public void setTypeSerializerIn1(StreamRecordSerializer<?> serializer) {
 		setTypeSerializer(TYPE_SERIALIZER_IN_1, serializer);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
index b999c27..c9698e3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
@@ -188,7 +188,9 @@ public class StreamingJobGraphGenerator {
 		builtVertices.add(vertexID);
 		jobGraph.addVertex(vertex);
 
-		return new StreamConfig(vertex.getConfiguration());
+		StreamConfig retConfig = new StreamConfig(vertex.getConfiguration());
+		retConfig.setOperatorName(chainedNames.get(vertexID));
+		return retConfig;
 	}
 
 	private void setVertexConfig(Integer vertexID, StreamConfig config,

http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
index a497119..c3f694e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.collector;
 
 import java.io.IOException;
 
+import org.apache.flink.runtime.event.task.TaskEvent;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
@@ -87,4 +88,7 @@ public class StreamOutput<OUT> implements Collector<OUT> {
 		output.clearBuffers();
 	}
 
+	public void broadcastEvent(TaskEvent barrier) throws IOException, InterruptedException {
+		output.broadcastEvent(barrier);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index dcfd6fe..cdf43ee 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -112,7 +112,7 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
 	 *            The state to be registered for this name.
 	 * @return The data stream with state registered.
 	 */
-	protected SingleOutputStreamOperator<OUT, O> registerState(String name, OperatorState<?>
state) {
+	public SingleOutputStreamOperator<OUT, O> registerState(String name, OperatorState<?>
state) {
 		streamGraph.addOperatorState(getId(), name, state);
 		return this;
 	}
@@ -128,7 +128,7 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
 	 *            The map containing the states that will be registered.
 	 * @return The data stream with states registered.
 	 */
-	protected SingleOutputStreamOperator<OUT, O> registerState(Map<String, OperatorState<?>>
states) {
+	public SingleOutputStreamOperator<OUT, O> registerState(Map<String, OperatorState<?>>
states) {
 		for (Entry<String, OperatorState<?>> entry : states.entrySet()) {
 			streamGraph.addOperatorState(getId(), entry.getKey(), entry.getValue());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
index abe31d4..6281de3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
@@ -96,7 +96,7 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable
{
 	 * Reads the next record from the reader iterator and stores it in the
 	 * nextRecord variable
 	 */
-	protected StreamRecord<IN> readNext() {
+	protected StreamRecord<IN> readNext() throws IOException {
 		this.nextRecord = inSerializer.createInstance();
 		try {
 			nextRecord = recordIterator.next(nextRecord);

http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
index e8a2ce1..a95965c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
@@ -18,7 +18,9 @@
 package org.apache.flink.streaming.api.streamvertex;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.event.task.StreamingSuperstep;
 import org.apache.flink.runtime.io.network.api.reader.MutableReader;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.streaming.api.StreamConfig;
@@ -51,25 +53,19 @@ public class InputHandler<IN> {
 		inputSerializer = configuration.getTypeSerializerIn1(streamVertex.userClassLoader);
 
 		int numberOfInputs = configuration.getNumberOfInputs();
-		if (numberOfInputs > 0) {
 
-			if (numberOfInputs < 2) {
-				inputs = new IndexedMutableReader<DeserializationDelegate<StreamRecord<IN>>>(
-						streamVertex.getEnvironment().getInputGate(0));
+		if (numberOfInputs > 0) {
+			InputGate inputGate = numberOfInputs < 2 ? streamVertex.getEnvironment()
+					.getInputGate(0) : new UnionInputGate(streamVertex.getEnvironment()
+					.getAllInputGates());
 
-			} else {
-				inputs = new IndexedMutableReader<DeserializationDelegate<StreamRecord<IN>>>(
-						new UnionInputGate(streamVertex.getEnvironment().getAllInputGates()));
-			}
+			inputs = new IndexedMutableReader<DeserializationDelegate<StreamRecord<IN>>>(inputGate);
+			inputs.registerTaskEventListener(streamVertex.getSuperstepListener(),
+					StreamingSuperstep.class);
 
-			inputIter = createInputIterator();
+			inputIter = new IndexedReaderIterator<StreamRecord<IN>>(inputs, inputSerializer);
 		}
-	}
 
-	private IndexedReaderIterator<StreamRecord<IN>> createInputIterator() {
-		final IndexedReaderIterator<StreamRecord<IN>> iter = new IndexedReaderIterator<StreamRecord<IN>>(
-				inputs, inputSerializer);
-		return iter;
 	}
 
 	protected static <T> IndexedReaderIterator<StreamRecord<T>> staticCreateInputIterator(

http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
index 359675d..82f1329 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.event.task.StreamingSuperstep;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.StreamConfig;
@@ -84,6 +85,13 @@ public class OutputHandler<OUT> {
 
 	}
 
+	public void broadcastBarrier(long id) throws IOException, InterruptedException {
+		StreamingSuperstep barrier = new StreamingSuperstep(id);
+		for (StreamOutput<?> streamOutput : outputMap.values()) {
+			streamOutput.broadcastEvent(barrier);
+		}
+	}
+
 	public Collection<StreamOutput<?>> getOutputs() {
 		return outputMap.values();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
index 99ca098..e2cdc34 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
@@ -17,10 +17,16 @@
 
 package org.apache.flink.streaming.api.streamvertex;
 
+import java.io.IOException;
 import java.util.Map;
 
+import org.apache.flink.runtime.event.task.StreamingSuperstep;
+import org.apache.flink.runtime.event.task.TaskEvent;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.BarrierTransceiver;
+import org.apache.flink.runtime.jobmanager.BarrierAck;
+import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.streaming.api.StreamConfig;
 import org.apache.flink.streaming.api.invokable.ChainableInvokable;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
@@ -34,7 +40,10 @@ import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTaskContext<OUT>
{
+import akka.actor.ActorRef;
+
+public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTaskContext<OUT>,
+		BarrierTransceiver {
 
 	private static final Logger LOG = LoggerFactory.getLogger(StreamVertex.class);
 
@@ -53,10 +62,13 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements
StreamTa
 
 	protected ClassLoader userClassLoader;
 
+	private EventListener<TaskEvent> superstepListener;
+
 	public StreamVertex() {
 		userInvokable = null;
 		numTasks = newVertex();
 		instanceID = numTasks;
+		superstepListener = new SuperstepEventListener();
 	}
 
 	protected static int newVertex() {
@@ -78,6 +90,22 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements
StreamTa
 		this.context = createRuntimeContext(getEnvironment().getTaskName(), this.states);
 	}
 
+	@Override
+	public void broadcastBarrier(long id) {
+		// Only called at input vertices
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Received barrier from jobmanager: " + id);
+		}
+		actOnBarrier(id);
+	}
+
+	@Override
+	public void confirmBarrier(long barrierID) {
+		getEnvironment().getJobManager().tell(
+				new BarrierAck(getEnvironment().getJobID(), getEnvironment().getJobVertexId(),
+						context.getIndexOfThisSubtask(), barrierID), ActorRef.noSender());
+	}
+
 	public void setInputsOutputs() {
 		inputHandler = new InputHandler<IN>(this);
 		outputHandler = new OutputHandler<OUT>(this);
@@ -205,4 +233,35 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements
StreamTa
 		throw new IllegalArgumentException("CoReader not available");
 	}
 
+	public EventListener<TaskEvent> getSuperstepListener() {
+		return this.superstepListener;
+	}
+
+	private void actOnBarrier(long id) {
+		try {
+			outputHandler.broadcastBarrier(id);
+			System.out.println("Superstep " + id + " processed: " + StreamVertex.this);
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Superstep " + id + " processed: " + StreamVertex.this);
+			}
+		} catch (IOException e) {
+			e.printStackTrace();
+		} catch (InterruptedException e) {
+			e.printStackTrace();
+		}
+	}
+
+	@Override
+	public String toString() {
+		return configuration.getOperatorName() + " (" + context.getIndexOfThisSubtask() + ")";
+	}
+
+	private class SuperstepEventListener implements EventListener<TaskEvent> {
+
+		@Override
+		public void onEvent(TaskEvent event) {
+			actOnBarrier(((StreamingSuperstep) event).getId());
+		}
+
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
index bb20ecb..79f09c4 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
@@ -17,6 +17,10 @@
 
 package org.apache.flink.streaming.io;
 
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
 import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
@@ -28,10 +32,6 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
 import org.apache.flink.runtime.util.event.EventListener;
 
-import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
 /**
  * A CoRecordReader wraps {@link MutableRecordReader}s of two different input
  * types to read records effectively.

http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java
index 175dba2..025393d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java
@@ -34,4 +34,4 @@ public class IndexedMutableReader<T extends IOReadableWritable> extends
MutableR
 	public int getNumberOfInputChannels() {
 		return reader.getNumberOfInputChannels();
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
index 4b13165..af836e2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
@@ -79,7 +79,7 @@ public class MockContext<IN, OUT> implements StreamTaskContext<OUT>
{
 		@Override
 		public StreamRecord<IN> next() throws IOException {
 			if (listIterator.hasNext()) {
-				StreamRecord<IN> result = new StreamRecord<IN>();
+				StreamRecord<IN> result = inDeserializer.createInstance();
 				result.setObject(listIterator.next());
 				return result;
 			} else {


Mime
View raw message