flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srich...@apache.org
Subject flink git commit: [FLINK-6505] Proactively cleanup local FS for RocksDBKeyedStateBackend on startup
Date Thu, 23 Nov 2017 12:52:51 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.4 35517f129 -> 8a052bf09


[FLINK-6505] Proactively cleanup local FS for RocksDBKeyedStateBackend on startup


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8a052bf0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8a052bf0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8a052bf0

Branch: refs/heads/release-1.4
Commit: 8a052bf0948d92d6fccc4d1c6c4bd2aa459032c9
Parents: 35517f1
Author: Bowen Li <bowenli86@gmail.com>
Authored: Tue Oct 10 07:31:17 2017 +0200
Committer: Stefan Richter <s.richter@data-artisans.com>
Committed: Thu Nov 23 13:51:41 2017 +0100

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         | 28 ++++++++++----------
 1 file changed, 14 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8a052bf0/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index f67daab..9185ad0 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -235,20 +235,14 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		this.instanceBasePath = Preconditions.checkNotNull(instanceBasePath);
 		this.instanceRocksDBPath = new File(instanceBasePath, "db");
 
-		if (!instanceBasePath.exists()) {
-			if (!instanceBasePath.mkdirs()) {
-				throw new IOException("Could not create RocksDB data directory.");
-			}
+		if (instanceBasePath.exists()) {
+			// Clear the base directory when the backend is created
+			// in case something crashed and the backend never reached dispose()
+			cleanInstanceBasePath();
 		}
 
-		// clean it, this will remove the last part of the path but RocksDB will recreate it
-		try {
-			if (instanceRocksDBPath.exists()) {
-				LOG.warn("Deleting already existing db directory {}.", instanceRocksDBPath);
-				FileUtils.deleteDirectory(instanceRocksDBPath);
-			}
-		} catch (IOException e) {
-			throw new IOException("Error cleaning RocksDB data directory.", e);
+		if (!instanceBasePath.mkdirs()) {
+			throw new IOException("Could not create RocksDB data directory.");
 		}
 
 		this.keyGroupPrefixBytes = getNumberOfKeyGroups() > (Byte.MAX_VALUE + 1) ? 2 : 1;
@@ -312,10 +306,16 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		IOUtils.closeQuietly(dbOptions);
 		IOUtils.closeQuietly(columnOptions);
 
+		cleanInstanceBasePath();
+	}
+
+	private void cleanInstanceBasePath() {
+		LOG.info("Deleting existing instance base directory {}.", instanceBasePath);
+
 		try {
 			FileUtils.deleteDirectory(instanceBasePath);
-		} catch (IOException ioex) {
-			LOG.info("Could not delete instace base path for RocksDB: " + instanceBasePath, ioex);
+		} catch (IOException ex) {
+			LOG.warn("Could not delete instance base path for RocksDB: " + instanceBasePath, ex);
 		}
 	}
 


Mime
View raw message