flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject flink git commit: [FLINK-3654] Disable Write-Ahead-Log in RocksDB State
Date Thu, 07 Apr 2016 10:18:48 GMT
Repository: flink
Updated Branches:
  refs/heads/master e5b93da0e -> 3fe53d39f


[FLINK-3654] Disable Write-Ahead-Log in RocksDB State


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3fe53d39
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3fe53d39
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3fe53d39

Branch: refs/heads/master
Commit: 3fe53d39fe452aecaa7b075b94b0ca33a1d71864
Parents: e5b93da
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Tue Mar 22 18:08:15 2016 +0100
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Thu Apr 7 12:16:56 2016 +0200

----------------------------------------------------------------------
 .../streaming/state/AbstractRocksDBState.java      | 12 +++++++++++-
 .../streaming/state/RocksDBFoldingState.java       | 17 +++++++++++++++--
 .../contrib/streaming/state/RocksDBListState.java  | 15 ++++++++++++++-
 .../streaming/state/RocksDBReducingState.java      | 17 +++++++++++++++--
 .../contrib/streaming/state/RocksDBValueState.java | 15 ++++++++++++++-
 5 files changed, 69 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3fe53d39/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index ba1b7dc..74e0509 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.Path;
 import org.rocksdb.BackupEngine;
 import org.rocksdb.BackupableDBOptions;
 import org.rocksdb.Env;
+import org.rocksdb.FlushOptions;
 import org.rocksdb.Options;
 import org.rocksdb.RestoreOptions;
 import org.rocksdb.RocksDB;
@@ -143,6 +144,7 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
 		} catch (RocksDBException e) {
 			throw new RuntimeException("Error while opening RocksDB instance.", e);
 		}
+
 	}
 
 	/**
@@ -253,7 +255,15 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
 		}
 
 		long startTime = System.currentTimeMillis();
-		try (BackupEngine backupEngine = BackupEngine.open(Env.getDefault(), new BackupableDBOptions(localBackupPath.getAbsolutePath()))) {
+		BackupableDBOptions backupOptions = new BackupableDBOptions(localBackupPath.getAbsolutePath());
+		// we disabled the WAL
+		backupOptions.setBackupLogFiles(false);
+		// no need to sync since we use the backup only as intermediate data before writing to FileSystem snapshot
+		backupOptions.setSync(false);
+		try (BackupEngine backupEngine = BackupEngine.open(Env.getDefault(),
+				backupOptions)) {
+			// make sure to flush because we don't write to the write-ahead-log
+			db.flush(new FlushOptions().setWaitForFlush(true));
 			backupEngine.createNewBackup(db);
 		}
 		long endTime = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/flink/blob/3fe53d39/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
index 1a3e69e..20b5181 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.state.KvState;
 
 import org.rocksdb.Options;
 import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteOptions;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -59,6 +60,12 @@ public class RocksDBFoldingState<K, N, T, ACC>
 	private final FoldFunction<T, ACC> foldFunction;
 
 	/**
+	 * We disable writes to the write-ahead-log here. We can't have these in the base class
+	 * because JNI segfaults for some reason if they are.
+	 */
+	protected final WriteOptions writeOptions;
+
+	/**
 	 * Creates a new {@code RocksDBFoldingState}.
 	 *
 	 * @param keySerializer The serializer for the keys.
@@ -81,6 +88,9 @@ public class RocksDBFoldingState<K, N, T, ACC>
 		this.stateDesc = requireNonNull(stateDesc);
 		this.valueSerializer = stateDesc.getSerializer();
 		this.foldFunction = stateDesc.getFoldFunction();
+
+		writeOptions = new WriteOptions();
+		writeOptions.setDisableWAL(true);
 	}
 
 	/**
@@ -108,6 +118,9 @@ public class RocksDBFoldingState<K, N, T, ACC>
 		this.stateDesc = stateDesc;
 		this.valueSerializer = stateDesc.getSerializer();
 		this.foldFunction = stateDesc.getFoldFunction();
+
+		writeOptions = new WriteOptions();
+		writeOptions.setDisableWAL(true);
 	}
 
 	@Override
@@ -139,13 +152,13 @@ public class RocksDBFoldingState<K, N, T, ACC>
 			if (valueBytes == null) {
 				baos.reset();
 				valueSerializer.serialize(foldFunction.fold(stateDesc.getDefaultValue(), value), out);
-				db.put(key, baos.toByteArray());
+				db.put(writeOptions, key, baos.toByteArray());
 			} else {
 				ACC oldValue = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
 				ACC newValue = foldFunction.fold(oldValue, value);
 				baos.reset();
 				valueSerializer.serialize(newValue, out);
-				db.put(key, baos.toByteArray());
+				db.put(writeOptions, key, baos.toByteArray());
 			}
 		} catch (Exception e) {
 			throw new RuntimeException("Error while adding data to RocksDB", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/3fe53d39/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index 53cde46..f5c589c 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.state.KvState;
 
 import org.rocksdb.Options;
 import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteOptions;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -57,6 +58,12 @@ public class RocksDBListState<K, N, V>
 	protected final ListStateDescriptor<V> stateDesc;
 
 	/**
+	 * We disable writes to the write-ahead-log here. We can't have these in the base class
+	 * because JNI segfaults for some reason if they are.
+	 */
+	protected final WriteOptions writeOptions;
+
+	/**
 	 * Creates a new {@code RocksDBListState}.
 	 *
 	 * @param keySerializer The serializer for the keys.
@@ -76,6 +83,9 @@ public class RocksDBListState<K, N, V>
 		super(keySerializer, namespaceSerializer, dbPath, backupPath, options);
 		this.stateDesc = requireNonNull(stateDesc);
 		this.valueSerializer = stateDesc.getSerializer();
+
+		writeOptions = new WriteOptions();
+		writeOptions.setDisableWAL(true);
 	}
 
 	/**
@@ -100,6 +110,9 @@ public class RocksDBListState<K, N, V>
 		super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath, options);
 		this.stateDesc = requireNonNull(stateDesc);
 		this.valueSerializer = stateDesc.getSerializer();
+
+		writeOptions = new WriteOptions();
+		writeOptions.setDisableWAL(true);
 	}
 
 	@Override
@@ -142,7 +155,7 @@ public class RocksDBListState<K, N, V>
 			baos.reset();
 
 			valueSerializer.serialize(value, out);
-			db.merge(key, baos.toByteArray());
+			db.merge(writeOptions, key, baos.toByteArray());
 
 		} catch (Exception e) {
 			throw new RuntimeException("Error while adding data to RocksDB", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/3fe53d39/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
index d1444eb..6a965cc 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.state.KvState;
 
 import org.rocksdb.Options;
 import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteOptions;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -58,6 +59,12 @@ public class RocksDBReducingState<K, N, V>
 	private final ReduceFunction<V> reduceFunction;
 
 	/**
+	 * We disable writes to the write-ahead-log here. We can't have these in the base class
+	 * because JNI segfaults for some reason if they are.
+	 */
+	protected final WriteOptions writeOptions;
+
+	/**
 	 * Creates a new {@code RocksDBReducingState}.
 	 *
 	 * @param keySerializer The serializer for the keys.
@@ -78,6 +85,9 @@ public class RocksDBReducingState<K, N, V>
 		this.stateDesc = requireNonNull(stateDesc);
 		this.valueSerializer = stateDesc.getSerializer();
 		this.reduceFunction = stateDesc.getReduceFunction();
+
+		writeOptions = new WriteOptions();
+		writeOptions.setDisableWAL(true);
 	}
 
 	/**
@@ -103,6 +113,9 @@ public class RocksDBReducingState<K, N, V>
 		this.stateDesc = stateDesc;
 		this.valueSerializer = stateDesc.getSerializer();
 		this.reduceFunction = stateDesc.getReduceFunction();
+
+		writeOptions = new WriteOptions();
+		writeOptions.setDisableWAL(true);
 	}
 
 	@Override
@@ -134,13 +147,13 @@ public class RocksDBReducingState<K, N, V>
 			if (valueBytes == null) {
 				baos.reset();
 				valueSerializer.serialize(value, out);
-				db.put(key, baos.toByteArray());
+				db.put(writeOptions, key, baos.toByteArray());
 			} else {
 				V oldValue = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
 				V newValue = reduceFunction.reduce(oldValue, value);
 				baos.reset();
 				valueSerializer.serialize(newValue, out);
-				db.put(key, baos.toByteArray());
+				db.put(writeOptions, key, baos.toByteArray());
 			}
 		} catch (Exception e) {
 			throw new RuntimeException("Error while adding data to RocksDB", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/3fe53d39/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
index 39a0e83..15b460a 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.state.KvState;
 
 import org.rocksdb.Options;
 import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteOptions;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -54,6 +55,12 @@ public class RocksDBValueState<K, N, V>
 	protected final ValueStateDescriptor<V> stateDesc;
 
 	/**
+	 * We disable writes to the write-ahead-log here. We can't have these in the base class
+	 * because JNI segfaults for some reason if they are.
+	 */
+	protected final WriteOptions writeOptions;
+
+	/**
 	 * Creates a new {@code RocksDBValueState}.
 	 *
 	 * @param keySerializer The serializer for the keys.
@@ -73,6 +80,9 @@ public class RocksDBValueState<K, N, V>
 		super(keySerializer, namespaceSerializer, dbPath, backupPath, options);
 		this.stateDesc = requireNonNull(stateDesc);
 		this.valueSerializer = stateDesc.getSerializer();
+
+		writeOptions = new WriteOptions();
+		writeOptions.setDisableWAL(true);
 	}
 
 	/**
@@ -97,6 +107,9 @@ public class RocksDBValueState<K, N, V>
 		super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath, options);
 		this.stateDesc = stateDesc;
 		this.valueSerializer = stateDesc.getSerializer();
+
+		writeOptions = new WriteOptions();
+		writeOptions.setDisableWAL(true);
 	}
 
 	@Override
@@ -129,7 +142,7 @@ public class RocksDBValueState<K, N, V>
 			byte[] key = baos.toByteArray();
 			baos.reset();
 			valueSerializer.serialize(value, out);
-			db.put(key, baos.toByteArray());
+			db.put(writeOptions, key, baos.toByteArray());
 		} catch (Exception e) {
 			throw new RuntimeException("Error while adding data to RocksDB", e);
 		}


Mime
View raw message