flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject flink git commit: [FLINK-3730] Fix RocksDB Local Directory Initialization
Date Wed, 13 Apr 2016 08:38:34 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.0 4f9c19808 -> ff38202ba


[FLINK-3730] Fix RocksDB Local Directory Initialization


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ff38202b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ff38202b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ff38202b

Branch: refs/heads/release-1.0
Commit: ff38202bacfbe07e91bc1b46d44e5eb46e991e79
Parents: 4f9c198
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Tue Apr 12 10:46:59 2016 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Wed Apr 13 10:38:11 2016 +0200

----------------------------------------------------------------------
 .../contrib/streaming/state/RocksDBStateBackend.java      |  9 ++++++---
 .../streaming/state/RocksDBStateBackendConfigTest.java    | 10 +++++++++-
 2 files changed, 15 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ff38202b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index e3b4f4d..8f846da 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -24,6 +24,7 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
+import java.util.UUID;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.FoldingState;
@@ -181,13 +182,15 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 			
 			for (Path path : configuredDbBasePaths) {
 				File f = new File(path.toUri().getPath());
-				if (!f.exists() && !f.mkdirs()) {
-					String msg = "Local DB files directory '" + f.getAbsolutePath()
+				File testDir = new File(f, UUID.randomUUID().toString());
+				if (!testDir.mkdirs()) {
+					String msg = "Local DB files directory '" + path
 							+ "' does not exist and cannot be created. ";
 					LOG.error(msg);
 					errorMessage += msg;
+				} else {
+					dirs.add(f);
 				}
-				dirs.add(f);
 			}
 			
 			if (dirs.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ff38202b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 8e0993b..42ba275 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -20,8 +20,10 @@ package org.apache.flink.contrib.streaming.state;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.VoidSerializer;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.state.AbstractStateBackend;
@@ -133,14 +135,17 @@ public class RocksDBStateBackendConfigTest {
 			
 			RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI);
 			rocksDbBackend.setDbStoragePath(targetDir.getAbsolutePath());
-			
+
+			boolean hasFailure = false;
 			try {
 				rocksDbBackend.initializeForJob(getMockEnvironment(), "foobar", IntSerializer.INSTANCE);
 			}
 			catch (Exception e) {
 				assertTrue(e.getMessage().contains("No local storage directories available"));
 				assertTrue(e.getMessage().contains(targetDir.getAbsolutePath()));
+				hasFailure = true;
 			}
+			assertTrue("We must see a failure because no storaged directory is feasible.", hasFailure);
 		}
 		finally {
 			//noinspection ResultOfMethodCallIgnored
@@ -168,6 +173,9 @@ public class RocksDBStateBackendConfigTest {
 	
 			try {
 				rocksDbBackend.initializeForJob(getMockEnvironment(), "foobar", IntSerializer.INSTANCE);
+
+				// actually get a state to see whether we can write to the storage directory
+				rocksDbBackend.getPartitionedState(null, VoidSerializer.INSTANCE, new ValueStateDescriptor<>("test", String.class, ""));
 			}
 			catch (Exception e) {
 				e.printStackTrace();


Mime
View raw message