flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gyf...@apache.org
Subject [4/6] flink git commit: [streaming] StateHandleProvider added for configurable state backend
Date Tue, 19 May 2015 20:23:51 GMT
[streaming] StateHandleProvider added for configurable state backend


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/197cd6cf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/197cd6cf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/197cd6cf

Branch: refs/heads/master
Commit: 197cd6cf0fb10efc6badd5fc8584f4b36b09e705
Parents: 59bee4a
Author: Gyula Fora <gyfora@apache.org>
Authored: Sat May 16 22:38:16 2015 +0200
Committer: Gyula Fora <gyfora@apache.org>
Committed: Tue May 19 18:32:02 2015 +0200

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       |  6 +--
 .../flink/runtime/checkpoint/StateForTask.java  |  2 -
 .../checkpoint/SuccessfulCheckpoint.java        |  2 +-
 .../runtime/state/ByteStreamStateHandle.java    |  3 +-
 .../flink/runtime/state/FileStateHandle.java    | 34 +++++++++++++-
 .../flink/runtime/state/LocalStateHandle.java   | 17 ++++++-
 .../runtime/state/StateHandleProvider.java      | 39 ++++++++++++++++
 .../environment/StreamExecutionEnvironment.java | 15 ++++++
 .../flink/streaming/api/graph/StreamConfig.java | 21 +++++++++
 .../flink/streaming/api/graph/StreamGraph.java  | 11 +++++
 .../api/graph/StreamingJobGraphGenerator.java   |  1 +
 .../streaming/runtime/tasks/StreamTask.java     |  7 ++-
 .../api/scala/StreamExecutionEnvironment.scala  | 15 ++++--
 .../ProcessFailureStreamingRecoveryITCase.java  | 49 ++++++++++++++------
 14 files changed, 192 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/197cd6cf/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index e4abbf8..b52e732 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -176,7 +176,7 @@ public class CheckpointCoordinator {
 			
 			// clean and discard all successful checkpoints
 			for (SuccessfulCheckpoint checkpoint : completedCheckpoints) {
-				checkpoint.dispose(userClassLoader);
+				checkpoint.discard(userClassLoader);
 			}
 			completedCheckpoints.clear();
 		}
@@ -334,7 +334,7 @@ public class CheckpointCoordinator {
 						completed = checkpoint.toCompletedCheckpoint();
 						completedCheckpoints.addLast(completed);
 						if (completedCheckpoints.size() > numSuccessfulCheckpointsToRetain) {
-							completedCheckpoints.removeFirst().dispose(userClassLoader);;
+							completedCheckpoints.removeFirst().discard(userClassLoader);
 						}
 						pendingCheckpoints.remove(checkpointId);
 						rememberRecentCheckpointId(checkpointId);
@@ -409,7 +409,7 @@ public class CheckpointCoordinator {
 												boolean allOrNothingState) throws Exception {
 		synchronized (lock) {
 			if (shutdown) {
-				throw new IllegalStateException("CheckpointCoordinator is hut down");
+				throw new IllegalStateException("CheckpointCoordinator is shut down");
 			}
 			
 			if (completedCheckpoints.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/197cd6cf/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
index 073b22f..73deeed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import java.io.IOException;
-
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.util.SerializedValue;

http://git-wip-us.apache.org/repos/asf/flink/blob/197cd6cf/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
index 3f77138..be0b301 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
@@ -66,7 +66,7 @@ public class SuccessfulCheckpoint {
 
 	// --------------------------------------------------------------------------------------------
 	
-	public void dispose(ClassLoader userClassLoader) {
+	public void discard(ClassLoader userClassLoader) {
 		for(StateForTask state: states){
 			state.discard(userClassLoader);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/197cd6cf/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
index a202a83..d7dbb84 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.state;
 
-import java.io.IOException;
 import java.io.InputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
@@ -37,7 +36,7 @@ public abstract class ByteStreamStateHandle implements StateHandle<Serializable>
 
 	transient Serializable state;
 
-	public ByteStreamStateHandle(Serializable state) throws IOException {
+	public ByteStreamStateHandle(Serializable state) {
 		this.state = state;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/197cd6cf/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java
index 956bd9d..091c739 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java
@@ -29,7 +29,7 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.StringUtils;
 
-import scala.util.Random;
+import java.util.Random;
 
 /**
  * Statehandle that writes the checkpointed state to a random file in the
@@ -44,7 +44,7 @@ public class FileStateHandle extends ByteStreamStateHandle {
 
 	private String pathString;
 
-	public FileStateHandle(Serializable state, String folder) throws IOException {
+	public FileStateHandle(Serializable state, String folder) {
 		super(state);
 		this.pathString = folder + "/" + randomString();
 	}
@@ -68,4 +68,34 @@ public class FileStateHandle extends ByteStreamStateHandle {
 		FileSystem.get(new URI(pathString)).delete(new Path(pathString), false);
 	}
 
+	/**
+	 * Creates a {@link StateHandleProvider} for creating
+	 * {@link FileStateHandle}s for a given checkpoint directory.
+	 * @param checkpointDir directory under which the checkpoint files are created
+	 */
+	public static StateHandleProvider<Serializable> createProvider(String checkpointDir) {
+		return new FileStateHandleProvider(checkpointDir);
+	}
+
+	/**
+	 * {@link StateHandleProvider} to generate {@link FileStateHandle}s for the
+	 * given checkpoint directory.
+	 * Each created handle stores its state in a randomly named file under that directory.
+	 */
+	private static class FileStateHandleProvider implements StateHandleProvider<Serializable> {
+
+		private static final long serialVersionUID = 3496670017955260518L;
+		private final String path;
+
+		public FileStateHandleProvider(String path) {
+			this.path = path;
+		}
+
+		@Override
+		public FileStateHandle createStateHandle(Serializable state) {
+			return new FileStateHandle(state, path);
+		}
+
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/197cd6cf/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
index e8fe768..a53d8da 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
@@ -26,7 +26,7 @@ import java.io.Serializable;
 public class LocalStateHandle implements StateHandle<Serializable> {
 
 	private static final long serialVersionUID = 2093619217898039610L;
-	
+
 	private final Serializable state;
 
 	public LocalStateHandle(Serializable state) {
@@ -41,4 +41,19 @@ public class LocalStateHandle implements StateHandle<Serializable> {
 	@Override
 	public void discardState() throws Exception {
 	}
+	
+	/** Returns a provider that wraps each checkpointed state object directly in a {@link LocalStateHandle}. */
+	public static StateHandleProvider<Serializable> createProvider() {
+		return new LocalStateHandleProvider();
+	}
+
+	private static class LocalStateHandleProvider implements StateHandleProvider<Serializable> {
+		private static final long serialVersionUID = 4665419208932921425L;
+
+		@Override
+		public LocalStateHandle createStateHandle(Serializable state) {
+			return new LocalStateHandle(state);
+		}
+
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/197cd6cf/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleProvider.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleProvider.java
new file mode 100644
index 0000000..bac490b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleProvider.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import java.io.Serializable;
+
+/**
+ * Stateful streaming operators use a StateHandleProvider to create new
+ * {@link StateHandle}s for each checkpoint; a handle may hold the state itself or store it externally (e.g. in a file).
+ */
+public interface StateHandleProvider<T> extends Serializable {
+
+	/**
+	 * Creates a new {@link StateHandle} instance that will be used to store the
+	 * state checkpoint. This method is called for each state checkpoint saved.
+	 * 
+	 * @param state
+	 *            State to be stored in the handle.
+	 * @return the handle that stores {@code state}
+	 */
+	public StateHandle<T> createStateHandle(T state);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/197cd6cf/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 820cfed..02c8dad 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -40,6 +40,8 @@ import org.apache.flink.client.program.Client.OptimizerPlanEnvironment;
 import org.apache.flink.client.program.ContextEnvironment;
 import org.apache.flink.client.program.PackagedProgram.PreviewPlanEnvironment;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.FileStateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType;
@@ -237,6 +239,19 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
+	 * Sets the {@link StateHandleProvider} used for storing operator state
+	 * checkpoints when checkpointing is enabled.
+	 * <p>
+	 * An example would be using {@link FileStateHandle#createProvider(String)}
+	 * to use any Flink-supported file system as a state backend.
+	 * @param provider the provider used to create the state handle for each checkpoint
+	 */
+	public StreamExecutionEnvironment setStateHandleProvider(StateHandleProvider<?> provider) {
+		streamGraph.setStateHandleProvider(provider);
+		return this;
+	}
+
+	/**
 	 * Sets the number of times that failed tasks are re-executed. A value of
 	 * zero effectively disables fault tolerance. A value of {@code -1}
 	 * indicates that the system default value (as defined in the configuration)

http://git-wip-us.apache.org/repos/asf/flink/blob/197cd6cf/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index a1047df..3b00000 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.StateHandleProvider;
 import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
@@ -57,6 +58,7 @@ public class StreamConfig implements Serializable {
 	private static final String EDGES_IN_ORDER = "edgesInOrder";
 	private static final String OUT_STREAM_EDGES = "outStreamEdges";
 	private static final String IN_STREAM_EDGES = "inStreamEdges";
+	private static final String STATEHANDLE_PROVIDER = "stateHandleProvider";
 
 	// DEFAULT VALUES
 	private static final long DEFAULT_TIMEOUT = 100;
@@ -377,6 +379,25 @@ public class StreamConfig implements Serializable {
 			throw new StreamTaskException("Could not instantiate configuration.", e);
 		}
 	}
+	
+	public void setStateHandleProvider(StateHandleProvider<?> provider) {
+
+		try {
+			InstantiationUtil.writeObjectToConfig(provider, this.config, STATEHANDLE_PROVIDER);
+		} catch (IOException e) {
+			throw new StreamTaskException("Could not serialize stateHandle provider.", e);
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	public <R> StateHandleProvider<R> getStateHandleProvider(ClassLoader cl) {
+		try {
+			return (StateHandleProvider<R>) InstantiationUtil
+					.readObjectFromConfig(this.config, STATEHANDLE_PROVIDER, cl);
+		} catch (Exception e) {
+			throw new StreamTaskException("Could not instantiate statehandle provider.", e);
+		}
+	}
 
 	public void setChainStart() {
 		config.setBoolean(IS_CHAINED_VERTEX, true);

http://git-wip-us.apache.org/repos/asf/flink/blob/197cd6cf/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 1ad1be0..aeba566 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -38,6 +38,8 @@ import org.apache.flink.api.java.typeutils.MissingTypeInfo;
 import org.apache.flink.optimizer.plan.StreamingPlan;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.state.LocalStateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.StreamOperator;
@@ -76,6 +78,7 @@ public class StreamGraph extends StreamingPlan {
 
 	private Map<Integer, StreamLoop> streamLoops;
 	protected Map<Integer, StreamLoop> vertexIDtoLoop;
+	private StateHandleProvider<?> stateHandleProvider = LocalStateHandle.createProvider();
 
 	public StreamGraph(StreamExecutionEnvironment environment) {
 
@@ -116,6 +119,14 @@ public class StreamGraph extends StreamingPlan {
 		this.checkpointingInterval = checkpointingInterval;
 	}
 
+	public void setStateHandleProvider(StateHandleProvider<?> provider) {
+		this.stateHandleProvider = provider;
+	}
+
+	public StateHandleProvider<?> getStateHandleProvider() {
+		return this.stateHandleProvider;
+	}
+
 	public long getCheckpointingInterval() {
 		return checkpointingInterval;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/197cd6cf/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 799862a..ef5ffca 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -258,6 +258,7 @@ public class StreamingJobGraphGenerator {
 		config.setNonChainedOutputs(nonChainableOutputs);
 		config.setChainedOutputs(chainableOutputs);
 		config.setStateMonitoring(streamGraph.isCheckpointingEnabled());
+		config.setStateHandleProvider(streamGraph.getStateHandleProvider());
 
 		Class<? extends AbstractInvokable> vertexClass = vertex.getJobVertexClass();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/197cd6cf/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 4d4893b..828a9a7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -30,8 +30,8 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCommittingOperator;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointedOperator;
 import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
-import org.apache.flink.runtime.state.LocalStateHandle;
 import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
 import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -61,6 +61,8 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 	protected StreamingRuntimeContext context;
 
 	protected ClassLoader userClassLoader;
+	
+	private StateHandleProvider<Serializable> stateHandleProvider;
 
 	private EventListener<TaskEvent> superstepListener;
 
@@ -74,6 +76,7 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 		this.userClassLoader = getUserCodeClassLoader();
 		this.configuration = new StreamConfig(getTaskConfiguration());
 		this.context = createRuntimeContext(getEnvironment().getTaskName());
+		this.stateHandleProvider = configuration.getStateHandleProvider(userClassLoader);
 
 		outputHandler = new OutputHandler<OUT>(this);
 
@@ -212,7 +215,7 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 									: null;
 						}
 						
-						state = userState == null ? null : new LocalStateHandle(userState);
+						state = userState == null ? null : stateHandleProvider.createStateHandle(userState);
 					}
 					catch (Exception e) {
 						throw new Exception("Error while drawing snapshot of the user state.", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/197cd6cf/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 686fc23..5999625 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -19,19 +19,17 @@
 package org.apache.flink.streaming.api.scala
 
 import scala.reflect.ClassTag
-
 import com.esotericsoftware.kryo.Serializer
 import org.apache.commons.lang.Validate
 import org.joda.time.Instant
-
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
 import org.apache.flink.api.scala.ClosureCleaner
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
 import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType
 import org.apache.flink.streaming.api.functions.source.{FromElementsFunction, SourceFunction}
-
 import scala.reflect.ClassTag
+import org.apache.flink.runtime.state.StateHandleProvider
 
 class StreamExecutionEnvironment(javaEnv: JavaEnv) {
 
@@ -125,7 +123,16 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
     javaEnv.enableCheckpointing()
     this
   }
-  
+
+  /**
+   * Sets the given StateHandleProvider to be used for storing operator state
+   * checkpoints when checkpointing is enabled.
+   */
+  def setStateHandleProvider(provider: StateHandleProvider[_]): StreamExecutionEnvironment = {
+    javaEnv.setStateHandleProvider(provider)
+    this
+  }
+ 
   /**
    * Disables operator chaining for streaming operators. Operator chaining
    * allows non-shuffle operations to be co-located in the same thread fully

http://git-wip-us.apache.org/repos/asf/flink/blob/197cd6cf/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
index 627016c..fb4b2b7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
@@ -18,16 +18,11 @@
 
 package org.apache.flink.test.recovery;
 
-import org.apache.commons.io.FileUtils;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.BufferedReader;
 import java.io.File;
@@ -40,7 +35,17 @@ import java.nio.charset.Charset;
 import java.util.HashSet;
 import java.util.UUID;
 
-import static org.junit.Assert.*;
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FileStateHandle;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 
 /**
  * Test for streaming program behaviour in case of TaskManager failure
@@ -62,11 +67,16 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
 
 	@Override
 	public void testProgram(int jobManagerPort, final File coordinateDir) throws Exception {
-
+		
 		final File tempTestOutput = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH),
 												UUID.randomUUID().toString());
 
 		assertTrue("Cannot create directory for temp output", tempTestOutput.mkdirs());
+		
+		final File tempCheckpointDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH),
+				UUID.randomUUID().toString());
+
+		assertTrue("Cannot create directory for checkpoints", tempCheckpointDir.mkdirs());
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment
 									.createRemoteEnvironment("localhost", jobManagerPort);
@@ -74,6 +84,7 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
 		env.getConfig().disableSysoutLogging();
 		env.setNumberOfExecutionRetries(1);
 		env.enableCheckpointing(200);
+		env.setStateHandleProvider(FileStateHandle.createProvider(tempCheckpointDir.getAbsolutePath()));
 
 		DataStream<Long> result = env.addSource(new SleepyDurableGenerateSequence(coordinateDir, DATA_COUNT))
 				// add a non-chained no-op map to test the chain state restore logic
@@ -125,12 +136,19 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
 
 			// validate
 			fileBatchHasEveryNumberLower(PARALLELISM, DATA_COUNT, tempTestOutput);
+			
+			// TODO: Figure out why this fails when run together with other tests
+			// Check whether checkpoints have been cleaned up properly
+			// assertDirectoryEmpty(tempCheckpointDir);
 		}
 		finally {
 			// clean up
 			if (tempTestOutput.exists()) {
 				FileUtils.deleteDirectory(tempTestOutput);
 			}
+			if (tempCheckpointDir.exists()) {
+				FileUtils.deleteDirectory(tempCheckpointDir);
+			}
 		}
 	}
 
@@ -155,7 +173,6 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
 		}
 
 		@Override
-		@SuppressWarnings("unchecked")
 		public void open(Configuration config) {
 			stepSize = getRuntimeContext().getNumberOfParallelSubtasks();
 			congruence = getRuntimeContext().getIndexOfThisSubtask();
@@ -267,4 +284,10 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
 			}
 		}
 	}
+	
+	private static void assertDirectoryEmpty(File path){
+		File[] files = path.listFiles();
+		assertNotNull(files);
+		assertEquals("Checkpoint dir is not empty", 0, files.length);
+	}
 }


Mime
View raw message