flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject flink git commit: [FLINK-3355] [rocksdb backend] Allow passing options to the RocksDB backend.
Date Tue, 09 Feb 2016 10:50:26 GMT
Repository: flink
Updated Branches:
  refs/heads/master 28c6254ee -> 9ee167949


[FLINK-3355] [rocksdb backend] Allow passing options to the RocksDB backend.

This also cleans up the generics in the RocksDB state classes.

This closes #1608


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9ee16794
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9ee16794
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9ee16794

Branch: refs/heads/master
Commit: 9ee16794909d18aa84e8d0b738a6a447d11e6eeb
Parents: 28c6254
Author: Stephan Ewen <sewen@apache.org>
Authored: Mon Feb 8 19:55:29 2016 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Tue Feb 9 11:03:09 2016 +0100

----------------------------------------------------------------------
 .../streaming/state/AbstractRocksDBState.java   | 113 +++++++++----------
 .../contrib/streaming/state/OptionsFactory.java |  31 +++++
 .../streaming/state/RocksDBListState.java       |  68 ++++++-----
 .../streaming/state/RocksDBReducingState.java   |  86 +++++++-------
 .../streaming/state/RocksDBStateBackend.java    |  76 +++++++++++--
 .../streaming/state/RocksDBValueState.java      |  74 ++++++------
 6 files changed, 273 insertions(+), 175 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9ee16794/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index 783332c..05e15e8 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -1,36 +1,38 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.contrib.streaming.state;
 
 import org.apache.commons.io.FileUtils;
+
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.KvState;
 import org.apache.flink.runtime.state.KvStateSnapshot;
 import org.apache.flink.util.HDFSCopyFromLocal;
 import org.apache.flink.util.HDFSCopyToLocal;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+
 import org.rocksdb.BackupEngine;
 import org.rocksdb.BackupableDBOptions;
 import org.rocksdb.Env;
@@ -38,7 +40,7 @@ import org.rocksdb.Options;
 import org.rocksdb.RestoreOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
-import org.rocksdb.StringAppendOperator;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,10 +62,9 @@ import static java.util.Objects.requireNonNull;
  * @param <N> The type of the namespace.
  * @param <S> The type of {@link State}.
  * @param <SD> The type of {@link StateDescriptor}.
- * @param <Backend> The type of the backend that snapshots this key/value state.
  */
-public abstract class AbstractRocksDBState<K, N, S extends State, SD extends StateDescriptor<S,
?>, Backend extends AbstractStateBackend>
-	implements KvState<K, N, S, SD, Backend>, State {
+public abstract class AbstractRocksDBState<K, N, S extends State, SD extends StateDescriptor<S,
?>>
+	implements KvState<K, N, S, SD, RocksDBStateBackend>, State {
 
 	private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBState.class);
 
@@ -95,9 +96,11 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD
extends Sta
 	 * @param dbPath The path on the local system where RocksDB data should be stored.
 	 */
 	protected AbstractRocksDBState(TypeSerializer<K> keySerializer,
-		TypeSerializer<N> namespaceSerializer,
-		File dbPath,
-		String checkpointPath) {
+			TypeSerializer<N> namespaceSerializer,
+			File dbPath,
+			String checkpointPath,
+			Options options) {
+		
 		this.keySerializer = requireNonNull(keySerializer);
 		this.namespaceSerializer = namespaceSerializer;
 		this.dbPath = dbPath;
@@ -105,9 +108,6 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD
extends Sta
 
 		RocksDB.loadLibrary();
 
-		Options options = new Options().setCreateIfMissing(true);
-		options.setMergeOperator(new StringAppendOperator());
-
 		if (!dbPath.exists()) {
 			if (!dbPath.mkdirs()) {
 				throw new RuntimeException("Could not create RocksDB data directory.");
@@ -128,9 +128,6 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD
extends Sta
 		} catch (RocksDBException e) {
 			throw new RuntimeException("Error while opening RocksDB instance.", e);
 		}
-
-		options.dispose();
-
 	}
 
 	/**
@@ -143,10 +140,11 @@ public abstract class AbstractRocksDBState<K, N, S extends State,
SD extends Sta
 	 * @param restorePath The path to a backup directory from which to restore RocksDb database.
 	 */
 	protected AbstractRocksDBState(TypeSerializer<K> keySerializer,
-		TypeSerializer<N> namespaceSerializer,
-		File dbPath,
-		String checkpointPath,
-		String restorePath) {
+			TypeSerializer<N> namespaceSerializer,
+			File dbPath,
+			String checkpointPath,
+			String restorePath,
+			Options options) {
 
 		RocksDB.loadLibrary();
 
@@ -162,9 +160,6 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD
extends Sta
 		this.dbPath = dbPath;
 		this.checkpointPath = checkpointPath;
 
-		Options options = new Options().setCreateIfMissing(true);
-		options.setMergeOperator(new StringAppendOperator());
-
 		if (!dbPath.exists()) {
 			if (!dbPath.mkdirs()) {
 				throw new RuntimeException("Could not create RocksDB data directory.");
@@ -176,8 +171,6 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD
extends Sta
 		} catch (RocksDBException e) {
 			throw new RuntimeException("Error while opening RocksDB instance.", e);
 		}
-
-		options.dispose();
 	}
 
 	// ------------------------------------------------------------------------
@@ -211,12 +204,10 @@ public abstract class AbstractRocksDBState<K, N, S extends State,
SD extends Sta
 		this.currentNamespace = namespace;
 	}
 
-	protected abstract KvStateSnapshot<K, N, S, SD, Backend> createRocksDBSnapshot(URI
backupUri, long checkpointId);
+	protected abstract AbstractRocksDBSnapshot<K, N, S, SD> createRocksDBSnapshot(URI
backupUri, long checkpointId);
 
 	@Override
-	final public KvStateSnapshot<K, N, S, SD, Backend> snapshot(
-		long checkpointId,
-		long timestamp) throws Exception {
+	public final AbstractRocksDBSnapshot<K, N, S, SD> snapshot(long checkpointId, long
timestamp) throws Exception {
 		boolean success = false;
 
 		final File localBackupPath = new File(dbPath, "backup-" + checkpointId);
@@ -234,7 +225,7 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD
extends Sta
 			}
 
 			HDFSCopyFromLocal.copyFromLocal(localBackupPath, backupUri);
-			KvStateSnapshot<K, N, S, SD, Backend> result = createRocksDBSnapshot(backupUri,
checkpointId);
+			AbstractRocksDBSnapshot<K, N, S, SD> result = createRocksDBSnapshot(backupUri, checkpointId);
 			success = true;
 			return result;
 		} finally {
@@ -256,7 +247,9 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD
extends Sta
 		}
 	}
 
-	public static abstract class AbstractRocksDBSnapshot<K, N, S extends State, SD extends
StateDescriptor<S, ?>, Backend extends AbstractStateBackend> implements KvStateSnapshot<K,
N, S, SD, Backend> {
+	public static abstract class AbstractRocksDBSnapshot<K, N, S extends State, SD extends
StateDescriptor<S, ?>>
+			implements KvStateSnapshot<K, N, S, SD, RocksDBStateBackend>
+	{
 		private static final long serialVersionUID = 1L;
 
 		private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBSnapshot.class);
@@ -293,12 +286,13 @@ public abstract class AbstractRocksDBState<K, N, S extends State,
SD extends Sta
 		protected final SD stateDesc;
 
 		public AbstractRocksDBSnapshot(File dbPath,
-			String checkpointPath,
-			URI backupUri,
-			long checkpointId,
-			TypeSerializer<K> keySerializer,
-			TypeSerializer<N> namespaceSerializer,
-			SD stateDesc) {
+				String checkpointPath,
+				URI backupUri,
+				long checkpointId,
+				TypeSerializer<K> keySerializer,
+				TypeSerializer<N> namespaceSerializer,
+				SD stateDesc) {
+			
 			this.dbPath = dbPath;
 			this.checkpointPath = checkpointPath;
 			this.backupUri = backupUri;
@@ -309,19 +303,21 @@ public abstract class AbstractRocksDBState<K, N, S extends State,
SD extends Sta
 			this.namespaceSerializer = namespaceSerializer;
 		}
 
-		protected abstract KvState<K, N, S, SD, Backend> createRocksDBState(TypeSerializer<K>
keySerializer,
-			TypeSerializer<N> namespaceSerializer,
-			SD stateDesc,
-			File dbPath,
-			String backupPath,
-			String restorePath) throws Exception;
+		protected abstract KvState<K, N, S, SD, RocksDBStateBackend> createRocksDBState(
+				TypeSerializer<K> keySerializer,
+				TypeSerializer<N> namespaceSerializer,
+				SD stateDesc,
+				File dbPath,
+				String backupPath,
+				String restorePath,
+				Options options) throws Exception;
 
 		@Override
-		public final KvState<K, N, S, SD, Backend> restoreState(
-			Backend stateBackend,
-			TypeSerializer<K> keySerializer,
-			ClassLoader classLoader,
-			long recoveryTimestamp) throws Exception {
+		public final KvState<K, N, S, SD, RocksDBStateBackend> restoreState(
+				RocksDBStateBackend stateBackend,
+				TypeSerializer<K> keySerializer,
+				ClassLoader classLoader,
+				long recoveryTimestamp) throws Exception {
 
 			// validity checks
 			if (!this.keySerializer.equals(keySerializer)) {
@@ -352,7 +348,8 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD
extends Sta
 			}
 
 			HDFSCopyToLocal.copyToLocal(backupUri, dbPath);
-			return createRocksDBState(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath,
localBackupPath.getAbsolutePath());
+			return createRocksDBState(keySerializer, namespaceSerializer, stateDesc, dbPath, 
+					checkpointPath, localBackupPath.getAbsolutePath(), stateBackend.getRocksDBOptions());
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9ee16794/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
new file mode 100644
index 0000000..73b1e5d
--- /dev/null
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.rocksdb.Options;
+
+/**
+ * A factory for {@link Options} to be passed to the {@link RocksDBStateBackend}.
+ * Options have to be created lazily by this factory, because the {@code Options}
+ * class is not serializable and holds pointers to native code.
+ */
+public interface OptionsFactory extends java.io.Serializable {
+	
+	Options createOptions();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9ee16794/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index e97e65d..da07f75 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.api.common.state.ListState;
@@ -22,9 +23,9 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.KvState;
-import org.apache.flink.runtime.state.KvStateSnapshot;
+
+import org.rocksdb.Options;
 import org.rocksdb.RocksDBException;
 
 import java.io.ByteArrayInputStream;
@@ -44,10 +45,9 @@ import static java.util.Objects.requireNonNull;
  * @param <K> The type of the key.
  * @param <N> The type of the namespace.
  * @param <V> The type of the values in the list state.
- * @param <Backend> The type of the backend that snapshots this key/value state.
  */
-public class RocksDBListState<K, N, V, Backend extends AbstractStateBackend>
-	extends AbstractRocksDBState<K, N, ListState<V>, ListStateDescriptor<V>,
Backend>
+public class RocksDBListState<K, N, V>
+	extends AbstractRocksDBState<K, N, ListState<V>, ListStateDescriptor<V>>
 	implements ListState<V> {
 
 	/** Serializer for the values */
@@ -66,11 +66,13 @@ public class RocksDBListState<K, N, V, Backend extends AbstractStateBackend>
 	 * @param dbPath The path on the local system where RocksDB data should be stored.
 	 */
 	protected RocksDBListState(TypeSerializer<K> keySerializer,
-		TypeSerializer<N> namespaceSerializer,
-		ListStateDescriptor<V> stateDesc,
-		File dbPath,
-		String backupPath) {
-		super(keySerializer, namespaceSerializer, dbPath, backupPath);
+			TypeSerializer<N> namespaceSerializer,
+			ListStateDescriptor<V> stateDesc,
+			File dbPath,
+			String backupPath,
+			Options options) {
+		
+		super(keySerializer, namespaceSerializer, dbPath, backupPath, options);
 		this.stateDesc = requireNonNull(stateDesc);
 		this.valueSerializer = stateDesc.getSerializer();
 	}
@@ -85,12 +87,14 @@ public class RocksDBListState<K, N, V, Backend extends AbstractStateBackend>
 	 * @param dbPath The path on the local system where RocksDB data should be stored.
 	 */
 	protected RocksDBListState(TypeSerializer<K> keySerializer,
-		TypeSerializer<N> namespaceSerializer,
-		ListStateDescriptor<V> stateDesc,
-		File dbPath,
-		String backupPath,
-		String restorePath) {
-		super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath);
+			TypeSerializer<N> namespaceSerializer,
+			ListStateDescriptor<V> stateDesc,
+			File dbPath,
+			String backupPath,
+			String restorePath,
+			Options options) {
+		
+		super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath, options);
 		this.stateDesc = requireNonNull(stateDesc);
 		this.valueSerializer = stateDesc.getSerializer();
 	}
@@ -143,13 +147,16 @@ public class RocksDBListState<K, N, V, Backend extends AbstractStateBackend>
 	}
 
 	@Override
-	protected KvStateSnapshot<K, N, ListState<V>, ListStateDescriptor<V>, Backend>
createRocksDBSnapshot(
-		URI backupUri,
-		long checkpointId) {
+	protected AbstractRocksDBSnapshot<K, N, ListState<V>, ListStateDescriptor<V>>
createRocksDBSnapshot(
+			URI backupUri,
+			long checkpointId) {
+		
 		return new Snapshot<>(dbPath, checkpointPath, backupUri, checkpointId, keySerializer,
namespaceSerializer, stateDesc);
 	}
 
-	private static class Snapshot<K, N, V, Backend extends AbstractStateBackend> extends
AbstractRocksDBSnapshot<K, N, ListState<V>, ListStateDescriptor<V>, Backend>
{
+	private static class Snapshot<K, N, V> extends 
+			AbstractRocksDBSnapshot<K, N, ListState<V>, ListStateDescriptor<V>>
+	{
 		private static final long serialVersionUID = 1L;
 
 		public Snapshot(File dbPath,
@@ -169,14 +176,17 @@ public class RocksDBListState<K, N, V, Backend extends AbstractStateBackend>
 		}
 
 		@Override
-		protected KvState<K, N, ListState<V>, ListStateDescriptor<V>, Backend>
createRocksDBState(
-			TypeSerializer<K> keySerializer,
-			TypeSerializer<N> namespaceSerializer,
-			ListStateDescriptor<V> stateDesc,
-			File dbPath,
-			String backupPath,
-			String restorePath) throws Exception {
-			return new RocksDBListState<>(keySerializer, namespaceSerializer, stateDesc, dbPath,
checkpointPath, restorePath);
+		protected KvState<K, N, ListState<V>, ListStateDescriptor<V>, RocksDBStateBackend>
createRocksDBState(
+				TypeSerializer<K> keySerializer,
+				TypeSerializer<N> namespaceSerializer,
+				ListStateDescriptor<V> stateDesc,
+				File dbPath,
+				String backupPath,
+				String restorePath,
+				Options options) throws Exception {
+			
+			return new RocksDBListState<>(keySerializer, namespaceSerializer, stateDesc, dbPath,

+					checkpointPath, restorePath, options);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9ee16794/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
index eb21c3b..81f9ffb 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
@@ -1,22 +1,3 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.contrib.streaming.state;
-
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -35,15 +16,17 @@ package org.apache.flink.contrib.streaming.state;
  * limitations under the License.
  */
 
+package org.apache.flink.contrib.streaming.state;
+
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.KvState;
-import org.apache.flink.runtime.state.KvStateSnapshot;
+
+import org.rocksdb.Options;
 import org.rocksdb.RocksDBException;
 
 import java.io.ByteArrayInputStream;
@@ -60,10 +43,9 @@ import static java.util.Objects.requireNonNull;
  * @param <K> The type of the key.
  * @param <N> The type of the namespace.
  * @param <V> The type of value that the state state stores.
- * @param <Backend> The type of the backend that snapshots this key/value state.
  */
-public class RocksDBReducingState<K, N, V, Backend extends AbstractStateBackend>
-	extends AbstractRocksDBState<K, N, ReducingState<V>, ReducingStateDescriptor<V>,
Backend>
+public class RocksDBReducingState<K, N, V>
+	extends AbstractRocksDBState<K, N, ReducingState<V>, ReducingStateDescriptor<V>>
 	implements ReducingState<V> {
 
 	/** Serializer for the values */
@@ -85,23 +67,27 @@ public class RocksDBReducingState<K, N, V, Backend extends AbstractStateBackend>
 	 * @param dbPath The path on the local system where RocksDB data should be stored.
 	 */
 	protected RocksDBReducingState(TypeSerializer<K> keySerializer,
-		TypeSerializer<N> namespaceSerializer,
-		ReducingStateDescriptor<V> stateDesc,
-		File dbPath,
-		String backupPath) {
-		super(keySerializer, namespaceSerializer, dbPath, backupPath);
+			TypeSerializer<N> namespaceSerializer,
+			ReducingStateDescriptor<V> stateDesc,
+			File dbPath,
+			String backupPath,
+			Options options) {
+		
+		super(keySerializer, namespaceSerializer, dbPath, backupPath, options);
 		this.stateDesc = requireNonNull(stateDesc);
 		this.valueSerializer = stateDesc.getSerializer();
 		this.reduceFunction = stateDesc.getReduceFunction();
 	}
 
 	protected RocksDBReducingState(TypeSerializer<K> keySerializer,
-		TypeSerializer<N> namespaceSerializer,
-		ReducingStateDescriptor<V> stateDesc,
-		File dbPath,
-		String backupPath,
-		String restorePath) {
-		super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath);
+			TypeSerializer<N> namespaceSerializer,
+			ReducingStateDescriptor<V> stateDesc,
+			File dbPath,
+			String backupPath,
+			String restorePath,
+			Options options) {
+		
+		super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath, options);
 		this.stateDesc = stateDesc;
 		this.valueSerializer = stateDesc.getSerializer();
 		this.reduceFunction = stateDesc.getReduceFunction();
@@ -150,13 +136,16 @@ public class RocksDBReducingState<K, N, V, Backend extends AbstractStateBackend>
 	}
 
 	@Override
-	protected KvStateSnapshot<K, N, ReducingState<V>, ReducingStateDescriptor<V>,
Backend> createRocksDBSnapshot(
-		URI backupUri,
-		long checkpointId) {
+	protected AbstractRocksDBSnapshot<K, N, ReducingState<V>, ReducingStateDescriptor<V>>
createRocksDBSnapshot(
+			URI backupUri,
+			long checkpointId) {
+		
 		return new Snapshot<>(dbPath, checkpointPath, backupUri, checkpointId, keySerializer,
namespaceSerializer, stateDesc);
 	}
 
-	private static class Snapshot<K, N, V, Backend extends AbstractStateBackend> extends
AbstractRocksDBSnapshot<K, N, ReducingState<V>, ReducingStateDescriptor<V>,
Backend> {
+	private static class Snapshot<K, N, V> extends 
+			AbstractRocksDBSnapshot<K, N, ReducingState<V>, ReducingStateDescriptor<V>>
+	{
 		private static final long serialVersionUID = 1L;
 
 		public Snapshot(File dbPath,
@@ -176,14 +165,17 @@ public class RocksDBReducingState<K, N, V, Backend extends AbstractStateBackend>
 		}
 
 		@Override
-		protected KvState<K, N, ReducingState<V>, ReducingStateDescriptor<V>, Backend>
createRocksDBState(
-			TypeSerializer<K> keySerializer,
-			TypeSerializer<N> namespaceSerializer,
-			ReducingStateDescriptor<V> stateDesc,
-			File dbPath,
-			String backupPath,
-			String restorePath) throws Exception {
-			return new RocksDBReducingState<>(keySerializer, namespaceSerializer, stateDesc,
dbPath, checkpointPath, restorePath);
+		protected KvState<K, N, ReducingState<V>, ReducingStateDescriptor<V>, RocksDBStateBackend>
createRocksDBState(
+				TypeSerializer<K> keySerializer,
+				TypeSerializer<N> namespaceSerializer,
+				ReducingStateDescriptor<V> stateDesc,
+				File dbPath,
+				String backupPath,
+				String restorePath,
+				Options options) throws Exception {
+			
+			return new RocksDBReducingState<>(keySerializer, namespaceSerializer, stateDesc,

+					dbPath, checkpointPath, restorePath, options);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9ee16794/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index eefa4a9..8c0171a 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *	http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -31,11 +31,20 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.api.common.state.StateBackend;
+import org.rocksdb.Options;
+import org.rocksdb.StringAppendOperator;
 
 import static java.util.Objects.requireNonNull;
 
 /**
- *
+ * A {@link StateBackend} that stores its state in {@code RocksDB}. This state backend can
+ * store very large state that exceeds memory and spills to disk.
+ * 
+ * <p>All key/value state (including windows) is stored in the key/value index of RocksDB.
+ * For persistence against loss of machines, checkpoints take a snapshot of the
+ * RocksDB database, and persist that snapshot in a file system (by default) or
+ * another configurable state backend.
  */
 public class RocksDBStateBackend extends AbstractStateBackend {
 	private static final long serialVersionUID = 1L;
@@ -53,6 +62,13 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	private JobID jobId;
 
 	private AbstractStateBackend backingStateBackend;
+	
+	/** The options factory to create the RocksDB options in the cluster */
+	private OptionsFactory optionsFactory;
+	
+	/** The options from the options factory, cached */
+	private transient Options rocksDbOptions;
+	
 
 	public RocksDBStateBackend(String dbBasePath, String checkpointDirectory, AbstractStateBackend
backingStateBackend) {
 		this.dbBasePath = requireNonNull(dbBasePath);
@@ -71,13 +87,15 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	}
 
 	@Override
-	public void disposeAllStateForCurrentJob() throws Exception {
-
-	}
+	public void disposeAllStateForCurrentJob() throws Exception {}
 
 	@Override
 	public void close() throws Exception {
-
+		Options opt = this.rocksDbOptions;
+		if (opt != null) {
+			opt.dispose();
+			this.rocksDbOptions = null;
+		}
 	}
 
 	private File getDbPath(String stateName) {
@@ -93,7 +111,9 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 		ValueStateDescriptor<T> stateDesc) throws Exception {
 		File dbPath = getDbPath(stateDesc.getName());
 		String checkpointPath = getCheckpointPath(stateDesc.getName());
-		return new RocksDBValueState<>(keySerializer, namespaceSerializer, stateDesc, dbPath,
checkpointPath);
+		
+		return new RocksDBValueState<>(keySerializer, namespaceSerializer, 
+				stateDesc, dbPath, checkpointPath, getRocksDBOptions());
 	}
 
 	@Override
@@ -101,7 +121,9 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 		ListStateDescriptor<T> stateDesc) throws Exception {
 		File dbPath = getDbPath(stateDesc.getName());
 		String checkpointPath = getCheckpointPath(stateDesc.getName());
-		return new RocksDBListState<>(keySerializer, namespaceSerializer, stateDesc, dbPath,
checkpointPath);
+		
+		return new RocksDBListState<>(keySerializer, namespaceSerializer, 
+				stateDesc, dbPath, checkpointPath, getRocksDBOptions());
 	}
 
 	@Override
@@ -109,7 +131,9 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 		ReducingStateDescriptor<T> stateDesc) throws Exception {
 		File dbPath = getDbPath(stateDesc.getName());
 		String checkpointPath = getCheckpointPath(stateDesc.getName());
-		return new RocksDBReducingState<>(keySerializer, namespaceSerializer, stateDesc,
dbPath, checkpointPath);
+		
+		return new RocksDBReducingState<>(keySerializer, namespaceSerializer, 
+				stateDesc, dbPath, checkpointPath, getRocksDBOptions());
 	}
 
 	@Override
@@ -124,4 +148,38 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 		long timestamp) throws Exception {
 		return backingStateBackend.checkpointStateSerializable(state, checkpointID, timestamp);
 	}
+	
+	// ------------------------------------------------------------------------
+	//  Parametrize with Options
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Defines the {@link org.rocksdb.Options} for the RocksDB instances.
+	 * Because the options are not serializable and hold native code references,
+	 * they must be specified through a factory. 
+	 * 
+	 * @param optionsFactory The options factory that lazily creates the RocksDB options.
+	 */
+	public void setOptions(OptionsFactory optionsFactory) {
+		this.optionsFactory = optionsFactory;
+	}
+
+	/**
+	 * Gets the options factory that lazily creates the RocksDB options.
+	 * 
+	 * @return The options factory.
+	 */
+	public OptionsFactory getOptions() {
+		return optionsFactory;
+	}
+	
+	Options getRocksDBOptions() {
+		if (rocksDbOptions == null) {
+			Options opt = optionsFactory == null ? new Options() : optionsFactory.createOptions();
+			opt = opt.setCreateIfMissing(true);
+			opt = opt.setMergeOperator(new StringAppendOperator());
+			rocksDbOptions = opt;
+		}
+		return rocksDbOptions;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9ee16794/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
index f51e160..388f099 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,15 +6,16 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.api.common.state.ValueState;
@@ -22,9 +23,9 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.KvState;
-import org.apache.flink.runtime.state.KvStateSnapshot;
+
+import org.rocksdb.Options;
 import org.rocksdb.RocksDBException;
 
 import java.io.ByteArrayInputStream;
@@ -41,10 +42,9 @@ import static java.util.Objects.requireNonNull;
  * @param <K> The type of the key.
  * @param <N> The type of the namespace.
  * @param <V> The type of value that the state state stores.
- * @param <Backend> The type of the backend that snapshots this key/value state.
  */
-public class RocksDBValueState<K, N, V, Backend extends AbstractStateBackend>
-	extends AbstractRocksDBState<K, N, ValueState<V>, ValueStateDescriptor<V>,
Backend>
+public class RocksDBValueState<K, N, V>
+	extends AbstractRocksDBState<K, N, ValueState<V>, ValueStateDescriptor<V>>
 	implements ValueState<V> {
 
 	/** Serializer for the values */
@@ -63,22 +63,26 @@ public class RocksDBValueState<K, N, V, Backend extends AbstractStateBackend>
 	 * @param dbPath The path on the local system where RocksDB data should be stored.
 	 */
 	protected RocksDBValueState(TypeSerializer<K> keySerializer,
-		TypeSerializer<N> namespaceSerializer,
-		ValueStateDescriptor<V> stateDesc,
-		File dbPath,
-		String backupPath) {
-		super(keySerializer, namespaceSerializer, dbPath, backupPath);
+			TypeSerializer<N> namespaceSerializer,
+			ValueStateDescriptor<V> stateDesc,
+			File dbPath,
+			String backupPath,
+			Options options) {
+		
+		super(keySerializer, namespaceSerializer, dbPath, backupPath, options);
 		this.stateDesc = requireNonNull(stateDesc);
 		this.valueSerializer = stateDesc.getSerializer();
 	}
 
 	protected RocksDBValueState(TypeSerializer<K> keySerializer,
-		TypeSerializer<N> namespaceSerializer,
-		ValueStateDescriptor<V> stateDesc,
-		File dbPath,
-		String backupPath,
-		String restorePath) {
-		super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath);
+			TypeSerializer<N> namespaceSerializer,
+			ValueStateDescriptor<V> stateDesc,
+			File dbPath,
+			String backupPath,
+			String restorePath,
+			Options options) {
+		
+		super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath, options);
 		this.stateDesc = stateDesc;
 		this.valueSerializer = stateDesc.getSerializer();
 	}
@@ -120,13 +124,16 @@ public class RocksDBValueState<K, N, V, Backend extends AbstractStateBackend>
 	}
 
 	@Override
-	protected KvStateSnapshot<K, N, ValueState<V>, ValueStateDescriptor<V>, Backend>
createRocksDBSnapshot(
-		URI backupUri,
-		long checkpointId) {
+	protected AbstractRocksDBSnapshot<K, N, ValueState<V>, ValueStateDescriptor<V>>
createRocksDBSnapshot(
+			URI backupUri,
+			long checkpointId) {
+		
 		return new Snapshot<>(dbPath, checkpointPath, backupUri, checkpointId, keySerializer,
namespaceSerializer, stateDesc);
 	}
 
-	private static class Snapshot<K, N, V, Backend extends AbstractStateBackend> extends
AbstractRocksDBSnapshot<K, N, ValueState<V>, ValueStateDescriptor<V>, Backend>
{
+	private static class Snapshot<K, N, V> 
+			extends AbstractRocksDBSnapshot<K, N, ValueState<V>, ValueStateDescriptor<V>>
+	{
 		private static final long serialVersionUID = 1L;
 
 		public Snapshot(File dbPath,
@@ -146,14 +153,17 @@ public class RocksDBValueState<K, N, V, Backend extends AbstractStateBackend>
 		}
 
 		@Override
-		protected KvState<K, N, ValueState<V>, ValueStateDescriptor<V>, Backend>
createRocksDBState(
-			TypeSerializer<K> keySerializer,
-			TypeSerializer<N> namespaceSerializer,
-			ValueStateDescriptor<V> stateDesc,
-			File dbPath,
-			String backupPath,
-			String restorePath) throws Exception {
-			return new RocksDBValueState<>(keySerializer, namespaceSerializer, stateDesc, dbPath,
checkpointPath, restorePath);
+		protected KvState<K, N, ValueState<V>, ValueStateDescriptor<V>, RocksDBStateBackend>
createRocksDBState(
+				TypeSerializer<K> keySerializer,
+				TypeSerializer<N> namespaceSerializer,
+				ValueStateDescriptor<V> stateDesc,
+				File dbPath,
+				String backupPath,
+				String restorePath,
+				Options options) throws Exception {
+			
+			return new RocksDBValueState<>(keySerializer, namespaceSerializer, stateDesc, dbPath,

+					checkpointPath, restorePath, options);
 		}
 	}
 }


Mime
View raw message