flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [02/16] flink git commit: [FLINK-3278] Add Partitioned State Backend Based on RocksDB
Date Wed, 03 Feb 2016 20:12:21 GMT
[FLINK-3278] Add Partitioned State Backend Based on RocksDB


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/524e56bc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/524e56bc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/524e56bc

Branch: refs/heads/master
Commit: 524e56bcbf9faf4b7363abfefeb3b3fa53949bad
Parents: 67ca4a4
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Thu Jan 21 10:56:47 2016 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Feb 3 20:27:51 2016 +0100

----------------------------------------------------------------------
 .../flink-statebackend-rocksdb/pom.xml          |  71 ++++
 .../streaming/state/AbstractRocksDBState.java   | 372 +++++++++++++++++++
 .../streaming/state/RocksDBListState.java       | 183 +++++++++
 .../streaming/state/RocksDBReducingState.java   | 190 ++++++++++
 .../streaming/state/RocksDBStateBackend.java    | 127 +++++++
 .../streaming/state/RocksDBValueState.java      | 156 ++++++++
 .../state/RocksDBStateBackendTest.java          |  53 +++
 .../src/test/resources/log4j-test.properties    |  27 ++
 .../src/test/resources/log4j.properties         |  27 ++
 .../src/test/resources/logback-test.xml         |  30 ++
 flink-contrib/pom.xml                           |   1 +
 .../flink/util/ExternalProcessRunner.java       | 233 ++++++++++++
 .../apache/flink/util/HDFSCopyFromLocal.java    |  48 +++
 .../org/apache/flink/util/HDFSCopyToLocal.java  |  49 +++
 .../flink/util/ExternalProcessRunnerTest.java   |  98 +++++
 flink-tests/pom.xml                             |   7 +
 .../EventTimeWindowCheckpointingITCase.java     |   9 +
 17 files changed, 1681 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-contrib/flink-statebackend-rocksdb/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/pom.xml b/flink-contrib/flink-statebackend-rocksdb/pom.xml
new file mode 100644
index 0000000..999c496
--- /dev/null
+++ b/flink-contrib/flink-statebackend-rocksdb/pom.xml
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-contrib</artifactId>
+		<version>1.0-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-statebackend-rocksdb_2.10</artifactId>
+	<name>flink-statebackend-rocksdb</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_2.10</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-clients_2.10</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>${guava.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.rocksdb</groupId>
+			<artifactId>rocksdbjni</artifactId>
+			<version>4.1.0</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime_2.10</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+	</dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
new file mode 100644
index 0000000..6dbe16c
--- /dev/null
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -0,0 +1,372 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.KvStateSnapshot;
+import org.apache.flink.util.HDFSCopyFromLocal;
+import org.apache.flink.util.HDFSCopyToLocal;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.rocksdb.BackupEngine;
+import org.rocksdb.BackupableDBOptions;
+import org.rocksdb.Env;
+import org.rocksdb.Options;
+import org.rocksdb.RestoreOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.StringAppendOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Base class for {@link State} implementations that store state in a RocksDB database.
+ *
+ * <p>This base class is responsible for setting up the RocksDB database, for
+ * checkpointing/restoring the database and for disposal in the {@link #dispose()} method. The
+ * concrete subclasses just use the RocksDB handle to store/retrieve state.
+ *
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <S> The type of {@link State}.
+ * @param <SD> The type of {@link StateDescriptor}.
+ * @param <Backend> The type of the backend that snapshots this key/value state.
+ */
+public abstract class AbstractRocksDBState<K, N, S extends State, SD extends StateDescriptor<S>, Backend extends AbstractStateBackend>
+	implements KvState<K, N, S, SD, Backend>, State {
+
+	private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBState.class);
+
+	/** Serializer for the keys */
+	protected final TypeSerializer<K> keySerializer;
+
+	/** Serializer for the namespace */
+	protected final TypeSerializer<N> namespaceSerializer;
+
+	/** The current key, which the next value methods will refer to */
+	protected K currentKey;
+
+	/** The current namespace, which the next value methods will refer to */
+	protected N currentNamespace;
+
+	/** Store it so that we can clean up in dispose() */
+	protected final File dbPath;
+
+	protected final String checkpointPath;
+
+	/** Our RocksDB instance */
+	protected final RocksDB db;
+
+	/**
+	 * Creates a new RocksDB backed state.
+	 *
+	 * @param keySerializer The serializer for the keys.
+	 * @param namespaceSerializer The serializer for the namespace.
+	 * @param dbPath The path on the local system where RocksDB data should be stored.
+	 */
+	protected AbstractRocksDBState(TypeSerializer<K> keySerializer,
+		TypeSerializer<N> namespaceSerializer,
+		File dbPath,
+		String checkpointPath) {
+		this.keySerializer = requireNonNull(keySerializer);
+		this.namespaceSerializer = namespaceSerializer;
+		this.dbPath = dbPath;
+		this.checkpointPath = checkpointPath;
+
+		RocksDB.loadLibrary();
+
+		Options options = new Options().setCreateIfMissing(true);
+		options.setMergeOperator(new StringAppendOperator());
+
+		if (!dbPath.exists()) {
+			if (!dbPath.mkdirs()) {
+				throw new RuntimeException("Could not create RocksDB data directory.");
+			}
+		}
+
+		// clean it, this will remove the last part of the path but RocksDB will recreate it
+		try {
+			File db = new File(dbPath, "db");
+			LOG.warn("Deleting already existing db directory {}.", db);
+			FileUtils.deleteDirectory(db);
+		} catch (IOException e) {
+			throw new RuntimeException("Error cleaning RocksDB data directory.", e);
+		}
+
+		try {
+			db = RocksDB.open(options, new File(dbPath, "db").getAbsolutePath());
+		} catch (RocksDBException e) {
+			throw new RuntimeException("Error while opening RocksDB instance.", e);
+		}
+
+		options.dispose();
+
+	}
+
+	/**
+	 * Creates a new RocksDB backed state and restores from the given backup directory. After
+	 * restoring the backup directory is deleted.
+	 *
+	 * @param keySerializer The serializer for the keys.
+	 * @param namespaceSerializer The serializer for the namespace.
+	 * @param dbPath The path on the local system where RocksDB data should be stored.
+	 * @param restorePath The path to a backup directory from which to restore RocksDb database.
+	 */
+	protected AbstractRocksDBState(TypeSerializer<K> keySerializer,
+		TypeSerializer<N> namespaceSerializer,
+		File dbPath,
+		String checkpointPath,
+		String restorePath) {
+
+		RocksDB.loadLibrary();
+
+		try {
+			BackupEngine backupEngine = BackupEngine.open(Env.getDefault(), new BackupableDBOptions(restorePath + "/"));
+			backupEngine.restoreDbFromLatestBackup(new File(dbPath, "db").getAbsolutePath(), new File(dbPath, "db").getAbsolutePath(), new RestoreOptions(true));
+			FileUtils.deleteDirectory(new File(restorePath));
+		} catch (RocksDBException|IOException|IllegalArgumentException e) {
+			throw new RuntimeException("Error while restoring RocksDB state from " + restorePath, e);
+		}
+
+		this.keySerializer = requireNonNull(keySerializer);
+		this.namespaceSerializer = namespaceSerializer;
+		this.dbPath = dbPath;
+		this.checkpointPath = checkpointPath;
+
+		Options options = new Options().setCreateIfMissing(true);
+		options.setMergeOperator(new StringAppendOperator());
+
+		if (!dbPath.exists()) {
+			if (!dbPath.mkdirs()) {
+				throw new RuntimeException("Could not create RocksDB data directory.");
+			}
+		}
+
+		try {
+			db = RocksDB.open(options, new File(dbPath, "db").getAbsolutePath());
+		} catch (RocksDBException e) {
+			throw new RuntimeException("Error while opening RocksDB instance.", e);
+		}
+
+		options.dispose();
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	final public void clear() {
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
+		try {
+			writeKeyAndNamespace(out);
+			byte[] key = baos.toByteArray();
+			db.remove(key);
+		} catch (IOException|RocksDBException e) {
+			throw new RuntimeException("Error while removing entry from RocksDB", e);
+		}
+	}
+
+	protected void writeKeyAndNamespace(DataOutputView out) throws IOException {
+		keySerializer.serialize(currentKey, out);
+		out.writeByte(42);
+		namespaceSerializer.serialize(currentNamespace, out);
+	}
+
+	@Override
+	final public void setCurrentKey(K currentKey) {
+		this.currentKey = currentKey;
+	}
+
+	@Override
+	final public void setCurrentNamespace(N namespace) {
+		this.currentNamespace = namespace;
+	}
+
+	protected abstract KvStateSnapshot<K, N, S, SD, Backend> createRocksDBSnapshot(URI backupUri, long checkpointId);
+
+	@Override
+	final public KvStateSnapshot<K, N, S, SD, Backend> snapshot(
+		long checkpointId,
+		long timestamp) throws Exception {
+		boolean success = false;
+
+		final File localBackupPath = new File(dbPath, "backup-" + checkpointId);
+		final URI backupUri = new URI(checkpointPath + "/chk-" + checkpointId);
+
+		try {
+			if (!localBackupPath.exists()) {
+				if (!localBackupPath.mkdirs()) {
+					throw new RuntimeException("Could not create local backup path " + localBackupPath);
+				}
+			}
+
+			BackupEngine backupEngine = BackupEngine.open(Env.getDefault(),
+				new BackupableDBOptions(localBackupPath.getAbsolutePath()));
+
+			backupEngine.createNewBackup(db);
+
+			HDFSCopyFromLocal.copyFromLocal(localBackupPath, backupUri);
+			KvStateSnapshot<K, N, S, SD, Backend> result = createRocksDBSnapshot(backupUri, checkpointId);
+			success = true;
+			return result;
+		} finally {
+			FileUtils.deleteDirectory(localBackupPath);
+			if (!success) {
+				FileSystem fs = FileSystem.get(backupUri, new Configuration());
+				fs.delete(new Path(backupUri), true);
+			}
+		}
+	}
+
+	@Override
+	final public void dispose() {
+		db.dispose();
+		try {
+			FileUtils.deleteDirectory(dbPath);
+		} catch (IOException e) {
+			throw new RuntimeException("Error disposing RocksDB data directory.", e);
+		}
+	}
+
+	public static abstract class AbstractRocksDBSnapshot<K, N, S extends State, SD extends StateDescriptor<S>, Backend extends AbstractStateBackend> implements KvStateSnapshot<K, N, S, SD, Backend> {
+		private static final long serialVersionUID = 1L;
+
+		private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBSnapshot.class);
+
+		// ------------------------------------------------------------------------
+		//  Ctor parameters for RocksDB state
+		// ------------------------------------------------------------------------
+
+		/** Store it so that we can clean up in dispose() */
+		protected final File dbPath;
+
+		/** Where we should put RocksDB backups */
+		protected final String checkpointPath;
+
+		// ------------------------------------------------------------------------
+		//  Info about this checkpoint
+		// ------------------------------------------------------------------------
+
+		protected final URI backupUri;
+
+		protected long checkpointId;
+
+		// ------------------------------------------------------------------------
+		//  For sanity checks
+		// ------------------------------------------------------------------------
+
+		/** Key serializer */
+		protected final TypeSerializer<K> keySerializer;
+
+		/** Namespace serializer */
+		protected final TypeSerializer<N> namespaceSerializer;
+
+		/** Hash of the StateDescriptor, for sanity checks */
+		protected final SD stateDesc;
+
+		public AbstractRocksDBSnapshot(File dbPath,
+			String checkpointPath,
+			URI backupUri,
+			long checkpointId,
+			TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer,
+			SD stateDesc) {
+			this.dbPath = dbPath;
+			this.checkpointPath = checkpointPath;
+			this.backupUri = backupUri;
+			this.checkpointId = checkpointId;
+
+			this.stateDesc = stateDesc;
+			this.keySerializer = keySerializer;
+			this.namespaceSerializer = namespaceSerializer;
+		}
+
+		protected abstract KvState<K, N, S, SD, Backend> createRocksDBState(TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer,
+			SD stateDesc,
+			File dbPath,
+			String backupPath,
+			String restorePath) throws Exception;
+
+		@Override
+		public final KvState<K, N, S, SD, Backend> restoreState(
+			Backend stateBackend,
+			TypeSerializer<K> keySerializer,
+			ClassLoader classLoader,
+			long recoveryTimestamp) throws Exception {
+
+			// validity checks
+			if (!this.keySerializer.equals(keySerializer)) {
+				throw new IllegalArgumentException(
+					"Cannot restore the state from the snapshot with the given serializers. " +
+						"State (K/V) was serialized with " +
+						"(" + keySerializer + ") " +
+						"now is (" + keySerializer + ")");
+			}
+
+			if (!dbPath.exists()) {
+				if (!dbPath.mkdirs()) {
+					throw new RuntimeException("Could not create RocksDB base path " + dbPath);
+				}
+			}
+
+			FileSystem fs = FileSystem.get(backupUri, new Configuration());
+
+			final File localBackupPath = new File(dbPath, "chk-" + checkpointId);
+
+			if (localBackupPath.exists()) {
+				try {
+					LOG.warn("Deleting already existing local backup directory {}.", localBackupPath);
+					FileUtils.deleteDirectory(localBackupPath);
+				} catch (IOException e) {
+					throw new RuntimeException("Error cleaning RocksDB local backup directory.", e);
+				}
+			}
+
+			HDFSCopyToLocal.copyToLocal(backupUri, dbPath);
+			return createRocksDBState(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath, localBackupPath.getAbsolutePath());
+		}
+
+		@Override
+		public final void discardState() throws Exception {
+			FileSystem fs = FileSystem.get(backupUri, new Configuration());
+			fs.delete(new Path(backupUri), true);
+		}
+
+		@Override
+		public final long getStateSize() throws Exception {
+			return 0;
+		}
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
new file mode 100644
index 0000000..e97e65d
--- /dev/null
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.KvStateSnapshot;
+import org.rocksdb.RocksDBException;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * {@link ListState} implementation that stores state in RocksDB.
+ *
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <V> The type of the values in the list state.
+ * @param <Backend> The type of the backend that snapshots this key/value state.
+ */
+public class RocksDBListState<K, N, V, Backend extends AbstractStateBackend>
+	extends AbstractRocksDBState<K, N, ListState<V>, ListStateDescriptor<V>, Backend>
+	implements ListState<V> {
+
+	/** Serializer for the values */
+	private final TypeSerializer<V> valueSerializer;
+
+	/** This holds the name of the state and can create an initial default value for the state. */
+	protected final ListStateDescriptor<V> stateDesc;
+
+	/**
+	 * Creates a new {@code RocksDBListState}.
+	 *
+	 * @param keySerializer The serializer for the keys.
+	 * @param namespaceSerializer The serializer for the namespace.
+	 * @param stateDesc The state identifier for the state. This contains name
+	 *                           and can create a default state value.
+	 * @param dbPath The path on the local system where RocksDB data should be stored.
+	 */
+	protected RocksDBListState(TypeSerializer<K> keySerializer,
+		TypeSerializer<N> namespaceSerializer,
+		ListStateDescriptor<V> stateDesc,
+		File dbPath,
+		String backupPath) {
+		super(keySerializer, namespaceSerializer, dbPath, backupPath);
+		this.stateDesc = requireNonNull(stateDesc);
+		this.valueSerializer = stateDesc.getSerializer();
+	}
+
+	/**
+	 * Creates a new {@code RocksDBListState}.
+	 *
+	 * @param keySerializer The serializer for the keys.
+	 * @param namespaceSerializer The serializer for the namespace.
+	 * @param stateDesc The state identifier for the state. This contains name
+	 *                           and can create a default state value.
+	 * @param dbPath The path on the local system where RocksDB data should be stored.
+	 */
+	protected RocksDBListState(TypeSerializer<K> keySerializer,
+		TypeSerializer<N> namespaceSerializer,
+		ListStateDescriptor<V> stateDesc,
+		File dbPath,
+		String backupPath,
+		String restorePath) {
+		super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath);
+		this.stateDesc = requireNonNull(stateDesc);
+		this.valueSerializer = stateDesc.getSerializer();
+	}
+
+	@Override
+	public Iterable<V> get() {
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
+		try {
+			writeKeyAndNamespace(out);
+			byte[] key = baos.toByteArray();
+			byte[] valueBytes = db.get(key);
+
+			if (valueBytes == null) {
+				return Collections.emptyList();
+			}
+
+			ByteArrayInputStream bais = new ByteArrayInputStream(valueBytes);
+			DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais);
+
+			List<V> result = new ArrayList<>();
+			while (in.available() > 0) {
+				result.add(valueSerializer.deserialize(in));
+				if (in.available() > 0) {
+					in.readByte();
+				}
+			}
+			return result;
+		} catch (IOException|RocksDBException e) {
+			throw new RuntimeException("Error while retrieving data from RocksDB", e);
+		}
+	}
+
+	@Override
+	public void add(V value) throws IOException {
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
+		try {
+			writeKeyAndNamespace(out);
+			byte[] key = baos.toByteArray();
+
+			baos.reset();
+
+			valueSerializer.serialize(value, out);
+			db.merge(key, baos.toByteArray());
+
+		} catch (Exception e) {
+			throw new RuntimeException("Error while adding data to RocksDB", e);
+		}
+	}
+
+	@Override
+	protected KvStateSnapshot<K, N, ListState<V>, ListStateDescriptor<V>, Backend> createRocksDBSnapshot(
+		URI backupUri,
+		long checkpointId) {
+		return new Snapshot<>(dbPath, checkpointPath, backupUri, checkpointId, keySerializer, namespaceSerializer, stateDesc);
+	}
+
+	private static class Snapshot<K, N, V, Backend extends AbstractStateBackend> extends AbstractRocksDBSnapshot<K, N, ListState<V>, ListStateDescriptor<V>, Backend> {
+		private static final long serialVersionUID = 1L;
+
+		public Snapshot(File dbPath,
+			String checkpointPath,
+			URI backupUri,
+			long checkpointId,
+			TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer,
+			ListStateDescriptor<V> stateDesc) {
+			super(dbPath,
+				checkpointPath,
+				backupUri,
+				checkpointId,
+				keySerializer,
+				namespaceSerializer,
+				stateDesc);
+		}
+
+		@Override
+		protected KvState<K, N, ListState<V>, ListStateDescriptor<V>, Backend> createRocksDBState(
+			TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer,
+			ListStateDescriptor<V> stateDesc,
+			File dbPath,
+			String backupPath,
+			String restorePath) throws Exception {
+			return new RocksDBListState<>(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath, restorePath);
+		}
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
new file mode 100644
index 0000000..eb21c3b
--- /dev/null
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.contrib.streaming.state;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.KvStateSnapshot;
+import org.rocksdb.RocksDBException;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * {@link ReducingState} implementation that stores state in RocksDB.
+ *
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <V> The type of value that the state state stores.
+ * @param <Backend> The type of the backend that snapshots this key/value state.
+ */
+public class RocksDBReducingState<K, N, V, Backend extends AbstractStateBackend>
+	extends AbstractRocksDBState<K, N, ReducingState<V>, ReducingStateDescriptor<V>, Backend>
+	implements ReducingState<V> {
+
+	/** Serializer for the values */
+	private final TypeSerializer<V> valueSerializer;
+
+	/** This holds the name of the state and can create an initial default value for the state. */
+	protected final ReducingStateDescriptor<V> stateDesc;
+
+	/** User-specified reduce function */
+	private final ReduceFunction<V> reduceFunction;
+
+	/**
+	 * Creates a new {@code RocksDBReducingState}.
+	 *
+	 * @param keySerializer The serializer for the keys.
+	 * @param namespaceSerializer The serializer for the namespace.
+	 * @param stateDesc The state identifier for the state. This contains name
+	 *                           and can create a default state value.
+	 * @param dbPath The path on the local system where RocksDB data should be stored.
+	 */
+	protected RocksDBReducingState(TypeSerializer<K> keySerializer,
+		TypeSerializer<N> namespaceSerializer,
+		ReducingStateDescriptor<V> stateDesc,
+		File dbPath,
+		String backupPath) {
+		super(keySerializer, namespaceSerializer, dbPath, backupPath);
+		this.stateDesc = requireNonNull(stateDesc);
+		this.valueSerializer = stateDesc.getSerializer();
+		this.reduceFunction = stateDesc.getReduceFunction();
+	}
+
+	protected RocksDBReducingState(TypeSerializer<K> keySerializer,
+		TypeSerializer<N> namespaceSerializer,
+		ReducingStateDescriptor<V> stateDesc,
+		File dbPath,
+		String backupPath,
+		String restorePath) {
+		super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath);
+		this.stateDesc = stateDesc;
+		this.valueSerializer = stateDesc.getSerializer();
+		this.reduceFunction = stateDesc.getReduceFunction();
+	}
+
+	@Override
+	public V get() {
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
+		try {
+			writeKeyAndNamespace(out);
+			byte[] key = baos.toByteArray();
+			byte[] valueBytes = db.get(key);
+			if (valueBytes == null) {
+				return null;
+			}
+			return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
+		} catch (IOException|RocksDBException e) {
+			throw new RuntimeException("Error while retrieving data from RocksDB", e);
+		}
+	}
+
+	@Override
+	public void add(V value) throws IOException {
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
+		try {
+			writeKeyAndNamespace(out);
+			byte[] key = baos.toByteArray();
+			byte[] valueBytes = db.get(key);
+
+			if (valueBytes == null) {
+				baos.reset();
+				valueSerializer.serialize(value, out);
+				db.put(key, baos.toByteArray());
+			} else {
+				V oldValue = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
+				V newValue = reduceFunction.reduce(oldValue, value);
+				baos.reset();
+				valueSerializer.serialize(newValue, out);
+				db.put(key, baos.toByteArray());
+			}
+		} catch (Exception e) {
+			throw new RuntimeException("Error while adding data to RocksDB", e);
+		}
+	}
+
+	@Override
+	protected KvStateSnapshot<K, N, ReducingState<V>, ReducingStateDescriptor<V>, Backend> createRocksDBSnapshot(
+		URI backupUri,
+		long checkpointId) {
+		return new Snapshot<>(dbPath, checkpointPath, backupUri, checkpointId, keySerializer, namespaceSerializer, stateDesc);
+	}
+
+	private static class Snapshot<K, N, V, Backend extends AbstractStateBackend> extends AbstractRocksDBSnapshot<K, N, ReducingState<V>, ReducingStateDescriptor<V>, Backend> {
+		private static final long serialVersionUID = 1L;
+
+		public Snapshot(File dbPath,
+			String checkpointPath,
+			URI backupUri,
+			long checkpointId,
+			TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer,
+			ReducingStateDescriptor<V> stateDesc) {
+			super(dbPath,
+				checkpointPath,
+				backupUri,
+				checkpointId,
+				keySerializer,
+				namespaceSerializer,
+				stateDesc);
+		}
+
+		@Override
+		protected KvState<K, N, ReducingState<V>, ReducingStateDescriptor<V>, Backend> createRocksDBState(
+			TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer,
+			ReducingStateDescriptor<V> stateDesc,
+			File dbPath,
+			String backupPath,
+			String restorePath) throws Exception {
+			return new RocksDBReducingState<>(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath, restorePath);
+		}
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
new file mode 100644
index 0000000..aaaeea4
--- /dev/null
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *	http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import java.io.File;
+import java.io.Serializable;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StateHandle;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ *
+ */
+public class RocksDBStateBackend extends AbstractStateBackend {
+	private static final long serialVersionUID = 1L;
+
+	/** Base path for RocksDB directory. */
+	private final String dbBasePath;
+
+	/** The checkpoint directory that we snapshot RocksDB backups to. */
+	private final String checkpointDirectory;
+
+	/** Operator identifier that is used to uniqueify the RocksDB storage path. */
+	private String operatorIdentifier;
+
+	/** JobID for uniquifying backup paths. */
+	private JobID jobId;
+
+	private AbstractStateBackend backingStateBackend;
+
+	public RocksDBStateBackend(String dbBasePath, String checkpointDirectory, AbstractStateBackend backingStateBackend) {
+		this.dbBasePath = requireNonNull(dbBasePath);
+		this.checkpointDirectory = requireNonNull(checkpointDirectory);
+		this.backingStateBackend = requireNonNull(backingStateBackend);
+	}
+
+	@Override
+	public void initializeForJob(Environment env,
+		String operatorIdentifier,
+		TypeSerializer<?> keySerializer) throws Exception {
+		super.initializeForJob(env, operatorIdentifier, keySerializer);
+		this.operatorIdentifier = operatorIdentifier.replace(" ", "");
+		backingStateBackend.initializeForJob(env, operatorIdentifier, keySerializer);
+		this.jobId = env.getJobID();
+	}
+
+	@Override
+	public void disposeAllStateForCurrentJob() throws Exception {
+
+	}
+
+	@Override
+	public void close() throws Exception {
+
+	}
+
+	private File getDbPath(String stateName) {
+		return new File(new File(new File(new File(dbBasePath), jobId.toShortString()), operatorIdentifier), stateName);
+	}
+
+	private String getCheckpointPath(String stateName) {
+		return checkpointDirectory + "/" + jobId.toShortString() + "/" + operatorIdentifier + "/" + stateName;
+	}
+
+	@Override
+	protected <N, T> ValueState<T> createValueState(TypeSerializer<N> namespaceSerializer,
+		ValueStateDescriptor<T> stateDesc) throws Exception {
+		File dbPath = getDbPath(stateDesc.getName());
+		String checkpointPath = getCheckpointPath(stateDesc.getName());
+		return new RocksDBValueState<>(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath);
+	}
+
+	@Override
+	protected <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer,
+		ListStateDescriptor<T> stateDesc) throws Exception {
+		File dbPath = getDbPath(stateDesc.getName());
+		String checkpointPath = getCheckpointPath(stateDesc.getName());
+		return new RocksDBListState<>(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath);
+	}
+
+	@Override
+	protected <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer,
+		ReducingStateDescriptor<T> stateDesc) throws Exception {
+		File dbPath = getDbPath(stateDesc.getName());
+		String checkpointPath = getCheckpointPath(stateDesc.getName());
+		return new RocksDBReducingState<>(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath);
+	}
+
+	@Override
+	public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID,
+		long timestamp) throws Exception {
+		return backingStateBackend.createCheckpointStateOutputStream(checkpointID, timestamp);
+	}
+
+	@Override
+	public <S extends Serializable> StateHandle<S> checkpointStateSerializable(S state,
+		long checkpointID,
+		long timestamp) throws Exception {
+		return backingStateBackend.checkpointStateSerializable(state, checkpointID, timestamp);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
new file mode 100644
index 0000000..8767a86
--- /dev/null
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.KvStateSnapshot;
+import org.rocksdb.RocksDBException;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * {@link ValueState} implementation that stores state in RocksDB.
+ *
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <V> The type of value that the state state stores.
+ * @param <Backend> The type of the backend that snapshots this key/value state.
+ */
+public class RocksDBValueState<K, N, V, Backend extends AbstractStateBackend>
+	extends AbstractRocksDBState<K, N, ValueState<V>, ValueStateDescriptor<V>, Backend>
+	implements ValueState<V> {
+
+	/** Serializer for the values */
+	private final TypeSerializer<V> valueSerializer;
+
+	/** This holds the name of the state and can create an initial default value for the state. */
+	protected final ValueStateDescriptor<V> stateDesc;
+
+	/**
+	 * Creates a new {@code RocksDBReducingState}.
+	 *
+	 * @param keySerializer The serializer for the keys.
+	 * @param namespaceSerializer The serializer for the namespace.
+	 * @param stateDesc The state identifier for the state. This contains name
+	 *                           and can create a default state value.
+	 * @param dbPath The path on the local system where RocksDB data should be stored.
+	 */
+	protected RocksDBValueState(TypeSerializer<K> keySerializer,
+		TypeSerializer<N> namespaceSerializer,
+		ValueStateDescriptor<V> stateDesc,
+		File dbPath,
+		String backupPath) {
+		super(keySerializer, namespaceSerializer, dbPath, backupPath);
+		this.stateDesc = requireNonNull(stateDesc);
+		this.valueSerializer = stateDesc.getSerializer();
+	}
+
+	protected RocksDBValueState(TypeSerializer<K> keySerializer,
+		TypeSerializer<N> namespaceSerializer,
+		ValueStateDescriptor<V> stateDesc,
+		File dbPath,
+		String backupPath,
+		String restorePath) {
+		super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath);
+		this.stateDesc = stateDesc;
+		this.valueSerializer = stateDesc.getSerializer();
+	}
+
+	@Override
+	public V value() {
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
+		try {
+			writeKeyAndNamespace(out);
+			byte[] key = baos.toByteArray();
+			byte[] valueBytes = db.get(key);
+			if (valueBytes == null) {
+				return stateDesc.getDefaultValue();
+			}
+			return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
+		} catch (IOException|RocksDBException e) {
+			throw new RuntimeException("Error while retrieving data from RocksDB", e);
+		}
+	}
+
+	@Override
+	public void update(V value) throws IOException {
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
+		try {
+			writeKeyAndNamespace(out);
+			byte[] key = baos.toByteArray();
+			baos.reset();
+			valueSerializer.serialize(value, out);
+			db.put(key, baos.toByteArray());
+		} catch (Exception e) {
+			throw new RuntimeException("Error while adding data to RocksDB", e);
+		}
+	}
+
+	@Override
+	protected KvStateSnapshot<K, N, ValueState<V>, ValueStateDescriptor<V>, Backend> createRocksDBSnapshot(
+		URI backupUri,
+		long checkpointId) {
+		return new Snapshot<>(dbPath, checkpointPath, backupUri, checkpointId, keySerializer, namespaceSerializer, stateDesc);
+	}
+
+	private static class Snapshot<K, N, V, Backend extends AbstractStateBackend> extends AbstractRocksDBSnapshot<K, N, ValueState<V>, ValueStateDescriptor<V>, Backend> {
+		private static final long serialVersionUID = 1L;
+
+		public Snapshot(File dbPath,
+			String checkpointPath,
+			URI backupUri,
+			long checkpointId,
+			TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer,
+			ValueStateDescriptor<V> stateDesc) {
+			super(dbPath,
+				checkpointPath,
+				backupUri,
+				checkpointId,
+				keySerializer,
+				namespaceSerializer,
+				stateDesc);
+		}
+
+		@Override
+		protected KvState<K, N, ValueState<V>, ValueStateDescriptor<V>, Backend> createRocksDBState(
+			TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer,
+			ValueStateDescriptor<V> stateDesc,
+			File dbPath,
+			String backupPath,
+			String restorePath) throws Exception {
+			return new RocksDBValueState<>(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath, restorePath);
+		}
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
new file mode 100644
index 0000000..3b3ac31
--- /dev/null
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.state.StateBackendTestBase;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.UUID;
+
+/**
+ * Tests for the partitioned state part of {@link RocksDBStateBackend}.
+ */
+public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBackend> {
+
+	private File dbDir;
+	private File chkDir;
+
+	@Override
+	protected RocksDBStateBackend getStateBackend() throws IOException {
+		dbDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
+		chkDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
+
+		return new RocksDBStateBackend(dbDir.getAbsolutePath(), "file://" + chkDir.getAbsolutePath(), new MemoryStateBackend());
+	}
+
+	@Override
+	protected void cleanup() {
+		try {
+			FileUtils.deleteDirectory(dbDir);
+			FileUtils.deleteDirectory(chkDir);
+		} catch (IOException ignore) {}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-contrib/flink-statebackend-rocksdb/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/resources/log4j-test.properties b/flink-contrib/flink-statebackend-rocksdb/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..0b686e5
--- /dev/null
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to DEBUG and its only appender to A1.
+log4j.rootLogger=OFF, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-contrib/flink-statebackend-rocksdb/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/resources/log4j.properties b/flink-contrib/flink-statebackend-rocksdb/src/test/resources/log4j.properties
new file mode 100644
index 0000000..ed2bbcb
--- /dev/null
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/resources/log4j.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# This file ensures that tests executed from the IDE show log output
+
+log4j.rootLogger=OFF, console
+
+# Log all infos in the given file
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target = System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-contrib/flink-statebackend-rocksdb/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/resources/logback-test.xml b/flink-contrib/flink-statebackend-rocksdb/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..4f56748
--- /dev/null
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/resources/logback-test.xml
@@ -0,0 +1,30 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+    <logger name="org.apache.flink.runtime.client.JobClient" level="OFF"/>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/pom.xml b/flink-contrib/pom.xml
index 82b6211..76f0f88 100644
--- a/flink-contrib/pom.xml
+++ b/flink-contrib/pom.xml
@@ -43,5 +43,6 @@ under the License.
 		<module>flink-tweet-inputformat</module>
 		<module>flink-operator-stats</module>
 		<module>flink-connector-wikiedits</module>
+		<module>flink-statebackend-rocksdb</module>
 	</modules>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-core/src/main/java/org/apache/flink/util/ExternalProcessRunner.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExternalProcessRunner.java b/flink-core/src/main/java/org/apache/flink/util/ExternalProcessRunner.java
new file mode 100644
index 0000000..8e4725c
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/ExternalProcessRunner.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.lang.management.ManagementFactory;
+import java.lang.management.RuntimeMXBean;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Utility class for running a class in an external process. This will try to find the java
+ * executable in common places and will use the classpath of the current process as the classpath
+ * of the new process.
+ *
+ * <p>Attention: The entry point class must be in the classpath of the currently running process,
+ * otherwise the newly spawned process will not find it and fail.
+ */
+public class ExternalProcessRunner {
+	private final String entryPointClassName;
+
+	private final Process process;
+
+	final StringWriter errorOutput = new StringWriter();
+
+	/**
+	 * Creates a new {@code ProcessRunner} that runs the given class with the given parameters.
+	 * The class must have a "main" method.
+	 */
+	public ExternalProcessRunner(String entryPointClassName, String[] parameters) throws IOException {
+		this.entryPointClassName = entryPointClassName;
+
+		String javaCommand = getJavaCommandPath();
+
+		List<String> commandList = new ArrayList<>();
+
+		commandList.add(javaCommand);
+		commandList.add("-classpath");
+		commandList.add(getCurrentClasspath());
+		commandList.add(entryPointClassName);
+
+		Collections.addAll(commandList, parameters);
+
+		process = new ProcessBuilder(commandList).start();
+
+		new PipeForwarder(process.getErrorStream(), errorOutput);
+	}
+
+	/**
+	 * Get the stderr stream of the process.
+	 */
+	public StringWriter getErrorOutput() {
+		return errorOutput;
+	}
+
+	/**
+	 * Start the external process, wait for it to finish and return the exit code of that process.
+	 *
+	 * <p>If this method is interrupted it will destroy the external process and forward the
+	 * {@code InterruptedException}.
+	 */
+	public int run() throws Exception {
+		try {
+			int returnCode = process.waitFor();
+
+			if (returnCode != 0) {
+				// determine whether we failed because of a ClassNotFoundException and forward that
+				if (getErrorOutput().toString().contains("Error: Could not find or load main class " + entryPointClassName)) {
+					throw new ClassNotFoundException("Error: Could not find or load main class " + entryPointClassName);
+				}
+
+			}
+			return returnCode;
+		} catch (InterruptedException e) {
+			try {
+				Class<?> processClass = process.getClass();
+				Method destroyForcibly = processClass.getMethod("destroyForcibly");
+				destroyForcibly.setAccessible(true);
+				destroyForcibly.invoke(process);
+			} catch (NoSuchMethodException ex) {
+				// we don't have destroyForcibly
+				process.destroy();
+			}
+			throw new InterruptedException("Interrupted while waiting for external process.");
+		}
+	}
+
+	/**
+	 * Tries to get the java executable command with which the current JVM was started.
+	 * Returns null, if the command could not be found.
+	 *
+	 * @return The java executable command.
+	 */
+	public static String getJavaCommandPath() {
+
+		try {
+			ProcessBuilder bld = new ProcessBuilder("java", "-version");
+			Process process = bld.start();
+			if (process.waitFor() == 0) {
+				return "java";
+			}
+		}
+		catch (Throwable t) {
+			// ignore and try the second path
+		}
+
+		try {
+			ProcessBuilder bld = new ProcessBuilder("java.exe", "-version");
+			Process process = bld.start();
+			if (process.waitFor() == 0) {
+				return "java.exe";
+			}
+		}
+		catch (Throwable t) {
+			// ignore and try the second path
+		}
+
+		File javaHome = new File(System.getProperty("java.home"));
+
+		String path1 = new File(javaHome, "java").getAbsolutePath();
+		String path2 = new File(new File(javaHome, "bin"), "java").getAbsolutePath();
+
+		try {
+			ProcessBuilder bld = new ProcessBuilder(path1, "-version");
+			Process process = bld.start();
+			if (process.waitFor() == 0) {
+				return path1;
+			}
+		}
+		catch (Throwable t) {
+			// ignore and try the second path
+		}
+
+		try {
+			ProcessBuilder bld = new ProcessBuilder(path2, "-version");
+			Process process = bld.start();
+			if (process.waitFor() == 0) {
+				return path2;
+			}
+		}
+		catch (Throwable tt) {
+			// no luck
+		}
+
+		String path3 = new File(javaHome, "java.exe").getAbsolutePath();
+		String path4 = new File(new File(javaHome, "bin"), "java.exe").getAbsolutePath();
+
+		try {
+			ProcessBuilder bld = new ProcessBuilder(path3, "-version");
+			Process process = bld.start();
+			if (process.waitFor() == 0) {
+				return path3;
+			}
+		}
+		catch (Throwable t) {
+			// ignore and try the second path
+		}
+
+		try {
+			ProcessBuilder bld = new ProcessBuilder(path4, "-version");
+			Process process = bld.start();
+			if (process.waitFor() == 0) {
+				return path4;
+			}
+		}
+		catch (Throwable tt) {
+			// no luck
+		}
+		return null;
+	}
+
+	/**
+	 * Gets the classpath with which the current JVM was started.
+	 *
+	 * @return The classpath with which the current JVM was started.
+	 */
+	public static String getCurrentClasspath() {
+		RuntimeMXBean bean = ManagementFactory.getRuntimeMXBean();
+		return bean.getClassPath();
+	}
+
+	/**
+	 * Utility class to read the output of a process stream and forward it into a StringWriter.
+	 */
+	public static class PipeForwarder extends Thread {
+
+		private final StringWriter target;
+		private final InputStream source;
+
+		public PipeForwarder(InputStream source, StringWriter target) {
+			super("Pipe Forwarder");
+			setDaemon(true);
+
+			this.source = source;
+			this.target = target;
+
+			start();
+		}
+
+		@Override
+		public void run() {
+			try {
+				int next;
+				while ((next = source.read()) != -1) {
+					target.write(next);
+				}
+			}
+			catch (IOException e) {
+				// terminate
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-core/src/main/java/org/apache/flink/util/HDFSCopyFromLocal.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/HDFSCopyFromLocal.java b/flink-core/src/main/java/org/apache/flink/util/HDFSCopyFromLocal.java
new file mode 100644
index 0000000..cf6780b
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/HDFSCopyFromLocal.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.File;
+import java.net.URI;
+
+/**
+ * Utility for copying from local file system to a HDFS {@link FileSystem} in an external process.
+ * This is required since {@code FileSystem.copyFromLocalFile} does not like being interrupted.
+ */
+public class HDFSCopyFromLocal {
+	public static void main(String[] args) throws Exception {
+		String localBackupPath = args[0];
+		String backupUri = args[1];
+
+		FileSystem fs = FileSystem.get(new URI(backupUri), new Configuration());
+
+		fs.copyFromLocalFile(new Path(localBackupPath), new Path(backupUri));
+	}
+
+	public static void copyFromLocal(File localPath, URI remotePath) throws Exception {
+		ExternalProcessRunner processRunner = new ExternalProcessRunner(HDFSCopyFromLocal.class.getName(),
+			new String[]{localPath.getAbsolutePath(), remotePath.toString()});
+		if (processRunner.run() != 0) {
+			throw new  RuntimeException("Error while copying to remote FileSystem: " + processRunner.getErrorOutput());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-core/src/main/java/org/apache/flink/util/HDFSCopyToLocal.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/HDFSCopyToLocal.java b/flink-core/src/main/java/org/apache/flink/util/HDFSCopyToLocal.java
new file mode 100644
index 0000000..813f768
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/HDFSCopyToLocal.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.File;
+import java.net.URI;
+
+/**
+ * Utility for copying from a HDFS {@link FileSystem} to the local file system in an external
+ * process. This is required since {@code FileSystem.copyToLocalFile} does not like being
+ * interrupted.
+ */
+public class HDFSCopyToLocal {
+	public static void main(String[] args) throws Exception {
+		String backupUri = args[0];
+		String dbPath = args[1];
+
+		FileSystem fs = FileSystem.get(new URI(backupUri), new Configuration());
+
+		fs.copyToLocalFile(new Path(backupUri), new Path(dbPath));
+	}
+
+	public static void copyToLocal(URI remotePath, File localPath) throws Exception {
+		ExternalProcessRunner processRunner = new ExternalProcessRunner(HDFSCopyToLocal.class.getName(),
+			new String[]{remotePath.toString(), localPath.getAbsolutePath()});
+		if (processRunner.run() != 0) {
+			throw new  RuntimeException("Error while copying from remote FileSystem: " + processRunner.getErrorOutput());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-core/src/test/java/org/apache/flink/util/ExternalProcessRunnerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/ExternalProcessRunnerTest.java b/flink-core/src/test/java/org/apache/flink/util/ExternalProcessRunnerTest.java
new file mode 100644
index 0000000..5ebe772
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/util/ExternalProcessRunnerTest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.util;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class ExternalProcessRunnerTest {
+
+	@Test(expected = ClassNotFoundException.class)
+	public void testClassNotFound() throws Exception {
+		ExternalProcessRunner runner = new ExternalProcessRunner("MyClassThatDoesNotExist", new String[]{});
+		runner.run();
+	}
+
+	@Test
+	public void testInterrupting() throws Exception {
+
+		final ExternalProcessRunner runner = new ExternalProcessRunner(InfiniteLoop.class.getName(), new String[]{});
+
+		Thread thread = new Thread() {
+			@Override
+			public void run() {
+				try {
+					runner.run();
+				} catch (InterruptedException e) {
+					// this is expected
+				} catch (Exception e) {
+					fail("Other exception received " + e);
+				}
+			}
+		};
+
+		thread.start();
+		thread.interrupt();
+		thread.join();
+	}
+
+	@Test
+	public void testPrintToErr() throws Exception {
+		final ExternalProcessRunner runner = new ExternalProcessRunner(PrintToError.class.getName(), new String[]{"hello42"});
+
+		int result = runner.run();
+
+		assertEquals(0, result);
+		assertEquals(runner.getErrorOutput().toString(), "Hello process hello42\n");
+	}
+
+	@Test
+	public void testFailing() throws Exception {
+		final ExternalProcessRunner runner = new ExternalProcessRunner(Failing.class.getName(), new String[]{});
+
+		int result = runner.run();
+
+		assertEquals(1, result);
+		// this needs to be adapted if the test changes because it contains the line number
+		assertEquals(runner.getErrorOutput().toString(), "Exception in thread \"main\" java.lang.RuntimeException: HEHE, I'm failing.\n" +
+			"\tat org.apache.flink.util.ExternalProcessRunnerTest$Failing.main(ExternalProcessRunnerTest.java:94)\n");
+	}
+
+
+	public static class InfiniteLoop {
+		public static void main(String[] args) {
+			while (true) {
+			}
+		}
+	}
+
+	public static class PrintToError {
+		public static void main(String[] args) {
+			System.err.println("Hello process " + args[0]);
+		}
+	}
+
+	public static class Failing {
+		public static void main(String[] args) {
+			throw new RuntimeException("HEHE, I'm failing.");
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index 996f5e0..73a3d66 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -175,6 +175,13 @@ under the License.
 			<version>${guava.version}</version>
 			<scope>test</scope>
 		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-statebackend-rocksdb_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
 	</dependencies>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
index 5886982..9bc0040 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
@@ -112,6 +113,12 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
 				String backups = tempFolder.newFolder().getAbsolutePath();
 				this.stateBackend = new FsStateBackend("file://" + backups);
 				break;
+			case ROCKSDB:
+				String rocksDb = tempFolder.newFolder().getAbsolutePath();
+				String rocksDbBackups = tempFolder.newFolder().getAbsolutePath();
+
+				this.stateBackend = new RocksDBStateBackend(rocksDb, "file://" + rocksDbBackups, new MemoryStateBackend());
+				break;
 		}
 	}
 
@@ -739,6 +746,8 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
 		return Arrays.asList(new Object[][] {
 				{StateBackendEnum.MEM},
 				{StateBackendEnum.FILE},
+//				{StateBackendEnum.DB},
+				{StateBackendEnum.ROCKSDB}
 			}
 		);
 	}


Mime
View raw message