flink-commits mailing list archives

From: se...@apache.org
Subject: [01/14] flink git commit: [FLINK-1638] [streaming] Operator state checkpointing and injection prototype
Date: Tue, 10 Mar 2015 14:00:01 GMT
Repository: flink
Updated Branches:
  refs/heads/master b4e8350f5 -> 2bba2b3f0


[FLINK-1638] [streaming] Operator state checkpointing and injection prototype


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/452c39a9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/452c39a9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/452c39a9

Branch: refs/heads/master
Commit: 452c39a965d93aab84d2fea84345badd2cc45975
Parents: a34869c
Author: Paris Carbone <seniorcarbone@gmail.com>
Authored: Wed Mar 4 18:30:09 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Tue Mar 10 14:58:48 2015 +0100

----------------------------------------------------------------------
 .../deployment/TaskDeploymentDescriptor.java    |  26 +++++
 .../flink/runtime/executiongraph/Execution.java |  11 ++
 .../runtime/executiongraph/ExecutionGraph.java  |  10 ++
 .../runtime/executiongraph/ExecutionVertex.java |  19 +++-
 .../api/reader/AbstractRecordReader.java        |   5 +-
 .../jobgraph/tasks/OperatorStateCarrier.java    |  27 +++++
 .../flink/runtime/state/OperatorState.java      | 103 ++++++++++++++++++
 .../flink/runtime/state/StateCheckpoint.java    |  80 ++++++++++++++
 .../flink/runtime/jobmanager/JobManager.scala   |  38 +++++--
 .../runtime/jobmanager/StreamStateMonitor.scala |  78 +++++++++-----
 .../flink/runtime/taskmanager/TaskManager.scala |  15 ++-
 .../flink/streaming/api/StreamConfig.java       |   2 +-
 .../apache/flink/streaming/api/StreamGraph.java |   2 +-
 .../datastream/SingleOutputStreamOperator.java  |   2 +-
 .../api/streamvertex/StreamVertex.java          |  53 ++++++++--
 .../streamvertex/StreamingRuntimeContext.java   |   2 +-
 .../apache/flink/streaming/state/MapState.java  |   4 +-
 .../flink/streaming/state/OperatorState.java    | 105 -------------------
 .../streaming/state/PartitionableState.java     |   2 +
 .../flink/streaming/state/SimpleState.java      |   3 +-
 .../state/checkpoint/MapCheckpoint.java         |   3 +-
 .../state/checkpoint/StateCheckpoint.java       |  82 ---------------
 .../flink/streaming/state/MapStateTest.java     |   2 +-
 .../streaming/state/OperatorStateTest.java      |   3 +-
 .../streaming/examples/wordcount/WordCount.java |   5 +
 25 files changed, 439 insertions(+), 243 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index 20204d5..a431a76 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.OperatorState;
 
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -76,6 +77,8 @@ public final class TaskDeploymentDescriptor implements Serializable {
 
 	/** The list of JAR files required to run this task. */
 	private final List<BlobKey> requiredJarFiles;
+	
+	private OperatorState operatorState;
 
 	/**
 	 * Constructs a task deployment descriptor.
@@ -119,6 +122,21 @@ public final class TaskDeploymentDescriptor implements Serializable {
 		this.requiredJarFiles = new ArrayList<BlobKey>();
 	}
 
+	public TaskDeploymentDescriptor(
+			JobID jobID, JobVertexID vertexID, ExecutionAttemptID executionId, String taskName,
+			int indexInSubtaskGroup, int numberOfSubtasks, Configuration jobConfiguration,
+			Configuration taskConfiguration, String invokableClassName,
+			List<PartitionDeploymentDescriptor> producedPartitions,
+			List<PartitionConsumerDeploymentDescriptor> consumedPartitions,
+			List<BlobKey> requiredJarFiles, int targetSlotNumber, OperatorState operatorState) {
+
+		this(jobID, vertexID, executionId, taskName, indexInSubtaskGroup, numberOfSubtasks,
+				jobConfiguration, taskConfiguration, invokableClassName, producedPartitions,
+				consumedPartitions, requiredJarFiles, targetSlotNumber);
+		
+		setOperatorState(operatorState);
+	}
+
 	/**
 	 * Returns the ID of the job the tasks belongs to.
 	 */
@@ -224,4 +242,12 @@ public final class TaskDeploymentDescriptor implements Serializable {
 				taskName, indexInSubtaskGroup, numberOfSubtasks, invokableClassName,
 				strProducedPartitions, strConsumedPartitions);
 	}
+
+	public void setOperatorState(OperatorState operatorState) {
+		this.operatorState = operatorState;
+	}
+
+	public OperatorState getOperatorState() {
+		return operatorState;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 57ed4c0..89f5183 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -46,6 +46,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFutureAction;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.messages.TaskManagerMessages.TaskOperationResult;
+import org.apache.flink.runtime.state.OperatorState;
 import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
 
@@ -122,6 +123,8 @@ public class Execution implements Serializable {
 	
 	private volatile InstanceConnectionInfo assignedResourceLocation; // for the archived execution
 	
+	private OperatorState operatorState;
+
 	// --------------------------------------------------------------------------------------------
 	
 	public Execution(ExecutionVertex vertex, int attemptNumber, long startTimestamp, FiniteDuration timeout) {
@@ -853,4 +856,12 @@ public class Execution implements Serializable {
 		return String.format("Attempt #%d (%s) @ %s - [%s]", attemptNumber, vertex.getSimpleName(),
 				(assignedResource == null ? "(unassigned)" : assignedResource.toString()), state);
 	}
+
+	public void setOperatorState(OperatorState operatorState) {
+		this.operatorState = operatorState;
+	}
+
+	public OperatorState getOperatorState() {
+		return operatorState;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index e6d9c85..bf34e33 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -33,11 +33,13 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.messages.ExecutionGraphMessages;
+import org.apache.flink.runtime.state.OperatorState;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import scala.Tuple3;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.Serializable;
@@ -529,6 +531,14 @@ public class ExecutionGraph implements Serializable {
 			return false;
 		}
 	}
+	
+	public void loadOperatorStates(Map<Tuple3<JobVertexID, Integer, Long> ,OperatorState<?>> states)
+	{
+		for(Map.Entry<Tuple3<JobVertexID, Integer, Long> ,OperatorState<?>> state : states.entrySet())
+		{
+			tasks.get(state.getKey()._1()).getTaskVertices()[state.getKey()._2()].setOperatorState(state.getValue());
+		}
+	}
 
 	public void scheduleOrUpdateConsumers(ExecutionAttemptID executionId, int partitionIndex) {
 		Execution execution = currentExecutions.get(executionId);
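
[Illustration, not part of the commit] The new loadOperatorStates hook is keyed by (JobVertexID, subtask index, checkpoint id). A minimal sketch of the calling convention from Java follows; the helper class and variable names are assumptions made up for the example:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.runtime.executiongraph.ExecutionGraph;
    import org.apache.flink.runtime.jobgraph.JobVertexID;
    import org.apache.flink.runtime.state.OperatorState;

    import scala.Tuple3;

    public class StateInjectionSketch {

        // Hands a single restored state to the execution graph; the vertex id,
        // subtask index and checkpoint id identify the subtask it belongs to.
        public static void restoreSingleState(ExecutionGraph graph, JobVertexID vertexId,
                int subtaskIndex, long checkpointId, OperatorState<?> state) {

            Map<Tuple3<JobVertexID, Integer, Long>, OperatorState<?>> states =
                    new HashMap<Tuple3<JobVertexID, Integer, Long>, OperatorState<?>>();
            states.put(new Tuple3<JobVertexID, Integer, Long>(vertexId, subtaskIndex, checkpointId), state);

            graph.loadOperatorStates(states);
        }
    }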

http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 0158fbf..b7f962a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.state.OperatorState;
 import org.slf4j.Logger;
 
 import scala.concurrent.duration.FiniteDuration;
@@ -89,6 +90,8 @@ public class ExecutionVertex implements Serializable {
 	
 	private volatile boolean scheduleLocalOnly;
 	
+	private OperatorState operatorState;
+	
 	// --------------------------------------------------------------------------------------------
 
 	public ExecutionVertex(ExecutionJobVertex jobVertex, int subTaskIndex,
@@ -194,6 +197,14 @@ public class ExecutionVertex implements Serializable {
 	public InstanceConnectionInfo getCurrentAssignedResourceLocation() {
 		return currentExecution.getAssignedResourceLocation();
 	}
+
+	public void setOperatorState(OperatorState operatorState) {
+		this.operatorState = operatorState;
+	}
+
+	public OperatorState getOperatorState() {
+		return operatorState;
+	}
 	
 	public ExecutionGraph getExecutionGraph() {
 		return this.jobVertex.getGraph();
@@ -379,6 +390,12 @@ public class ExecutionVertex implements Serializable {
 				if (grp != null) {
 					this.locationConstraint = grp.getLocationConstraint(subTaskIndex);
 				}
+				
+				if(operatorState!=null)
+				{
+					execution.setOperatorState(operatorState);
+				}
+				
 			}
 			else {
 				throw new IllegalStateException("Cannot reset a vertex that is in state " + state);
@@ -506,7 +523,7 @@ public class ExecutionVertex implements Serializable {
 		return new TaskDeploymentDescriptor(getJobId(), getJobvertexId(), executionId, getTaskName(),
 				subTaskIndex, getTotalNumberOfParallelSubtasks(), getExecutionGraph().getJobConfiguration(),
 				jobVertex.getJobVertex().getConfiguration(), jobVertex.getJobVertex().getInvokableClassName(),
-				producedPartitions, consumedPartitions, jarFiles, slot.getSlotNumber());
+				producedPartitions, consumedPartitions, jarFiles, slot.getSlotNumber(), operatorState);
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
index cc36438..920792c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
@@ -19,10 +19,7 @@
 package org.apache.flink.runtime.io.network.api.reader;
 
 import java.io.IOException;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.Set;
+
 
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.runtime.event.task.AbstractEvent;

http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
new file mode 100644
index 0000000..6ea4f27
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph.tasks;
+
+import org.apache.flink.runtime.state.OperatorState;
+
+public interface OperatorStateCarrier {
+	
+	public void injectState(OperatorState state);
+	
+}
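
[Illustration, not part of the commit] The interface is deliberately minimal: a task that wants its state re-injected on redeployment only has to accept an OperatorState. A hypothetical implementor might look like this:

    import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
    import org.apache.flink.runtime.state.OperatorState;

    // Illustrative only: a task that keeps a single OperatorState and
    // overwrites it when the TaskManager injects a restored checkpoint.
    public class StatefulTaskSketch implements OperatorStateCarrier {

        private OperatorState state;

        @Override
        public void injectState(OperatorState state) {
            this.state = state;
        }

        public OperatorState getState() {
            return state;
        }
    }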

http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorState.java
new file mode 100644
index 0000000..74ea1a7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorState.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import java.io.Serializable;
+
+/**
+ * Abstract class for representing operator states in Flink programs. By
+ * implementing the methods declared in this abstraction the state of the
+ * operator can be checkpointed by the fault tolerance mechanism.
+ *
+ * @param <T>
+ *            The type of the operator state.
+ */
+public abstract class OperatorState<T> implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	public T state;
+
+	/**
+	 * Constructor used for initializing the state. In case of failure, the
+	 * state will be reinitialized using this constructor, then
+	 * {@link #restore(StateCheckpoint)} will be used to restore from the last
+	 * available backup.
+	 */
+	public OperatorState() {
+		state = null;
+	}
+
+	/**
+	 * Initializes the state using the given state object.
+	 * 
+	 * @param initialState
+	 *            The initial state object
+	 */
+	public OperatorState(T initialState) {
+		state = initialState;
+	}
+
+	/**
+	 * Returns the currently stored state object.
+	 * 
+	 * @return The state.
+	 */
+	public T getState() {
+		return state;
+	}
+
+	/**
+	 * Sets the current state object.
+	 * 
+	 * @param state
+	 *            The new state object.
+	 * @return The operator state with the new state object set.
+	 */
+	public OperatorState<T> setState(T state) {
+		this.state = state;
+		return this;
+	}
+
+	/**
+	 * Creates a {@link StateCheckpoint} that will be used to backup the state
+	 * for failure recovery.
+	 * 
+	 * @return The {@link StateCheckpoint} created.
+	 */
+	public abstract StateCheckpoint<T> checkpoint();
+
+	/**
+	 * Restores the state from the given {@link StateCheckpoint}.
+	 * 
+	 * @param checkpoint
+	 *            The checkpoint to restore from
+	 * @return The restored operator.
+	 */
+	public abstract OperatorState<T> restore(StateCheckpoint<T> checkpoint);
+
+	@Override
+	public String toString() {
+		return state.toString();
+	}
+
+	public boolean stateEquals(OperatorState<T> other) {
+		return state.equals(other.state);
+	}
+
+}
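
[Illustration, not part of the commit] As a sketch of how the abstraction is meant to be used, a trivial concrete state only has to supply checkpoint() and restore(); the class name below is made up:

    import org.apache.flink.runtime.state.OperatorState;
    import org.apache.flink.runtime.state.StateCheckpoint;

    // Illustrative only: a counter state whose checkpoint is a full copy of the value.
    public class CounterState extends OperatorState<Long> {

        private static final long serialVersionUID = 1L;

        public CounterState() {
            super(0L);
        }

        @Override
        public StateCheckpoint<Long> checkpoint() {
            // Non-incremental checkpoint: simply wrap the current value.
            return new StateCheckpoint<Long>(this);
        }

        @Override
        public OperatorState<Long> restore(StateCheckpoint<Long> checkpoint) {
            setState(checkpoint.getCheckpointedState());
            return this;
        }
    }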

http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateCheckpoint.java
new file mode 100644
index 0000000..4e4906e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateCheckpoint.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import java.io.Serializable;
+
+/**
+ * Base class for creating checkpoints for {@link OperatorState}. This
+ * checkpoints will be used to backup states in stateful Flink operators and
+ * also to restore them in case of node failure. To allow incremental
+ * checkpoints override the {@link #update(StateCheckpoint)} method.
+ * 
+ * @param <T>
+ *            The type of the state.
+ */
+public class StateCheckpoint<T> implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	public T checkpointedState;
+
+	/**
+	 * Creates a state checkpoint from the given {@link OperatorState}
+	 * 
+	 * @param operatorState
+	 *            The {@link OperatorState} to checkpoint.
+	 */
+	public StateCheckpoint(OperatorState<T> operatorState) {
+		this.checkpointedState = operatorState.getState();
+	}
+
+	public StateCheckpoint() {
+		this.checkpointedState = null;
+	}
+
+	/**
+	 * Returns the state object for the checkpoint.
+	 * 
+	 * @return The checkpointed state object.
+	 */
+	public T getCheckpointedState() {
+		return checkpointedState;
+	}
+
+	/**
+	 * Updates the checkpoint from next one. Override this method to allow
+	 * incremental updates.
+	 * 
+	 * @param nextCheckpoint
+	 *            The {@link StateCheckpoint} will be used to update from.
+	 */
+	public StateCheckpoint<T> update(StateCheckpoint<T> nextCheckpoint) {
+		this.checkpointedState = nextCheckpoint.getCheckpointedState();
+		return this;
+	}
+
+	@Override
+	public String toString() {
+		return checkpointedState.toString();
+	}
+
+	public boolean stateEquals(StateCheckpoint<T> other) {
+		return checkpointedState.equals(other.checkpointedState);
+	}
+}
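
[Illustration, not part of the commit] The Javadoc above notes that incremental checkpointing is enabled by overriding update(StateCheckpoint). A hedged sketch of such an override for a list-valued state, with an assumed class name:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.runtime.state.OperatorState;
    import org.apache.flink.runtime.state.StateCheckpoint;

    // Illustrative only: an incremental checkpoint that appends the elements of
    // the next checkpoint instead of replacing the whole checkpointed list.
    public class ListCheckpoint<E> extends StateCheckpoint<List<E>> {

        private static final long serialVersionUID = 1L;

        public ListCheckpoint(OperatorState<List<E>> operatorState) {
            super(operatorState);
        }

        public ListCheckpoint() {
            super();
            this.checkpointedState = new ArrayList<E>();
        }

        @Override
        public StateCheckpoint<List<E>> update(StateCheckpoint<List<E>> nextCheckpoint) {
            // Merge rather than overwrite: this is what makes the checkpoint incremental.
            this.checkpointedState.addAll(nextCheckpoint.getCheckpointedState());
            return this;
        }
    }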

http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 4f58ba7..97a6099 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -301,13 +301,19 @@ class JobManager(val configuration: Configuration,
                 jobInfo.client ! Failure(exception)
                 throw exception
             }
+            
             barrierMonitors.get(jobID) match {
                          case Some(monitor) =>
-                              monitor ! PoisonPill
-                              barrierMonitors.remove(jobID)
+                           newJobStatus match{
+                             case JobStatus.FINISHED | JobStatus.CANCELED =>
+                               monitor ! PoisonPill
+                               barrierMonitors.remove(jobID)
+                             case JobStatus.FAILING => 
+                               monitor ! JobStateRequest
+                           }
                           case None =>
+                            removeJob(jobID)
                         }
-            removeJob(jobID)
           }
           else {
             newJobStatus match {
@@ -315,7 +321,10 @@ class JobManager(val configuration: Configuration,
               case Some((executionGraph, _)) => 
               //FIXME this is just a fast n dirty check for determining streaming jobs 
               if (executionGraph.getScheduleMode == ScheduleMode.ALL) {
-                barrierMonitors += jobID -> StreamStateMonitor.props(context, executionGraph)
+                barrierMonitors.get(jobID) match {
+                  case None => 
+                    barrierMonitors += jobID -> StreamStateMonitor.props(context, executionGraph)
+                }
               }
               case None =>
                 log.error("Cannot create state monitor for job ID {}.", jobID)
@@ -327,12 +336,27 @@ class JobManager(val configuration: Configuration,
           removeJob(jobID)
       }
 
-    case BarrierAck(jobID, jobVertex, instanceID, checkpoint) =>
-      barrierMonitors.get(jobID) match {
-        case Some(monitor) => monitor ! BarrierAck(jobID, jobVertex, instanceID, checkpoint)
+    case msg: BarrierAck =>
+      barrierMonitors.get(msg.jobID) match {
+        case Some(monitor) => monitor ! msg
+        case None =>
+      }
+    case msg: StateBarrierAck =>
+      barrierMonitors.get(msg.jobID) match {
+        case Some(monitor) => monitor ! msg
         case None =>
       }
 
+    case msg: JobStateResponse =>
+      //inject initial states and restart the job
+      currentJobs.get(msg.jobID) match {
+        case Some(jobExecution) =>
+          import scala.collection.JavaConverters._
+          jobExecution._1.loadOperatorStates(msg.opStates.asJava)
+          jobExecution._1.restart()
+        case None =>
+      } 
+      
     case ScheduleOrUpdateConsumers(jobId, executionId, partitionIndex) =>
       currentJobs.get(jobId) match {
         case Some((executionGraph, _)) =>

http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala
index a37ddb5..65840f9 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala
@@ -20,65 +20,82 @@ package org.apache.flink.runtime.jobmanager
 
 import akka.actor._
 import org.apache.flink.runtime.ActorLogMessages
-import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph, ExecutionVertex}
-import org.apache.flink.runtime.jobgraph.{JobID, JobVertexID}
+import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID,ExecutionGraph,ExecutionVertex}
+import org.apache.flink.runtime.jobgraph.{JobID,JobVertexID}
+import org.apache.flink.runtime.state.OperatorState
 
-import scala.collection.JavaConversions.mapAsScalaMap
+import java.lang.Long
+import scala.collection.JavaConversions._
 import scala.collection.immutable.TreeMap
 import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.duration.{FiniteDuration, _}
+import scala.concurrent.duration.{FiniteDuration,_}
 
 
 object StreamStateMonitor {
 
-  def props(context: ActorContext, executionGraph: ExecutionGraph,
+  def props(context: ActorContext,executionGraph: ExecutionGraph,
             interval: FiniteDuration = 5 seconds): ActorRef = {
 
     val vertices: Iterable[ExecutionVertex] = getExecutionVertices(executionGraph)
     val monitor = context.system.actorOf(Props(new StreamStateMonitor(executionGraph,
-      vertices, vertices.map(x => ((x.getJobVertex.getJobVertexId, x.getParallelSubtaskIndex), List.empty[Long])).toMap, interval, 0L, -1L)))
+      vertices,vertices.map(x => ((x.getJobVertex.getJobVertexId,x.getParallelSubtaskIndex),
+              List.empty[Long])).toMap,Map(),interval,0L,-1L)))
     monitor ! InitBarrierScheduler
     monitor
   }
 
   private def getExecutionVertices(executionGraph: ExecutionGraph): Iterable[ExecutionVertex] = {
-    for ((_, execJobVertex) <- executionGraph.getAllVertices;
-         execVertex: ExecutionVertex <- execJobVertex.getTaskVertices)
+    for((_,execJobVertex) <- executionGraph.getAllVertices;
+        execVertex: ExecutionVertex <- execJobVertex.getTaskVertices)
     yield execVertex
   }
 }
 
 class StreamStateMonitor(val executionGraph: ExecutionGraph,
-                         val vertices: Iterable[ExecutionVertex], var acks: Map[(JobVertexID, Int), List[Long]],
-                         val interval: FiniteDuration, var curId: Long, var ackId: Long)
+                         val vertices: Iterable[ExecutionVertex],
+                         var acks: Map[(JobVertexID,Int),List[Long]],
+                         var states: Map[(JobVertexID, Integer, Long), OperatorState[_]],
+                         val interval: FiniteDuration,var curId: Long,var ackId: Long)
         extends Actor with ActorLogMessages with ActorLogging {
 
   override def receiveWithLogMessages: Receive = {
+    
     case InitBarrierScheduler =>
-      context.system.scheduler.schedule(interval, interval, self, BarrierTimeout)
-      context.system.scheduler.schedule(2 * interval, 2 * interval, self, UpdateCurrentBarrier)
+      context.system.scheduler.schedule(interval,interval,self,BarrierTimeout)
+      context.system.scheduler.schedule(2 * interval,2 * interval,self,TriggerBarrierCompaction)
       log.debug("[FT-MONITOR] Started Stream State Monitor for job {}{}",
-        executionGraph.getJobID, executionGraph.getJobName)
+        executionGraph.getJobID,executionGraph.getJobName)
+      
     case BarrierTimeout =>
       curId += 1
       log.debug("[FT-MONITOR] Sending Barrier to vertices of Job " + executionGraph.getJobName)
       vertices.filter(v => v.getJobVertex.getJobVertex.isInputVertex).foreach(vertex
       => vertex.getCurrentAssignedResource.getInstance.getTaskManager
-                ! BarrierReq(vertex.getCurrentExecutionAttempt.getAttemptId, curId))
-    case BarrierAck(_, jobVertexID, instanceID, checkpointID) =>
-      acks.get(jobVertexID, instanceID) match {
+                ! BarrierReq(vertex.getCurrentExecutionAttempt.getAttemptId,curId))
+      
+    case StateBarrierAck(jobID, jobVertexID, instanceID, checkpointID, opState) =>
+      states += (jobVertexID, instanceID, checkpointID) -> opState  
+      self ! BarrierAck(jobID, jobVertexID, instanceID, checkpointID)
+      
+    case BarrierAck(jobID, jobVertexID,instanceID,checkpointID) =>
+      acks.get(jobVertexID,instanceID) match {
         case Some(acklist) =>
-          acks += (jobVertexID, instanceID) -> (checkpointID :: acklist)
+          acks += (jobVertexID,instanceID) -> (checkpointID :: acklist)
         case None =>
       }
-      log.info(acks.toString)
-    case UpdateCurrentBarrier =>
-      val barrierCount = acks.values.foldLeft(TreeMap[Long, Int]().withDefaultValue(0))((dict, myList)
-      => myList.foldLeft(dict)((dict2, elem) => dict2.updated(elem, dict2(elem) + 1)))
+      log.debug(acks.toString)
+      
+    case TriggerBarrierCompaction =>
+      val barrierCount = acks.values.foldLeft(TreeMap[Long,Int]().withDefaultValue(0))((dict,myList)
+      => myList.foldLeft(dict)((dict2,elem) => dict2.updated(elem,dict2(elem) + 1)))
       val keysToKeep = barrierCount.filter(_._2 == acks.size).keys
-      ackId = if (!keysToKeep.isEmpty) keysToKeep.max else ackId
-      acks.keys.foreach(x => acks = acks.updated(x, acks(x).filter(_ >= ackId)))
+      ackId = if(!keysToKeep.isEmpty) keysToKeep.max else ackId
+      acks.keys.foreach(x => acks = acks.updated(x,acks(x).filter(_ >= ackId)))
+      states = states.filterKeys(_._3 >= ackId)
       log.debug("[FT-MONITOR] Last global barrier is " + ackId)
+
+    case JobStateRequest =>
+      sender ! JobStateResponse(executionGraph.getJobID, ackId, states)
   }
 }
 
@@ -86,11 +103,20 @@ case class BarrierTimeout()
 
 case class InitBarrierScheduler()
 
-case class UpdateCurrentBarrier()
+case class TriggerBarrierCompaction()
+
+case class JobStateRequest()
+
+case class JobStateResponse(jobID: JobID, barrierID: Long, opStates: Map[(JobVertexID, Integer, 
+        Long), OperatorState[_]])
+
+case class BarrierReq(attemptID: ExecutionAttemptID,checkpointID: Long)
 
-case class BarrierReq(attemptID: ExecutionAttemptID, checkpointID: Long)
+case class BarrierAck(jobID: JobID,jobVertexID: JobVertexID,instanceID: Int,checkpointID: Long)
 
-case class BarrierAck(jobID: JobID, jobVertexID: JobVertexID, instanceID: Int, checkpointID: Long)
+case class StateBarrierAck(jobID: JobID, jobVertexID: JobVertexID, instanceID: Integer,
+                           checkpointID: Long, state: OperatorState[_])
+       
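
[Illustration, not part of the commit] For readers following the Scala above: the TriggerBarrierCompaction step computes the last globally confirmed barrier, i.e. the largest checkpoint id acknowledged by every (vertex, subtask) pair, and then prunes acks and stored states below it. A rough Java rendering of that fold, with made-up class and method names:

    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    public class BarrierCompactionSketch {

        // acks maps each (vertex, subtask) pair to the checkpoint ids it has acknowledged.
        // Returns the largest id acknowledged by all entries, or previousAckId if there is none.
        public static long lastGlobalBarrier(Map<?, List<Long>> acks, long previousAckId) {
            Map<Long, Integer> counts = new TreeMap<Long, Integer>();
            for (List<Long> ackList : acks.values()) {
                for (Long id : ackList) {
                    Integer current = counts.get(id);
                    counts.put(id, current == null ? 1 : current + 1);
                }
            }
            long ackId = previousAckId;
            for (Map.Entry<Long, Integer> entry : counts.entrySet()) {
                if (entry.getValue() == acks.size()) {
                    ackId = Math.max(ackId, entry.getKey());
                }
            }
            return ackId;
        }
    }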
 
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 4271141..497e784 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -43,7 +43,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync
 import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.io.network.netty.NettyConfig
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID
-import org.apache.flink.runtime.jobgraph.tasks.BarrierTransceiver
+import org.apache.flink.runtime.jobgraph.tasks.{OperatorStateCarrier,BarrierTransceiver}
 import org.apache.flink.runtime.jobmanager.{BarrierReq,JobManager}
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager
 import org.apache.flink.runtime.messages.JobManagerMessages.UpdateTaskExecutionState
@@ -358,7 +358,9 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
           if (i.getExecutionState == ExecutionState.RUNNING) {
             i.getEnvironment.getInvokable match {
               case barrierTransceiver: BarrierTransceiver =>
-                barrierTransceiver.broadcastBarrier(checkpointID)
+                new Thread(new Runnable {
+                  override def run(): Unit =  barrierTransceiver.broadcastBarrier(checkpointID);
+                }).start()
               case _ => log.error("[FT-TaskManager] Received a barrier for the wrong vertex")
             }
           }
@@ -415,6 +417,15 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
       task = new Task(jobID, vertexID, taskIndex, numSubtasks, executionID,
         tdd.getTaskName, self)
 
+      //inject operator state
+      if(tdd.getOperatorState != null)
+      {
+        val vertex = task.getEnvironment.getInvokable match {
+          case opStateCarrier: OperatorStateCarrier =>
+            opStateCarrier.injectState(tdd.getOperatorState)
+        }
+      }
+      
       runningTasks.put(executionID, task) match {
         case Some(_) => throw new RuntimeException(
           s"TaskManager contains already a task with executionID $executionID.")
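
[Illustration, not part of the commit] On the TaskManager side, the injection step pattern-matches the task's invokable against the new OperatorStateCarrier interface. A defensive Java paraphrase of that step, using assumed names and an instanceof guard instead of the Scala match:

    import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
    import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
    import org.apache.flink.runtime.state.OperatorState;

    public class InjectStateSketch {

        // Only tasks whose invokable implements OperatorStateCarrier receive the
        // restored state carried in the deployment descriptor.
        public static void maybeInjectState(AbstractInvokable invokable, OperatorState state) {
            if (state != null && invokable instanceof OperatorStateCarrier) {
                ((OperatorStateCarrier) invokable).injectState(state);
            }
        }
    }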

http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index d464ef1..7c90629 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -32,7 +32,7 @@ import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.api.streamvertex.StreamVertexException;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
-import org.apache.flink.streaming.state.OperatorState;
+import org.apache.flink.runtime.state.OperatorState;
 import org.apache.flink.util.InstantiationUtil;
 
 public class StreamConfig implements Serializable {

http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
index f69605b..640416d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
@@ -46,7 +46,7 @@ import org.apache.flink.streaming.api.streamvertex.StreamIterationHead;
 import org.apache.flink.streaming.api.streamvertex.StreamIterationTail;
 import org.apache.flink.streaming.api.streamvertex.StreamVertex;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
-import org.apache.flink.streaming.state.OperatorState;
+import org.apache.flink.runtime.state.OperatorState;
 import org.apache.sling.commons.json.JSONArray;
 import org.apache.sling.commons.json.JSONException;
 import org.apache.sling.commons.json.JSONObject;

http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index cdf43ee..b0fc364 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -26,7 +26,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.invokable.StreamInvokable.ChainingStrategy;
 import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
-import org.apache.flink.streaming.state.OperatorState;
+import org.apache.flink.runtime.state.OperatorState;
 
 /**
  * The SingleOutputStreamOperator represents a user defined transformation

http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
index e2cdc34..24a90d0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
@@ -25,15 +25,17 @@ import org.apache.flink.runtime.event.task.TaskEvent;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.BarrierTransceiver;
+import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
 import org.apache.flink.runtime.jobmanager.BarrierAck;
+import org.apache.flink.runtime.jobmanager.StateBarrierAck;
 import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.runtime.state.OperatorState;
 import org.apache.flink.streaming.api.StreamConfig;
 import org.apache.flink.streaming.api.invokable.ChainableInvokable;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.io.CoReaderIterator;
 import org.apache.flink.streaming.io.IndexedReaderIterator;
-import org.apache.flink.streaming.state.OperatorState;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 import org.apache.flink.util.StringUtils;
@@ -43,7 +45,7 @@ import org.slf4j.LoggerFactory;
 import akka.actor.ActorRef;
 
 public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTaskContext<OUT>,
-		BarrierTransceiver {
+		BarrierTransceiver, OperatorStateCarrier {
 
 	private static final Logger LOG = LoggerFactory.getLogger(StreamVertex.class);
 
@@ -90,9 +92,27 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
 		this.context = createRuntimeContext(getEnvironment().getTaskName(), this.states);
 	}
 
+	protected <T> void invokeUserFunction(StreamInvokable<?, T> userInvokable) throws Exception {
+		userInvokable.setRuntimeContext(context);
+		userInvokable.open(getTaskConfiguration());
+
+		for (ChainableInvokable<?, ?> invokable : outputHandler.chainedInvokables) {
+			invokable.setRuntimeContext(context);
+			invokable.open(getTaskConfiguration());
+		}
+
+		userInvokable.invoke();
+		userInvokable.close();
+
+		for (ChainableInvokable<?, ?> invokable : outputHandler.chainedInvokables) {
+			invokable.close();
+		}
+
+	}
+
 	@Override
 	public void broadcastBarrier(long id) {
-		// Only called at input vertices
+		//Only called at input vertices
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("Received barrier from jobmanager: " + id);
 		}
@@ -101,9 +121,21 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
 
 	@Override
 	public void confirmBarrier(long barrierID) {
-		getEnvironment().getJobManager().tell(
-				new BarrierAck(getEnvironment().getJobID(), getEnvironment().getJobVertexId(),
-						context.getIndexOfThisSubtask(), barrierID), ActorRef.noSender());
+		
+		if(states != null && states.containsKey("kafka"))
+		{
+			getEnvironment().getJobManager().tell(
+					new StateBarrierAck(getEnvironment().getJobID(), 
+							getEnvironment().getJobVertexId(), context.getIndexOfThisSubtask(), 
+							barrierID, states.get("kafka")), ActorRef.noSender());
+		}
+		else
+		{
+			getEnvironment().getJobManager().tell(
+					new BarrierAck(getEnvironment().getJobID(), getEnvironment().getJobVertexId(),
+							context.getIndexOfThisSubtask(), barrierID), ActorRef.noSender());	
+		}
+		
 	}
 
 	public void setInputsOutputs() {
@@ -240,7 +272,8 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
 	private void actOnBarrier(long id) {
 		try {
 			outputHandler.broadcastBarrier(id);
-			System.out.println("Superstep " + id + " processed: " + StreamVertex.this);
+			//TODO checkpoint state here
+			confirmBarrier(id);
 			if (LOG.isDebugEnabled()) {
 				LOG.debug("Superstep " + id + " processed: " + StreamVertex.this);
 			}
@@ -256,6 +289,12 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
 		return configuration.getOperatorName() + " (" + context.getIndexOfThisSubtask() + ")";
 	}
 
+	@Override
+	public void injectState(OperatorState state) {
+		states.put("kafka", state);
+	}
+	
+
 	private class SuperstepEventListener implements EventListener<TaskEvent> {
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
index 0daf3c2..a47100b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
@@ -27,7 +27,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.streaming.state.OperatorState;
+import org.apache.flink.runtime.state.OperatorState;
 
 /**
  * Implementation of the {@link RuntimeContext}, created by runtime stream UDF

http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/MapState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/MapState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/MapState.java
index 85aec52..1b861f5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/MapState.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/MapState.java
@@ -23,8 +23,10 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.flink.runtime.state.OperatorState;
 import org.apache.flink.streaming.state.checkpoint.MapCheckpoint;
-import org.apache.flink.streaming.state.checkpoint.StateCheckpoint;
+import org.apache.flink.runtime.state.StateCheckpoint;
+
 
 /**
  * A Map that can be used as a partitionable operator state, for both fault

http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/OperatorState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/OperatorState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/OperatorState.java
deleted file mode 100644
index a0cedba..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/OperatorState.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.state;
-
-import java.io.Serializable;
-
-import org.apache.flink.streaming.state.checkpoint.StateCheckpoint;
-
-/**
- * Abstract class for representing operator states in Flink programs. By
- * implementing the methods declared in this abstraction the state of the
- * operator can be checkpointed by the fault tolerance mechanism.
- *
- * @param <T>
- *            The type of the operator state.
- */
-public abstract class OperatorState<T> implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	protected T state;
-
-	/**
-	 * Constructor used for initializing the state. In case of failure, the
-	 * state will be reinitialized using this constructor, then
-	 * {@link #restore(StateCheckpoint)} will be used to restore from the last
-	 * available backup.
-	 */
-	public OperatorState() {
-		state = null;
-	}
-
-	/**
-	 * Initializes the state using the given state object.
-	 * 
-	 * @param initialState
-	 *            The initial state object
-	 */
-	public OperatorState(T initialState) {
-		state = initialState;
-	}
-
-	/**
-	 * Returns the currently stored state object.
-	 * 
-	 * @return The state.
-	 */
-	public T getState() {
-		return state;
-	}
-
-	/**
-	 * Sets the current state object.
-	 * 
-	 * @param state
-	 *            The new state object.
-	 * @return The operator state with the new state object set.
-	 */
-	public OperatorState<T> setState(T state) {
-		this.state = state;
-		return this;
-	}
-
-	/**
-	 * Creates a {@link StateCheckpoint} that will be used to backup the state
-	 * for failure recovery.
-	 * 
-	 * @return The {@link StateCheckpoint} created.
-	 */
-	public abstract StateCheckpoint<T> checkpoint();
-
-	/**
-	 * Restores the state from the given {@link StateCheckpoint}.
-	 * 
-	 * @param checkpoint
-	 *            The checkpoint to restore from
-	 * @return The restored operator.
-	 */
-	public abstract OperatorState<T> restore(StateCheckpoint<T> checkpoint);
-
-	@Override
-	public String toString() {
-		return state.toString();
-	}
-
-	public boolean stateEquals(OperatorState<T> other) {
-		return state.equals(other.state);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java
index c58c545..ddedcd9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.streaming.state;
 
+import org.apache.flink.runtime.state.OperatorState;
+
 /**
  * Base class for representing operator states that can be repartitioned for
  * state state and load balancing.

http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SimpleState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SimpleState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SimpleState.java
index 7ae1f81..b76f5ac 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SimpleState.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SimpleState.java
@@ -17,7 +17,8 @@
 
 package org.apache.flink.streaming.state;
 
-import org.apache.flink.streaming.state.checkpoint.StateCheckpoint;
+import org.apache.flink.runtime.state.OperatorState;
+import org.apache.flink.runtime.state.StateCheckpoint;
 
 /**
  * Basic {@link OperatorState} for storing and updating simple objects. By default the

http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/MapCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/MapCheckpoint.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/MapCheckpoint.java
index 15d1fd5..ee27d4f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/MapCheckpoint.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/MapCheckpoint.java
@@ -21,8 +21,9 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.flink.runtime.state.OperatorState;
+import org.apache.flink.runtime.state.StateCheckpoint;
 import org.apache.flink.streaming.state.MapState;
-import org.apache.flink.streaming.state.OperatorState;
 
 public class MapCheckpoint<K, V> extends StateCheckpoint<Map<K, V>> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/StateCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/StateCheckpoint.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/StateCheckpoint.java
deleted file mode 100644
index 8b76245..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/StateCheckpoint.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.state.checkpoint;
-
-import java.io.Serializable;
-
-import org.apache.flink.streaming.state.OperatorState;
-
-/**
- * Base class for creating checkpoints for {@link OperatorState}. This
- * checkpoints will be used to backup states in stateful Flink operators and
- * also to restore them in case of node failure. To allow incremental
- * checkpoints override the {@link #update(StateCheckpoint)} method.
- * 
- * @param <T>
- *            The type of the state.
- */
-public class StateCheckpoint<T> implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	T checkpointedState;
-
-	/**
-	 * Creates a state checkpoint from the given {@link OperatorState}
-	 * 
-	 * @param operatorState
-	 *            The {@link OperatorState} to checkpoint.
-	 */
-	public StateCheckpoint(OperatorState<T> operatorState) {
-		this.checkpointedState = operatorState.getState();
-	}
-
-	public StateCheckpoint() {
-		this.checkpointedState = null;
-	}
-
-	/**
-	 * Returns the state object for the checkpoint.
-	 * 
-	 * @return The checkpointed state object.
-	 */
-	public T getCheckpointedState() {
-		return checkpointedState;
-	}
-
-	/**
-	 * Updates the checkpoint from next one. Override this method to allow
-	 * incremental updates.
-	 * 
-	 * @param nextCheckpoint
-	 *            The {@link StateCheckpoint} will be used to update from.
-	 */
-	public StateCheckpoint<T> update(StateCheckpoint<T> nextCheckpoint) {
-		this.checkpointedState = nextCheckpoint.getCheckpointedState();
-		return this;
-	}
-
-	@Override
-	public String toString() {
-		return checkpointedState.toString();
-	}
-
-	public boolean stateEquals(StateCheckpoint<T> other) {
-		return checkpointedState.equals(other.checkpointedState);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/MapStateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/MapStateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/MapStateTest.java
index 194403c..98bafe4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/MapStateTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/MapStateTest.java
@@ -25,7 +25,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.flink.streaming.state.checkpoint.MapCheckpoint;
-import org.apache.flink.streaming.state.checkpoint.StateCheckpoint;
+import org.apache.flink.runtime.state.StateCheckpoint;
 import org.junit.Test;
 
 public class MapStateTest {

http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/OperatorStateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/OperatorStateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/OperatorStateTest.java
index 6cb8f51..4e07a3f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/OperatorStateTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/OperatorStateTest.java
@@ -20,7 +20,8 @@ package org.apache.flink.streaming.state;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-import org.apache.flink.streaming.state.checkpoint.StateCheckpoint;
+import org.apache.flink.runtime.state.OperatorState;
+import org.apache.flink.runtime.state.StateCheckpoint;
 import org.junit.Test;
 
 public class OperatorStateTest {

http://git-wip-us.apache.org/repos/asf/flink/blob/452c39a9/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
index c207d60..b7a1ba3 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
@@ -101,6 +101,11 @@ public class WordCount {
 
 			// emit the pairs
 			for (String token : tokens) {
+				//FIXME to be removed. added this for test purposes 
+				if("killme".equals(token))
+				{
+					throw new Exception("byee");
+				}
 				if (token.length() > 0) {
 					out.collect(new Tuple2<String, Integer>(token, 1));
 				}

