kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-4233: StateDirectory fails to create directory if any parent directory does not exist
Date Fri, 30 Sep 2016 18:56:19 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.1 45f608997 -> cbf4bf4ff


KAFKA-4233: StateDirectory fails to create directory if any parent directory does not exist

Change the creation of the directories, in the StateDirectory constructor, to use mkdirs so
that any missing parents get created. Throw an exception if a directory doesn't exist and
couldn't be created.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Michael G. Noll, Eno Thereska, Guozhang Wang

Closes #1942 from dguy/kafka-4233

(cherry picked from commit 5f3746d135697f364aaacf877ce288267d00b9a2)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cbf4bf4f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cbf4bf4f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cbf4bf4f

Branch: refs/heads/0.10.1
Commit: cbf4bf4ff0162a16039258cd8e2006492a7631f9
Parents: 45f6089
Author: Damian Guy <damian.guy@gmail.com>
Authored: Fri Sep 30 11:55:32 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri Sep 30 11:56:16 2016 -0700

----------------------------------------------------------------------
 .../streams/processor/internals/StateDirectory.java | 16 ++++++++++------
 .../processor/internals/StateDirectoryTest.java     | 10 ++++++++++
 2 files changed, 20 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/cbf4bf4f/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 02abdeb..3048fba 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.TaskId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,12 +48,14 @@ public class StateDirectory {
 
     public StateDirectory(final String applicationId, final String stateDirConfig) {
         final File baseDir = new File(stateDirConfig);
-        if (!baseDir.exists()) {
-            baseDir.mkdir();
+        if (!baseDir.exists() && !baseDir.mkdirs()) {
+            throw new ProcessorStateException(String.format("state directory [%s] doesn't exist and couldn't be created",
+                                                            stateDirConfig));
         }
         stateDir = new File(baseDir, applicationId);
-        if (!stateDir.exists()) {
-            stateDir.mkdir();
+        if (!stateDir.exists() && !stateDir.mkdir()) {
+            throw new ProcessorStateException(String.format("state directory [%s] doesn't exist and couldn't be created",
+                                                            stateDir.getPath()));
         }
 
     }
@@ -64,8 +67,9 @@ public class StateDirectory {
      */
     public File directoryForTask(final TaskId taskId) {
         final File taskDir = new File(stateDir, taskId.toString());
-        if (!taskDir.exists()) {
-            taskDir.mkdir();
+        if (!taskDir.exists() && !taskDir.mkdir()) {
+            throw new ProcessorStateException(String.format("task directory [%s] doesn't exist and couldn't be created",
+                                                            taskDir.getPath()));
         }
         return taskDir;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/cbf4bf4f/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index c17e7bc..6fc855c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -166,4 +166,14 @@ public class StateDirectoryTest {
         assertTrue(dirs.contains(taskDir2));
     }
 
+    @Test
+    public void shouldCreateDirectoriesIfParentDoesntExist() throws Exception {
+        final File tempDir = TestUtils.tempDirectory();
+        final File stateDir = new File(new File(tempDir, "foo"), "state-dir");
+        final StateDirectory stateDirectory = new StateDirectory(applicationId, stateDir.getPath());
+        final File taskDir = stateDirectory.directoryForTask(new TaskId(0, 0));
+        assertTrue(stateDir.exists());
+        assertTrue(taskDir.exists());
+    }
+
 }
\ No newline at end of file


Mime
View raw message