flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [11/14] flink git commit: [FLINK-1638] [streaming] At-Least once monitoring semantics added and bug fixes
Date Tue, 10 Mar 2015 14:00:11 GMT
[FLINK-1638] [streaming] At-Least once monitoring semantics added and bug fixes

The fault-tolerance monitor now shuts itself down once the ExecutionGraph reaches a terminal state (FAILED, CANCELED, or FINISHED)

The JobManager now forwards BarrierAck and StateBarrierAck messages to the job's StreamCheckpointCoordinator


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/37390d63
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/37390d63
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/37390d63

Branch: refs/heads/master
Commit: 37390d6322ab984f17dd257f7a9939311c81edf5
Parents: ed5ba95
Author: Paris Carbone <seniorcarbone@gmail.com>
Authored: Fri Mar 6 15:10:12 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Tue Mar 10 14:58:49 2015 +0100

----------------------------------------------------------------------
 .../deployment/TaskDeploymentDescriptor.java    |  15 ++-
 .../runtime/event/task/StreamingSuperstep.java  |  51 --------
 .../flink/runtime/executiongraph/Execution.java |  11 +-
 .../runtime/executiongraph/ExecutionGraph.java  |  45 ++++++-
 .../runtime/executiongraph/ExecutionVertex.java |   9 +-
 .../apache/flink/runtime/jobgraph/JobGraph.java |  33 +++++
 .../jobgraph/tasks/OperatorStateCarrier.java    |   4 +-
 .../flink/runtime/jobmanager/JobManager.scala   |  70 +++-------
 .../StreamCheckpointCoordinator.scala           | 129 +++++++++++++++++++
 .../runtime/jobmanager/StreamStateMonitor.scala | 122 ------------------
 .../flink/runtime/taskmanager/TaskManager.scala |  21 +--
 .../connectors/kafka/KafkaConsumerExample.java  |   5 -
 .../kafka/api/simple/KafkaConsumerIterator.java |   5 +-
 .../KafkaDeserializingConsumerIterator.java     |   1 +
 .../flink/streaming/api/StreamConfig.java       |  14 ++
 .../apache/flink/streaming/api/StreamGraph.java |  20 +++
 .../api/StreamingJobGraphGenerator.java         |   9 +-
 .../environment/StreamExecutionEnvironment.java |  13 ++
 .../source/SocketTextStreamFunction.java        |   1 -
 .../api/streamvertex/InputHandler.java          |   1 -
 .../api/streamvertex/OutputHandler.java         |   1 -
 .../api/streamvertex/StreamVertex.java          |  17 ++-
 .../streamvertex/StreamingRuntimeContext.java   |   9 +-
 .../api/streamvertex/StreamingSuperstep.java    |  52 ++++++++
 .../flink/streaming/io/BarrierBuffer.java       |  74 +++++++++--
 .../flink/streaming/io/CoRecordReader.java      |  11 +-
 .../io/StreamingAbstractRecordReader.java       |   4 +-
 .../flink/streaming/io/BarrierBufferTest.java   |   2 +-
 .../streaming/examples/wordcount/WordCount.java |   5 -
 29 files changed, 452 insertions(+), 302 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index a431a76..b2573f7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.state.OperatorState;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -78,7 +79,7 @@ public final class TaskDeploymentDescriptor implements Serializable {
 	/** The list of JAR files required to run this task. */
 	private final List<BlobKey> requiredJarFiles;
 	
-	private OperatorState operatorState;
+	private Map<String, OperatorState<?>> operatorStates;
 
 	/**
 	 * Constructs a task deployment descriptor.
@@ -128,13 +129,13 @@ public final class TaskDeploymentDescriptor implements Serializable {
 			Configuration taskConfiguration, String invokableClassName,
 			List<PartitionDeploymentDescriptor> producedPartitions,
 			List<PartitionConsumerDeploymentDescriptor> consumedPartitions,
-			List<BlobKey> requiredJarFiles, int targetSlotNumber, OperatorState operatorState) {
+			List<BlobKey> requiredJarFiles, int targetSlotNumber, Map<String,OperatorState<?>> operatorStates) {
 
 		this(jobID, vertexID, executionId, taskName, indexInSubtaskGroup, numberOfSubtasks,
 				jobConfiguration, taskConfiguration, invokableClassName, producedPartitions,
 				consumedPartitions, requiredJarFiles, targetSlotNumber);
 		
-		setOperatorState(operatorState);
+		setOperatorStates(operatorStates);
 	}
 
 	/**
@@ -243,11 +244,11 @@ public final class TaskDeploymentDescriptor implements Serializable {
 				strProducedPartitions, strConsumedPartitions);
 	}
 
-	public void setOperatorState(OperatorState operatorState) {
-		this.operatorState = operatorState;
+	public void setOperatorStates(Map<String,OperatorState<?>> operatorStates) {
+		this.operatorStates = operatorStates;
 	}
 
-	public OperatorState getOperatorState() {
-		return operatorState;
+	public Map<String, OperatorState<?>> getOperatorStates() {
+		return operatorStates;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StreamingSuperstep.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StreamingSuperstep.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StreamingSuperstep.java
deleted file mode 100644
index e35eb28..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StreamingSuperstep.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.event.task;
-
-import java.io.IOException;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-public class StreamingSuperstep extends TaskEvent {
-
-	protected long id;
-
-	public StreamingSuperstep() {
-
-	}
-
-	public StreamingSuperstep(long id) {
-		this.id = id;
-	}
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.writeLong(id);
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		id = in.readLong();
-	}
-
-	public long getId() {
-		return id;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 89f5183..93845c7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -56,6 +56,7 @@ import scala.concurrent.duration.FiniteDuration;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeoutException;
@@ -123,7 +124,7 @@ public class Execution implements Serializable {
 	
 	private volatile InstanceConnectionInfo assignedResourceLocation; // for the archived execution
 	
-	private OperatorState operatorState;
+	private Map<String,OperatorState<?>> operatorStates;
 
 	// --------------------------------------------------------------------------------------------
 	
@@ -857,11 +858,11 @@ public class Execution implements Serializable {
 				(assignedResource == null ? "(unassigned)" : assignedResource.toString()), state);
 	}
 
-	public void setOperatorState(OperatorState operatorState) {
-		this.operatorState = operatorState;
+	public void setOperatorStates(Map<String,OperatorState<?>> operatorStates) {
+		this.operatorStates = operatorStates;
 	}
 
-	public OperatorState getOperatorState() {
-		return operatorState;
+	public Map<String,OperatorState<?>> getOperatorStates() {
+		return operatorStates;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index bf34e33..0c6c3a7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import akka.actor.ActorContext;
 import akka.actor.ActorRef;
-
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobmanager.StreamCheckpointCoordinator;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.messages.ExecutionGraphMessages;
 import org.apache.flink.runtime.state.OperatorState;
@@ -38,8 +39,8 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import scala.Tuple3;
+import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.Serializable;
@@ -52,6 +53,7 @@ import java.util.NoSuchElementException;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import static akka.dispatch.Futures.future;
@@ -118,6 +120,14 @@ public class ExecutionGraph implements Serializable {
 
 	private boolean allowQueuedScheduling = true;
 
+	private ActorContext parentContext;
+
+	private  ActorRef stateMonitorActor;
+	
+	private boolean monitoringEnabled;
+	
+	private long monitoringInterval = 10000;
+
 	private ScheduleMode scheduleMode = ScheduleMode.FROM_SOURCES;
 
 	
@@ -159,6 +169,18 @@ public class ExecutionGraph implements Serializable {
 	}
 
 	// --------------------------------------------------------------------------------------------
+	
+	public void setStateMonitorActor(ActorRef stateMonitorActor) {
+		this.stateMonitorActor = stateMonitorActor;
+	}
+
+	public ActorRef getStateMonitorActor() {
+		return stateMonitorActor;
+	}
+
+	public void setParentContext(ActorContext parentContext) {
+		this.parentContext = parentContext;
+	}
 
 	public void setNumberOfRetriesLeft(int numberOfRetriesLeft) {
 		if (numberOfRetriesLeft < -1) {
@@ -214,6 +236,14 @@ public class ExecutionGraph implements Serializable {
 		}
 	}
 
+	public void setMonitoringEnabled(boolean monitoringEnabled) {
+		this.monitoringEnabled = monitoringEnabled;
+	}
+
+	public void setMonitoringInterval(long  monitoringInterval) {
+		this.monitoringInterval = monitoringInterval;
+	}
+
 	/**
 	 * Returns a list of BLOB keys referring to the JAR files required to run this job
 	 * @return list of BLOB keys referring to the JAR files required to run this job
@@ -361,12 +391,17 @@ public class ExecutionGraph implements Serializable {
 					for (ExecutionJobVertex ejv : getVerticesTopologically()) {
 						ejv.scheduleAll(scheduler, allowQueuedScheduling);
 					}
-
 					break;
 
 				case BACKTRACKING:
 					throw new JobException("BACKTRACKING is currently not supported as schedule mode.");
 			}
+
+			if(monitoringEnabled)
+			{
+				stateMonitorActor = StreamCheckpointCoordinator.spawn(parentContext, this,
+						Duration.create(monitoringInterval, TimeUnit.MILLISECONDS));
+			}
 		}
 		else {
 			throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);
@@ -532,9 +567,9 @@ public class ExecutionGraph implements Serializable {
 		}
 	}
 	
-	public void loadOperatorStates(Map<Tuple3<JobVertexID, Integer, Long> ,OperatorState<?>> states)
+	public synchronized void loadOperatorStates(Map<Tuple3<JobVertexID, Integer, Long> , Map<String,OperatorState<?>>> states)
 	{
-		for(Map.Entry<Tuple3<JobVertexID, Integer, Long> ,OperatorState<?>> state : states.entrySet())
+		for(Map.Entry<Tuple3<JobVertexID, Integer, Long> , Map<String,OperatorState<?>>> state : states.entrySet())
 		{
 			tasks.get(state.getKey()._1()).getTaskVertices()[state.getKey()._2()].setOperatorState(state.getValue());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index b7f962a..41d34d5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -48,6 +48,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 import static com.google.common.base.Preconditions.checkElementIndex;
@@ -90,7 +91,7 @@ public class ExecutionVertex implements Serializable {
 	
 	private volatile boolean scheduleLocalOnly;
 	
-	private OperatorState operatorState;
+	private Map<String,OperatorState<?>> operatorState;
 	
 	// --------------------------------------------------------------------------------------------
 
@@ -198,11 +199,11 @@ public class ExecutionVertex implements Serializable {
 		return currentExecution.getAssignedResourceLocation();
 	}
 
-	public void setOperatorState(OperatorState operatorState) {
+	public void setOperatorState(Map<String,OperatorState<?>> operatorState) {
 		this.operatorState = operatorState;
 	}
 
-	public OperatorState getOperatorState() {
+	public Map<String,OperatorState<?>> getOperatorState() {
 		return operatorState;
 	}
 	
@@ -393,7 +394,7 @@ public class ExecutionVertex implements Serializable {
 				
 				if(operatorState!=null)
 				{
-					execution.setOperatorState(operatorState);
+					execution.setOperatorStates(operatorState);
 				}
 				
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 0cf2f5e..4b398e5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -75,6 +75,14 @@ public class JobGraph implements Serializable {
 
 	private ScheduleMode scheduleMode = ScheduleMode.FROM_SOURCES;
 	
+	public enum JobType {STREAMING, BATCH}
+	
+	private JobType jobType = JobType.BATCH;
+	
+	private boolean monitoringEnabled = false;
+	
+	private long monitorInterval = 10000;
+	
 	// --------------------------------------------------------------------------------------------
 	
 	/**
@@ -253,6 +261,31 @@ public class JobGraph implements Serializable {
 		return this.taskVertices.size();
 	}
 
+
+	public void setJobType(JobType jobType) {
+		this.jobType = jobType;
+	}
+
+	public JobType getJobType() {
+		return jobType;
+	}
+
+	public void setMonitoringEnabled(boolean monitoringEnabled) {
+		this.monitoringEnabled = monitoringEnabled;
+	}
+
+	public boolean isMonitoringEnabled() {
+		return monitoringEnabled;
+	}
+
+	public void setMonitorInterval(long monitorInterval) {
+		this.monitorInterval = monitorInterval;
+	}
+
+	public long getMonitorInterval() {
+		return monitorInterval;
+	}
+
 	/**
 	 * Searches for a vertex with a matching ID and returns it.
 	 * 

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
index 6ea4f27..e8b6d6b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
@@ -20,8 +20,10 @@ package org.apache.flink.runtime.jobgraph.tasks;
 
 import org.apache.flink.runtime.state.OperatorState;
 
+import java.util.Map;
+
 public interface OperatorStateCarrier {
 	
-	public void injectState(OperatorState state);
+	public void injectStates(Map<String, OperatorState<?>> state);
 	
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 97a6099..c350680 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -104,7 +104,6 @@ class JobManager(val configuration: Configuration,
   /** List of current jobs running jobs */
   val currentJobs = scala.collection.mutable.HashMap[JobID, (ExecutionGraph, JobInfo)]()
   
-  val barrierMonitors = scala.collection.mutable.HashMap[JobID, ActorRef]()
 
   /**
    * Run when the job manager is started. Simply logs an informational message.
@@ -284,78 +283,43 @@ class JobManager(val configuration: Configuration,
           if(newJobStatus.isTerminalState) {
             jobInfo.end = timeStamp
 
-          // is the client waiting for the job result?
+            // is the client waiting for the job result?
             newJobStatus match {
               case JobStatus.FINISHED =>
                 val accumulatorResults = accumulatorManager.getJobAccumulatorResults(jobID)
-                jobInfo.client ! JobResultSuccess(jobID, jobInfo.duration, accumulatorResults)
+                jobInfo.client ! JobResultSuccess(jobID,jobInfo.duration,accumulatorResults)
               case JobStatus.CANCELED =>
                 jobInfo.client ! Failure(new JobCancellationException(jobID,
-                  "Job was cancelled.", error))
+                  "Job was cancelled.",error))
               case JobStatus.FAILED =>
                 jobInfo.client ! Failure(new JobExecutionException(jobID,
-                  "Job execution failed.", error))
+                  "Job execution failed.",error))
               case x =>
-                val exception = new JobExecutionException(jobID, s"$x is not a " +
-                  "terminal state.")
+                val exception = new JobExecutionException(jobID,s"$x is not a " +
+                        "terminal state.")
                 jobInfo.client ! Failure(exception)
                 throw exception
             }
+
+            removeJob(jobID)
             
-            barrierMonitors.get(jobID) match {
-                         case Some(monitor) =>
-                           newJobStatus match{
-                             case JobStatus.FINISHED | JobStatus.CANCELED =>
-                               monitor ! PoisonPill
-                               barrierMonitors.remove(jobID)
-                             case JobStatus.FAILING => 
-                               monitor ! JobStateRequest
-                           }
-                          case None =>
-                            removeJob(jobID)
-                        }
-          }
-          else {
-            newJobStatus match {
-              case JobStatus.RUNNING => currentJobs.get(jobID) match {
-              case Some((executionGraph, _)) => 
-              //FIXME this is just a fast n dirty check for determining streaming jobs 
-              if (executionGraph.getScheduleMode == ScheduleMode.ALL) {
-                barrierMonitors.get(jobID) match {
-                  case None => 
-                    barrierMonitors += jobID -> StreamStateMonitor.props(context, executionGraph)
-                }
-              }
-              case None =>
-                log.error("Cannot create state monitor for job ID {}.", jobID)
-                new IllegalStateException("Cannot find execution graph for job ID " + jobID)
-              }
-            }
           }
         case None =>
           removeJob(jobID)
-      }
+          }
 
     case msg: BarrierAck =>
-      barrierMonitors.get(msg.jobID) match {
-        case Some(monitor) => monitor ! msg
+      currentJobs.get(msg.jobID) match {
+        case Some(jobExecution) =>
+          jobExecution._1.getStateMonitorActor forward  msg
         case None =>
       }
     case msg: StateBarrierAck =>
-      barrierMonitors.get(msg.jobID) match {
-        case Some(monitor) => monitor ! msg
-        case None =>
-      }
-
-    case msg: JobStateResponse =>
-      //inject initial states and restart the job
       currentJobs.get(msg.jobID) match {
         case Some(jobExecution) =>
-          import scala.collection.JavaConverters._
-          jobExecution._1.loadOperatorStates(msg.opStates.asJava)
-          jobExecution._1.restart()
+          jobExecution._1.getStateMonitorActor forward  msg
         case None =>
-      } 
+      }
       
     case ScheduleOrUpdateConsumers(jobId, executionId, partitionIndex) =>
       currentJobs.get(jobId) match {
@@ -522,6 +486,9 @@ class JobManager(val configuration: Configuration,
         executionGraph.setDelayBeforeRetrying(delayBetweenRetries)
         executionGraph.setScheduleMode(jobGraph.getScheduleMode)
         executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling)
+        
+        executionGraph.setMonitoringEnabled(jobGraph.isMonitoringEnabled)
+        executionGraph.setMonitoringInterval(jobGraph.getMonitorInterval)
 
         // initialize the vertices that have a master initialization hook
         // file output formats create directories here, input formats create splits
@@ -564,6 +531,9 @@ class JobManager(val configuration: Configuration,
           log.debug(s"Successfully created execution graph from job graph ${jobId} (${jobName}).")
         }
 
+        // give an actorContext
+        executionGraph.setParentContext(context);
+        
         // get notified about job status changes
         executionGraph.registerJobStatusListener(self)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
new file mode 100644
index 0000000..7ab6a6f
--- /dev/null
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager
+
+import java.lang.Long
+
+import akka.actor._
+import org.apache.flink.runtime.ActorLogMessages
+import org.apache.flink.runtime.execution.ExecutionState.RUNNING
+import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph, ExecutionVertex}
+import org.apache.flink.runtime.jobgraph.JobStatus._
+import org.apache.flink.runtime.jobgraph.{JobID, JobVertexID}
+import org.apache.flink.runtime.state.OperatorState
+
+import scala.collection.JavaConversions._
+import scala.collection.immutable.TreeMap
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration.{FiniteDuration, _}
+
+object StreamCheckpointCoordinator {
+
+  def spawn(context: ActorContext,executionGraph: ExecutionGraph,
+            interval: FiniteDuration = 5 seconds): ActorRef = {
+
+    val vertices: Iterable[ExecutionVertex] = getExecutionVertices(executionGraph)
+    val monitor = context.system.actorOf(Props(new StreamCheckpointCoordinator(executionGraph,
+      vertices,vertices.map(x => ((x.getJobVertex.getJobVertexId,x.getParallelSubtaskIndex),
+              List.empty[Long])).toMap, Map() ,interval,0L,-1L)))
+    monitor ! InitBarrierScheduler
+    monitor
+  }
+
+  private def getExecutionVertices(executionGraph: ExecutionGraph): Iterable[ExecutionVertex] = {
+    for((_,execJobVertex) <- executionGraph.getAllVertices;
+        execVertex: ExecutionVertex <- execJobVertex.getTaskVertices)
+    yield execVertex
+  }
+}
+
+class StreamCheckpointCoordinator(val executionGraph: ExecutionGraph,
+                         val vertices: Iterable[ExecutionVertex],
+                         var acks: Map[(JobVertexID,Int),List[Long]],
+                         var states: Map[(JobVertexID, Integer, Long), 
+                                 java.util.Map[String,OperatorState[_]]],
+                         val interval: FiniteDuration,var curId: Long,var ackId: Long)
+        extends Actor with ActorLogMessages with ActorLogging {
+  
+  override def receiveWithLogMessages: Receive = {
+    
+    case InitBarrierScheduler =>
+      context.system.scheduler.schedule(interval,interval,self,BarrierTimeout)
+      context.system.scheduler.schedule(2 * interval,2 * interval,self,CompactAndUpdate)
+      log.debug("[FT-MONITOR] Started Stream State Monitor for job {}{}",
+        executionGraph.getJobID,executionGraph.getJobName)
+      
+    case BarrierTimeout =>
+      executionGraph.getState match {
+        case FAILED | CANCELED | FINISHED =>
+          log.debug("[FT-MONITOR] Stopping monitor for terminated job {}", executionGraph.getJobID)
+          self ! PoisonPill
+        case _ =>
+          curId += 1
+          log.debug("[FT-MONITOR] Sending Barrier to vertices of Job " + executionGraph.getJobName)
+          vertices.filter(v => v.getJobVertex.getJobVertex.isInputVertex &&
+                  v.getExecutionState == RUNNING).foreach(vertex
+          => vertex.getCurrentAssignedResource.getInstance.getTaskManager
+                    ! BarrierReq(vertex.getCurrentExecutionAttempt.getAttemptId,curId))
+      }
+      
+    case StateBarrierAck(jobID, jobVertexID, instanceID, checkpointID, opState) =>
+      states += (jobVertexID, instanceID, checkpointID) -> opState
+      self ! BarrierAck(jobID, jobVertexID, instanceID, checkpointID)
+      
+    case BarrierAck(jobID, jobVertexID,instanceID,checkpointID) =>
+          acks.get(jobVertexID,instanceID) match {
+            case Some(acklist) =>
+              acks += (jobVertexID,instanceID) -> (checkpointID :: acklist)
+            case None =>
+          }
+          log.debug(acks.toString)
+      
+      
+      
+    case CompactAndUpdate =>
+      val barrierCount = acks.values.foldLeft(TreeMap[Long,Int]().withDefaultValue(0))((dict,myList)
+      => myList.foldLeft(dict)((dict2,elem) => dict2.updated(elem,dict2(elem) + 1)))
+      val keysToKeep = barrierCount.filter(_._2 == acks.size).keys
+      ackId = if(!keysToKeep.isEmpty) keysToKeep.max else ackId
+      acks.keys.foreach(x => acks = acks.updated(x,acks(x).filter(_ >= ackId)))
+      states = states.filterKeys(_._3 >= ackId)
+      log.debug("[FT-MONITOR] Last global barrier is " + ackId)
+      executionGraph.loadOperatorStates(states)
+      
+  }
+  
+}
+
+case class BarrierTimeout()
+
+case class InitBarrierScheduler()
+
+case class CompactAndUpdate()
+
+case class BarrierReq(attemptID: ExecutionAttemptID,checkpointID: Long)
+
+case class BarrierAck(jobID: JobID,jobVertexID: JobVertexID,instanceID: Int,checkpointID: Long)
+
+case class StateBarrierAck(jobID: JobID, jobVertexID: JobVertexID, instanceID: Integer,
+                           checkpointID: Long, states: java.util.Map[String,OperatorState[_]])
+       
+
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala
deleted file mode 100644
index 65840f9..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager
-
-import akka.actor._
-import org.apache.flink.runtime.ActorLogMessages
-import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID,ExecutionGraph,ExecutionVertex}
-import org.apache.flink.runtime.jobgraph.{JobID,JobVertexID}
-import org.apache.flink.runtime.state.OperatorState
-
-import java.lang.Long
-import scala.collection.JavaConversions._
-import scala.collection.immutable.TreeMap
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.duration.{FiniteDuration,_}
-
-
-object StreamStateMonitor {
-
-  def props(context: ActorContext,executionGraph: ExecutionGraph,
-            interval: FiniteDuration = 5 seconds): ActorRef = {
-
-    val vertices: Iterable[ExecutionVertex] = getExecutionVertices(executionGraph)
-    val monitor = context.system.actorOf(Props(new StreamStateMonitor(executionGraph,
-      vertices,vertices.map(x => ((x.getJobVertex.getJobVertexId,x.getParallelSubtaskIndex),
-              List.empty[Long])).toMap,Map(),interval,0L,-1L)))
-    monitor ! InitBarrierScheduler
-    monitor
-  }
-
-  private def getExecutionVertices(executionGraph: ExecutionGraph): Iterable[ExecutionVertex] = {
-    for((_,execJobVertex) <- executionGraph.getAllVertices;
-        execVertex: ExecutionVertex <- execJobVertex.getTaskVertices)
-    yield execVertex
-  }
-}
-
-class StreamStateMonitor(val executionGraph: ExecutionGraph,
-                         val vertices: Iterable[ExecutionVertex],
-                         var acks: Map[(JobVertexID,Int),List[Long]],
-                         var states: Map[(JobVertexID, Integer, Long), OperatorState[_]],
-                         val interval: FiniteDuration,var curId: Long,var ackId: Long)
-        extends Actor with ActorLogMessages with ActorLogging {
-
-  override def receiveWithLogMessages: Receive = {
-    
-    case InitBarrierScheduler =>
-      context.system.scheduler.schedule(interval,interval,self,BarrierTimeout)
-      context.system.scheduler.schedule(2 * interval,2 * interval,self,TriggerBarrierCompaction)
-      log.debug("[FT-MONITOR] Started Stream State Monitor for job {}{}",
-        executionGraph.getJobID,executionGraph.getJobName)
-      
-    case BarrierTimeout =>
-      curId += 1
-      log.debug("[FT-MONITOR] Sending Barrier to vertices of Job " + executionGraph.getJobName)
-      vertices.filter(v => v.getJobVertex.getJobVertex.isInputVertex).foreach(vertex
-      => vertex.getCurrentAssignedResource.getInstance.getTaskManager
-                ! BarrierReq(vertex.getCurrentExecutionAttempt.getAttemptId,curId))
-      
-    case StateBarrierAck(jobID, jobVertexID, instanceID, checkpointID, opState) =>
-      states += (jobVertexID, instanceID, checkpointID) -> opState  
-      self ! BarrierAck(jobID, jobVertexID, instanceID, checkpointID)
-      
-    case BarrierAck(jobID, jobVertexID,instanceID,checkpointID) =>
-      acks.get(jobVertexID,instanceID) match {
-        case Some(acklist) =>
-          acks += (jobVertexID,instanceID) -> (checkpointID :: acklist)
-        case None =>
-      }
-      log.debug(acks.toString)
-      
-    case TriggerBarrierCompaction =>
-      val barrierCount = acks.values.foldLeft(TreeMap[Long,Int]().withDefaultValue(0))((dict,myList)
-      => myList.foldLeft(dict)((dict2,elem) => dict2.updated(elem,dict2(elem) + 1)))
-      val keysToKeep = barrierCount.filter(_._2 == acks.size).keys
-      ackId = if(!keysToKeep.isEmpty) keysToKeep.max else ackId
-      acks.keys.foreach(x => acks = acks.updated(x,acks(x).filter(_ >= ackId)))
-      states = states.filterKeys(_._3 >= ackId)
-      log.debug("[FT-MONITOR] Last global barrier is " + ackId)
-
-    case JobStateRequest =>
-      sender ! JobStateResponse(executionGraph.getJobID, ackId, states)
-  }
-}
-
-case class BarrierTimeout()
-
-case class InitBarrierScheduler()
-
-case class TriggerBarrierCompaction()
-
-case class JobStateRequest()
-
-case class JobStateResponse(jobID: JobID, barrierID: Long, opStates: Map[(JobVertexID, Integer, 
-        Long), OperatorState[_]])
-
-case class BarrierReq(attemptID: ExecutionAttemptID,checkpointID: Long)
-
-case class BarrierAck(jobID: JobID,jobVertexID: JobVertexID,instanceID: Int,checkpointID: Long)
-
-case class StateBarrierAck(jobID: JobID, jobVertexID: JobVertexID, instanceID: Integer,
-                           checkpointID: Long, state: OperatorState[_])
-       
-
-
-

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 497e784..3de917b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -352,7 +352,8 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
       }
 
     case BarrierReq(attemptID, checkpointID) =>
-      log.debug("[FT-TaskManager] Barrier request received for attempt {}", attemptID)
+      log.debug("[FT-TaskManager] Barrier {} request received for attempt {}", 
+          checkpointID, attemptID)
       runningTasks.get(attemptID) match {
         case Some(i) =>
           if (i.getExecutionState == ExecutionState.RUNNING) {
@@ -416,15 +417,6 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
 
       task = new Task(jobID, vertexID, taskIndex, numSubtasks, executionID,
         tdd.getTaskName, self)
-
-      //inject operator state
-      if(tdd.getOperatorState != null)
-      {
-        val vertex = task.getEnvironment.getInvokable match {
-          case opStateCarrier: OperatorStateCarrier =>
-            opStateCarrier.injectState(tdd.getOperatorState)
-        }
-      }
       
       runningTasks.put(executionID, task) match {
         case Some(_) => throw new RuntimeException(
@@ -446,6 +438,15 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
 
       task.setEnvironment(env)
 
+      //inject operator state
+      if(tdd.getOperatorStates != null)
+      {
+        val vertex = task.getEnvironment.getInvokable match {
+          case opStateCarrier: OperatorStateCarrier =>
+            opStateCarrier.injectStates(tdd.getOperatorStates)
+        }
+      }
+      
       // register the task with the network stack and profiles
       networkEnvironment match {
         case Some(ne) =>

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
index d9bb7d3..dd1221d 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
@@ -17,14 +17,9 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.runtime.state.OperatorState;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
-<<<<<<< HEAD
-import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource;
-=======
->>>>>>> a62796a... s
 import org.apache.flink.streaming.connectors.util.JavaDefaultStringSchema;
 
 public class KafkaConsumerExample {

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaConsumerIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaConsumerIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaConsumerIterator.java
index 92d351a..370b3f0 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaConsumerIterator.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaConsumerIterator.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kafka.api.simple;
 
+import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -36,13 +37,11 @@ import kafka.javaapi.TopicMetadata;
 import kafka.javaapi.TopicMetadataRequest;
 import kafka.javaapi.consumer.SimpleConsumer;
 import kafka.message.MessageAndOffset;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Iterates the records received from a partition of a Kafka topic as byte arrays.
  */
-public class KafkaConsumerIterator {
+public class KafkaConsumerIterator implements Serializable {
 
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaDeserializingConsumerIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaDeserializingConsumerIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaDeserializingConsumerIterator.java
index 6ca4c81..f2af6ca 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaDeserializingConsumerIterator.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaDeserializingConsumerIterator.java
@@ -21,6 +21,7 @@ import org.apache.flink.streaming.connectors.util.DeserializationSchema;
 
 public class KafkaDeserializingConsumerIterator<IN> extends KafkaConsumerIterator {
 
+	private static final long serialVersionUID = 1L;
 	private DeserializationSchema<IN> deserializationSchema;
 
 	public KafkaDeserializingConsumerIterator(String host, int port, String topic, int partition, long waitOnEmptyFetch,

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index 7c90629..d813a30 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -67,6 +67,7 @@ public class StreamConfig implements Serializable {
 	// DEFAULT VALUES
 
 	private static final long DEFAULT_TIMEOUT = 100;
+	public static final String STATE_MONITORING = "STATE_MONITORING";
 
 	// CONFIG METHODS
 
@@ -300,6 +301,18 @@ public class StreamConfig implements Serializable {
 		config.setBytes(EDGES_IN_ORDER, SerializationUtils.serialize((Serializable) outEdgeList));
 	}
 
+
+	public void setStateMonitoring(boolean stateMonitoring) {
+		
+		config.setBoolean(STATE_MONITORING, stateMonitoring);
+		
+	}
+	
+	public boolean getStateMonitoring()
+	{
+		return config.getBoolean(STATE_MONITORING, false);
+	}
+
 	@SuppressWarnings("unchecked")
 	public List<Tuple2<Integer, Integer>> getOutEdgesInOrder(ClassLoader cl) {
 		try {
@@ -399,6 +412,7 @@ public class StreamConfig implements Serializable {
 			builder.append("\nInvokable: Missing");
 		}
 		builder.append("\nBuffer timeout: " + getBufferTimeout());
+		builder.append("\nState Monitoring: " + getStateMonitoring());
 		if (isChainStart() && getChainedOutputs(cl).size() > 0) {
 			builder.append("\n\n\n---------------------\nChained task configs\n---------------------\n");
 			builder.append(getTransitiveChainedTaskConfigs(cl)).toString();

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
index 641708e..8334aa1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
@@ -92,6 +92,10 @@ public class StreamGraph extends StreamingPlan {
 	private Set<Integer> sources;
 
 	private ExecutionConfig executionConfig;
+	
+	private boolean monitoringEnabled;
+	
+	private long monitoringInterval = 10000;
 
 	public StreamGraph(ExecutionConfig executionConfig) {
 
@@ -606,6 +610,22 @@ public class StreamGraph extends StreamingPlan {
 		return operatorNames.get(vertexID);
 	}
 
+	public void setMonitoringEnabled(boolean monitoringEnabled) {
+		this.monitoringEnabled = monitoringEnabled;
+	}
+
+	public boolean isMonitoringEnabled() {
+		return monitoringEnabled;
+	}
+
+	public void setMonitoringInterval(long monitoringInterval) {
+		this.monitoringInterval = monitoringInterval;
+	}
+
+	public long getMonitoringInterval() {
+		return monitoringInterval;
+	}
+
 	@Override
 	public String getStreamingPlanAsJSON() {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
index c9698e3..b50ac25 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
@@ -74,7 +74,13 @@ public class StreamingJobGraphGenerator {
 
 		// Turn lazy scheduling off
 		jobGraph.setScheduleMode(ScheduleMode.ALL);
-
+		jobGraph.setJobType(JobGraph.JobType.STREAMING);
+		jobGraph.setMonitoringEnabled(streamGraph.isMonitoringEnabled());
+		jobGraph.setMonitorInterval(streamGraph.getMonitoringInterval());
+		if(jobGraph.isMonitoringEnabled())
+		{
+			jobGraph.setNumberOfExecutionRetries(Integer.MAX_VALUE);
+		}
 		init();
 
 		setChaining();
@@ -211,6 +217,7 @@ public class StreamingJobGraphGenerator {
 		config.setNumberOfOutputs(nonChainableOutputs.size());
 		config.setOutputs(nonChainableOutputs);
 		config.setChainedOutputs(chainableOutputs);
+		config.setStateMonitoring(streamGraph.isMonitoringEnabled());
 
 		Class<? extends AbstractInvokable> vertexClass = streamGraph.getJobVertexClass(vertexID);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 835ce4e..9cc6131 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -151,6 +151,19 @@ public abstract class StreamExecutionEnvironment {
 		this.bufferTimeout = timeoutMillis;
 		return this;
 	}
+	
+	public StreamExecutionEnvironment enableMonitoring(long interval)
+	{
+		streamGraph.setMonitoringEnabled(true);
+		streamGraph.setMonitoringInterval(interval);
+		return this;
+	}
+	
+	public StreamExecutionEnvironment enableMonitoring()
+	{
+		streamGraph.setMonitoringEnabled(true);
+		return this;
+	}
 
 	/**
 	 * Sets the maximum time frequency (milliseconds) for the flushing of the

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java
index 67bc128..d6a5b2b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java
@@ -59,7 +59,6 @@ public class SocketTextStreamFunction extends RichSourceFunction<String> {
 	public void open(Configuration parameters) throws Exception {
 		super.open(parameters);
 		socket = new Socket();
-
 		socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
index a95965c..d766705 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
@@ -18,7 +18,6 @@
 package org.apache.flink.streaming.api.streamvertex;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.event.task.StreamingSuperstep;
 import org.apache.flink.runtime.io.network.api.reader.MutableReader;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
index 82f1329..fd375f6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
@@ -25,7 +25,6 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.event.task.StreamingSuperstep;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.StreamConfig;

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
index 24a90d0..5ff47d6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.streamvertex;
 import java.io.IOException;
 import java.util.Map;
 
-import org.apache.flink.runtime.event.task.StreamingSuperstep;
 import org.apache.flink.runtime.event.task.TaskEvent;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -65,6 +64,8 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
 	protected ClassLoader userClassLoader;
 
 	private EventListener<TaskEvent> superstepListener;
+	
+	private boolean onRecovery;
 
 	public StreamVertex() {
 		userInvokable = null;
@@ -88,7 +89,10 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
 	protected void initialize() {
 		this.userClassLoader = getUserCodeClassLoader();
 		this.configuration = new StreamConfig(getTaskConfiguration());
-		this.states = configuration.getOperatorStates(userClassLoader);
+		if(!onRecovery)
+		{
+			this.states = configuration.getOperatorStates(userClassLoader);
+		}
 		this.context = createRuntimeContext(getEnvironment().getTaskName(), this.states);
 	}
 
@@ -122,12 +126,12 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
 	@Override
 	public void confirmBarrier(long barrierID) {
 		
-		if(states != null && states.containsKey("kafka"))
+		if(configuration.getStateMonitoring() && states != null)
 		{
 			getEnvironment().getJobManager().tell(
 					new StateBarrierAck(getEnvironment().getJobID(), 
 							getEnvironment().getJobVertexId(), context.getIndexOfThisSubtask(), 
-							barrierID, states.get("kafka")), ActorRef.noSender());
+							barrierID, states), ActorRef.noSender());
 		}
 		else
 		{
@@ -290,8 +294,9 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
 	}
 
 	@Override
-	public void injectState(OperatorState state) {
-		states.put("kafka", state);
+	public void injectStates(Map<String,OperatorState<?>> states) {
+		onRecovery = true;
+		this.states.putAll(states);
 	}
 	
 

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
index 60dfe7a..492d2a0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
@@ -71,7 +71,14 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {
 		if (state == null) {
 			throw new RuntimeException("Cannot register null state");
 		} else {
-			operatorStates.put(name, state);
+			if(operatorStates.containsKey(name))
+			{
+				throw new RuntimeException("State is already registered");
+			}
+			else
+			{
+				operatorStates.put(name, state);
+			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingSuperstep.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingSuperstep.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingSuperstep.java
new file mode 100644
index 0000000..557c636
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingSuperstep.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.streamvertex;
+
+import java.io.IOException;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.event.task.TaskEvent;
+
+public class StreamingSuperstep extends TaskEvent {
+
+	protected long id;
+
+	public StreamingSuperstep() {
+
+	}
+
+	public StreamingSuperstep(long id) {
+		this.id = id;
+	}
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		out.writeLong(id);
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		id = in.readLong();
+	}
+
+	public long getId() {
+		return id;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
index 3ff718a..7dfccb0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
@@ -23,10 +23,10 @@ import java.util.LinkedList;
 import java.util.Queue;
 import java.util.Set;
 
-import org.apache.flink.runtime.event.task.StreamingSuperstep;
 import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.streaming.api.streamvertex.StreamingSuperstep;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,6 +53,12 @@ public class BarrierBuffer {
 		this.reader = reader;
 	}
 
+	/**
+	 * Starts the next superstep
+	 * 
+	 * @param superstep
+	 *            The next superstep
+	 */
 	protected void startSuperstep(StreamingSuperstep superstep) {
 		this.currentSuperstep = superstep;
 		this.superstepStarted = true;
@@ -61,30 +67,53 @@ public class BarrierBuffer {
 		}
 	}
 
+	/**
+	 * Buffers a bufferOrEvent received from a blocked channel
+	 * 
+	 * @param bufferOrEvent
+	 *            bufferOrEvent to buffer
+	 */
 	protected void store(BufferOrEvent bufferOrEvent) {
 		nonprocessed.add(bufferOrEvent);
 	}
 
+	/**
+	 * Get the next non-blocked non-processed BufferOrEvent. Returns null if
+	 * not available.
+	 */
 	protected BufferOrEvent getNonProcessed() {
-		BufferOrEvent nextNonprocessed = null;
-		while (nextNonprocessed == null && !nonprocessed.isEmpty()) {
-			nextNonprocessed = nonprocessed.poll();
+		BufferOrEvent nextNonprocessed;
+		while ((nextNonprocessed = nonprocessed.poll()) != null) {
 			if (isBlocked(nextNonprocessed.getChannelIndex())) {
 				blockedNonprocessed.add(nextNonprocessed);
-				nextNonprocessed = null;
+			} else {
+				return nextNonprocessed;
 			}
 		}
-		return nextNonprocessed;
+		return null;
 	}
 
+	/**
+	 * Checks whether a given channel index is blocked for this input gate
+	 * 
+	 * @param channelIndex
+	 *            The channel index to check
+	 */
 	protected boolean isBlocked(int channelIndex) {
 		return blockedChannels.contains(channelIndex);
 	}
 
+	/**
+	 * Checks whether all channels are blocked, meaning that barriers have
+	 * been received from all channels
+	 */
 	protected boolean isAllBlocked() {
 		return blockedChannels.size() == totalNumberOfInputChannels;
 	}
 
+	/**
+	 * Returns the next non-blocked BufferOrEvent. This is a blocking operation.
+	 */
 	public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {
 		// If there are non-processed buffers from the previously blocked ones,
 		// we get the next
@@ -99,7 +128,7 @@ public class BarrierBuffer {
 				bufferOrEvent = inputGate.getNextBufferOrEvent();
 				if (isBlocked(bufferOrEvent.getChannelIndex())) {
 					// If channel blocked we just store it
-					store(bufferOrEvent);
+					blockedNonprocessed.add(bufferOrEvent);
 				} else {
 					return bufferOrEvent;
 				}
@@ -107,6 +136,12 @@ public class BarrierBuffer {
 		}
 	}
 
+	/**
+	 * Blocks the given channel index, from which a barrier has been received.
+	 * 
+	 * @param channelIndex
+	 *            The channel index to block.
+	 */
 	protected void blockChannel(int channelIndex) {
 		if (!blockedChannels.contains(channelIndex)) {
 			blockedChannels.add(channelIndex);
@@ -122,16 +157,27 @@ public class BarrierBuffer {
 		}
 	}
 
+	/**
+	 * Releases the blocks on all channels.
+	 */
 	protected void releaseBlocks() {
-		nonprocessed.addAll(blockedNonprocessed);
+		if (!nonprocessed.isEmpty()) {
+			// sanity check
+			throw new RuntimeException("Error in barrier buffer logic");
+		}
+		nonprocessed = blockedNonprocessed;
+		blockedNonprocessed = new LinkedList<BufferOrEvent>();
 		blockedChannels.clear();
-		blockedNonprocessed.clear();
 		superstepStarted = false;
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("All barriers received, blocks released");
 		}
 	}
 
+	/**
+	 * Method that is executed once the barrier has been received from all
+	 * channels.
+	 */
 	protected void actOnAllBlocked() {
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("Publishing barrier to the vertex");
@@ -140,10 +186,12 @@ public class BarrierBuffer {
 		releaseBlocks();
 	}
 
-	public String toString() {
-		return blockedChannels.toString();
-	}
-
+	/**
+	 * Processes a streaming superstep event
+	 * 
+	 * @param bufferOrEvent
+	 *            The BufferOrEvent containing the event
+	 */
 	public void processSuperstep(BufferOrEvent bufferOrEvent) {
 		StreamingSuperstep superstep = (StreamingSuperstep) bufferOrEvent.getEvent();
 		if (!superstepStarted) {

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
index 6a1f624..6c91f4d 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
@@ -19,11 +19,9 @@ package org.apache.flink.streaming.io;
 
 import java.io.IOException;
 import java.util.LinkedList;
-import java.util.Queue;
 import java.util.concurrent.LinkedBlockingDeque;
 
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.event.task.StreamingSuperstep;
 import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
 import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
 import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
@@ -33,6 +31,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
 import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.streaming.api.streamvertex.StreamingSuperstep;
 
 /**
  * A CoRecordReader wraps {@link MutableRecordReader}s of two different input
@@ -66,8 +65,6 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
 	private CoBarrierBuffer barrierBuffer1;
 	private CoBarrierBuffer barrierBuffer2;
 
-	private Queue<Integer> unprocessedIndices = new LinkedList<Integer>();
-
 	public CoRecordReader(InputGate inputgate1, InputGate inputgate2) {
 		super(new UnionInputGate(inputgate1, inputgate2));
 
@@ -109,14 +106,14 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
 	@SuppressWarnings("unchecked")
 	protected int getNextRecord(T1 target1, T2 target2) throws IOException, InterruptedException {
 
-		requestPartitionsOnce();	
+		requestPartitionsOnce();
 
 		while (true) {
 			if (currentReaderIndex == 0) {
 				if ((bufferReader1.isFinished() && bufferReader2.isFinished())) {
 					return 0;
 				}
-				
+
 				currentReaderIndex = getNextReaderIndexBlocking();
 
 			}
@@ -234,10 +231,8 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
 	@Override
 	public void onEvent(InputGate bufferReader) {
 		if (bufferReader == bufferReader1) {
-			System.out.println("Added 1");
 			availableRecordReaders.add(1);
 		} else if (bufferReader == bufferReader2) {
-			System.out.println("Added 2");
 			availableRecordReaders.add(2);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java
index 811c48a..d30c241 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.runtime.event.task.AbstractEvent;
-import org.apache.flink.runtime.event.task.StreamingSuperstep;
 import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
 import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
@@ -31,6 +30,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.io.network.serialization.SpillingAdaptiveSpanningRecordDeserializer;
+import org.apache.flink.streaming.api.streamvertex.StreamingSuperstep;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,6 +46,7 @@ import org.slf4j.LoggerFactory;
 public abstract class StreamingAbstractRecordReader<T extends IOReadableWritable> extends AbstractReader implements
 		ReaderBase {
 
+	@SuppressWarnings("unused")
 	private static final Logger LOG = LoggerFactory.getLogger(StreamingAbstractRecordReader.class);
 
 	private final RecordDeserializer<T>[] recordDeserializers;
@@ -56,6 +57,7 @@ public abstract class StreamingAbstractRecordReader<T extends IOReadableWritable
 
 	private final BarrierBuffer barrierBuffer;
 
+	@SuppressWarnings("unchecked")
 	protected StreamingAbstractRecordReader(InputGate inputGate) {
 		super(inputGate);
 		barrierBuffer = new BarrierBuffer(inputGate, this);

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
index e7a03d9..203216b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
@@ -25,7 +25,6 @@ import java.util.List;
 import java.util.Queue;
 
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.event.task.StreamingSuperstep;
 import org.apache.flink.runtime.event.task.TaskEvent;
 import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -33,6 +32,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.streaming.api.streamvertex.StreamingSuperstep;
 import org.junit.Test;
 
 public class BarrierBufferTest {

http://git-wip-us.apache.org/repos/asf/flink/blob/37390d63/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
index b7a1ba3..c207d60 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
@@ -101,11 +101,6 @@ public class WordCount {
 
 			// emit the pairs
 			for (String token : tokens) {
-				//FIXME to be removed. added this for test purposes 
-				if("killme".equals(token))
-				{
-					throw new Exception("byee");
-				}
 				if (token.length() > 0) {
 					out.collect(new Tuple2<String, Integer>(token, 1));
 				}


Mime
View raw message