flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srich...@apache.org
Subject [3/3] flink git commit: [FLINK-7220] [checkpoints] Update RocksDB dependency to 5.5.5
Date Fri, 28 Jul 2017 11:58:34 GMT
[FLINK-7220] [checkpoints] Update RocksDB dependency to 5.5.5


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d818fc48
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d818fc48
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d818fc48

Branch: refs/heads/master
Commit: d818fc48fbc9bfbd613aecdaa0cefcf2c6622289
Parents: e975140
Author: Stefan Richter <s.richter@data-artisans.com>
Authored: Mon May 29 11:11:41 2017 +0200
Committer: Stefan Richter <s.richter@data-artisans.com>
Committed: Fri Jul 28 13:57:58 2017 +0200

----------------------------------------------------------------------
 .../flink-statebackend-rocksdb/pom.xml          |  6 +--
 .../streaming/state/PredefinedOptions.java      | 20 ++++----
 .../state/RocksDBKeyedStateBackend.java         | 51 ++++++++++++++------
 .../state/RocksDBStateBackendConfigTest.java    | 11 +----
 .../state/benchmark/RocksDBPerformanceTest.java |  9 ++--
 5 files changed, 53 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d818fc48/flink-contrib/flink-statebackend-rocksdb/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/pom.xml b/flink-contrib/flink-statebackend-rocksdb/pom.xml
index 527ca18..fa97e07 100644
--- a/flink-contrib/flink-statebackend-rocksdb/pom.xml
+++ b/flink-contrib/flink-statebackend-rocksdb/pom.xml
@@ -55,9 +55,9 @@ under the License.
 		</dependency>
 
 		<dependency>
-			<groupId>com.data-artisans</groupId>
-			<artifactId>frocksdbjni</artifactId>
-			<version>4.11.2-artisans</version>
+			<groupId>org.rocksdb</groupId>
+			<artifactId>rocksdbjni</artifactId>
+			<version>5.5.5</version>
 		</dependency>
 
 		<!-- test dependencies -->

http://git-wip-us.apache.org/repos/asf/flink/blob/d818fc48/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
index f606131..cb47ce4 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
@@ -19,10 +19,10 @@
 package org.apache.flink.contrib.streaming.state;
 
 import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.BloomFilter;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.CompactionStyle;
 import org.rocksdb.DBOptions;
-import org.rocksdb.StringAppendOperator;
 
 /**
  * The {@code PredefinedOptions} are configuration settings for the {@link RocksDBStateBackend}.
@@ -46,14 +46,13 @@ public enum PredefinedOptions {
 		@Override
 		public DBOptions createDBOptions() {
 			return new DBOptions()
-					.setUseFsync(false)
-					.setDisableDataSync(true);
+					.setUseFsync(false);
 		}
 
 		@Override
 		public ColumnFamilyOptions createColumnOptions() {
 			return new ColumnFamilyOptions()
-					.setMergeOperator(new StringAppendOperator());
+					.setMergeOperatorName(MERGE_OPERATOR_NAME);
 		}
 
 	},
@@ -86,14 +85,13 @@ public enum PredefinedOptions {
 			return new DBOptions()
 					.setIncreaseParallelism(4)
 					.setUseFsync(false)
-					.setDisableDataSync(true)
 					.setMaxOpenFiles(-1);
 		}
 
 		@Override
 		public ColumnFamilyOptions createColumnOptions() {
 			return new ColumnFamilyOptions()
-					.setMergeOperator(new StringAppendOperator())
+					.setMergeOperatorName(MERGE_OPERATOR_NAME)
 					.setCompactionStyle(CompactionStyle.LEVEL)
 					.setLevelCompactionDynamicLevelBytes(true);
 		}
@@ -133,7 +131,6 @@ public enum PredefinedOptions {
 			return new DBOptions()
 					.setIncreaseParallelism(4)
 					.setUseFsync(false)
-					.setDisableDataSync(true)
 					.setMaxOpenFiles(-1);
 		}
 
@@ -146,7 +143,7 @@ public enum PredefinedOptions {
 			final long writeBufferSize = 64 * 1024 * 1024;
 
 			return new ColumnFamilyOptions()
-					.setMergeOperator(new StringAppendOperator())
+					.setMergeOperatorName(MERGE_OPERATOR_NAME)
 					.setCompactionStyle(CompactionStyle.LEVEL)
 					.setLevelCompactionDynamicLevelBytes(true)
 					.setTargetFileSizeBase(targetFileSize)
@@ -158,6 +155,7 @@ public enum PredefinedOptions {
 							new BlockBasedTableConfig()
 									.setBlockCacheSize(blockCacheSize)
 									.setBlockSize(blockSize)
+									.setFilter(new BloomFilter())
 					);
 		}
 	},
@@ -186,19 +184,21 @@ public enum PredefinedOptions {
 			return new DBOptions()
 					.setIncreaseParallelism(4)
 					.setUseFsync(false)
-					.setDisableDataSync(true)
 					.setMaxOpenFiles(-1);
 		}
 
 		@Override
 		public ColumnFamilyOptions createColumnOptions() {
 			return new ColumnFamilyOptions()
-					.setMergeOperator(new StringAppendOperator());
+					.setMergeOperatorName(MERGE_OPERATOR_NAME);
 		}
 	};
 
 	// ------------------------------------------------------------------------
 
+	// The name of the merge operator in RocksDB. Do not change this unless you know exactly what you are doing.
+	public static final String MERGE_OPERATOR_NAME = "stringappendtest";
+
 	/**
 	 * Creates the {@link DBOptions}for this pre-defined setting.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/d818fc48/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 9d289b4..83b99ad 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -126,6 +126,10 @@ import java.util.concurrent.RunnableFuture;
  * streams provided by a {@link org.apache.flink.runtime.state.CheckpointStreamFactory} upon
  * checkpointing. This state backend can store very large state that exceeds memory and spills
  * to disk. Except for the snapshotting, this class should be accessed as if it is not threadsafe.
+ *
+ * <p>This class follows the rules for closing/releasing native RocksDB resources as described in
+ * <a href="https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#opening-a-database-with-column-families">
+ * this document</a>.
  */
 public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K>
{
 
@@ -160,6 +164,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K>
{
 	protected RocksDB db;
 
 	/**
+	 * We are not using the default column family for Flink state ops, but we still need to remember this handle so that
+	 * we can close it properly when the backend is closed. This is required by RocksDB's native memory management.
+	 */
+	private ColumnFamilyHandle defaultColumnFamily;
+
+	/**
 	 * Information about the k/v states as we create them. This is used to retrieve the
 	 * column family that is used for a state and also for sanity checks when restoring.
 	 */
@@ -254,30 +264,31 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K>
{
 			// and access it in a synchronized block that locks on #dbDisposeLock.
 			if (db != null) {
 
-				for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>
column :
-						kvStateInformation.values()) {
-					try {
-						column.f0.close();
-					} catch (Exception ex) {
-						LOG.info("Exception while closing ColumnFamilyHandle object.", ex);
-					}
+				// RocksDB's native memory management requires that *all* CFs (including default) are closed before the
+				// DB is closed. So we start with the ones created by Flink...
+				for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>
columnMetaData :
+					kvStateInformation.values()) {
+
+					IOUtils.closeQuietly(columnMetaData.f0);
 				}
 
-				kvStateInformation.clear();
-				restoredKvStateMetaInfos.clear();
+				// ... close the default CF ...
+				IOUtils.closeQuietly(defaultColumnFamily);
 
-				try {
-					db.close();
-				} catch (Exception ex) {
-					LOG.info("Exception while closing RocksDB object.", ex);
-				}
+				// ... and finally close the DB instance ...
+				IOUtils.closeQuietly(db);
 
+				// invalidate the reference before releasing the lock so that other accesses will not cause crashes
 				db = null;
+
 			}
 		}
 
-		IOUtils.closeQuietly(columnOptions);
+		kvStateInformation.clear();
+		restoredKvStateMetaInfos.clear();
+
 		IOUtils.closeQuietly(dbOptions);
+		IOUtils.closeQuietly(columnOptions);
 
 		try {
 			FileUtils.deleteDirectory(instanceBasePath);
@@ -1039,6 +1050,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K>
{
 			List<ColumnFamilyHandle> stateColumnFamilyHandles) throws IOException {
 
 		List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(stateColumnFamilyDescriptors);
+
+		// we add the required descriptor for the default CF in last position.
 		columnFamilyDescriptors.add(
 			new ColumnFamilyDescriptor(
 				"default".getBytes(ConfigConstants.DEFAULT_CHARSET), columnOptions));
@@ -1057,9 +1070,15 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K>
{
 			throw new IOException("Error while opening RocksDB instance.", e);
 		}
 
+		final int defaultColumnFamilyIndex = columnFamilyHandles.size() - 1;
+
+		// extract the default column family.
+		defaultColumnFamily = columnFamilyHandles.get(defaultColumnFamilyIndex);
+
 		if (stateColumnFamilyHandles != null) {
+			// return all CFs except the default CF, which is kept separately because it is not used in Flink operations.
 			stateColumnFamilyHandles.addAll(
-				columnFamilyHandles.subList(0, columnFamilyHandles.size() - 1));
+				columnFamilyHandles.subList(0, defaultColumnFamilyIndex));
 		}
 
 		return db;

http://git-wip-us.apache.org/repos/asf/flink/blob/d818fc48/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index ff433ad..8ec29e2 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -260,16 +260,7 @@ public class RocksDBStateBackendConfigTest {
 		rocksDbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
 		assertEquals(PredefinedOptions.SPINNING_DISK_OPTIMIZED, rocksDbBackend.getPredefinedOptions());
 
-		try (
-				DBOptions optCreated = rocksDbBackend.getDbOptions();
-				DBOptions optReference = new DBOptions();
-				ColumnFamilyOptions colCreated = rocksDbBackend.getColumnOptions()) {
-
-			// check that our instance uses something that we configured
-			assertEquals(true, optCreated.disableDataSync());
-			// just ensure that we pickend an option that actually differs from the reference.
-			assertEquals(false, optReference.disableDataSync());
-
+		try (ColumnFamilyOptions colCreated = rocksDbBackend.getColumnOptions()) {
 			assertEquals(CompactionStyle.LEVEL, colCreated.compactionStyle());
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/d818fc48/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
index 3231e96..b26fa48 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
@@ -31,7 +31,6 @@ import org.rocksdb.NativeLibraryLoader;
 import org.rocksdb.Options;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksIterator;
-import org.rocksdb.StringAppendOperator;
 import org.rocksdb.WriteOptions;
 import sun.misc.Unsafe;
 
@@ -39,6 +38,8 @@ import java.io.File;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 
+import static org.apache.flink.contrib.streaming.state.PredefinedOptions.MERGE_OPERATOR_NAME;
+
 /**
  * Test that validates that the performance of RocksDB is as expected.
  * This test guards against the bug filed as 'FLINK-5756'
@@ -74,9 +75,8 @@ public class RocksDBPerformanceTest extends TestLogger {
 					.setIncreaseParallelism(4)
 					.setUseFsync(false)
 					.setMaxOpenFiles(-1)
-					.setDisableDataSync(true)
 					.setCreateIfMissing(true)
-					.setMergeOperator(new StringAppendOperator());
+					.setMergeOperatorName(MERGE_OPERATOR_NAME);
 
 			final WriteOptions write_options = new WriteOptions()
 					.setSync(false)
@@ -152,9 +152,8 @@ public class RocksDBPerformanceTest extends TestLogger {
 					.setIncreaseParallelism(4)
 					.setUseFsync(false)
 					.setMaxOpenFiles(-1)
-					.setDisableDataSync(true)
 					.setCreateIfMissing(true)
-					.setMergeOperator(new StringAppendOperator());
+					.setMergeOperatorName(MERGE_OPERATOR_NAME);
 
 			final WriteOptions write_options = new WriteOptions()
 					.setSync(false)


Mime
View raw message