incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject git commit: Fixing issue where a writing kvstore will continue syncing even though another process has taken ownership.
Date Fri, 12 Jun 2015 23:06:59 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/master ac41db559 -> 4a5f070b0


Fixing issue where a writing kvstore will continue syncing even though another process has
taken ownership.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/4a5f070b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/4a5f070b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/4a5f070b

Branch: refs/heads/master
Commit: 4a5f070b099206f020040333028d43599afc7764
Parents: ac41db5
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Fri Jun 12 19:07:09 2015 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Fri Jun 12 19:07:09 2015 -0400

----------------------------------------------------------------------
 .../blur/store/hdfs_v2/HdfsKeyValueStore.java   | 30 ++++++++++++++------
 .../store/hdfs_v2/HdfsKeyValueStoreTest.java    | 18 +++++++++---
 2 files changed, 35 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4a5f070b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
index f763ab1..09be70c 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
@@ -41,6 +41,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -485,9 +486,12 @@ public class HdfsKeyValueStore implements Store {
       _writeLock.lock();
       try {
         if (isOpenForWriting()) {
-          syncInternal();
-          _output.close();
-          _output = null;
+          try {
+            syncInternal();
+          } finally {
+            IOUtils.closeQuietly(_output);
+            _output = null;
+          }
         }
       } finally {
         _writeLock.unlock();
@@ -499,9 +503,7 @@ public class HdfsKeyValueStore implements Store {
     if (_readOnly) {
       throw new IOException("Key value store is set in read only mode.");
     }
-    long nextSegment = _currentFileCounter.incrementAndGet();
-    String name = buffer(nextSegment);
-    _outputPath = new Path(_path, name);
+    _outputPath = getSegmentPath(_currentFileCounter.incrementAndGet());
     LOG.info("Opening for writing [{0}].", _outputPath);
     _output = _fileSystem.create(_outputPath, false);
     _output.write(MAGIC);
@@ -509,7 +511,11 @@ public class HdfsKeyValueStore implements Store {
     syncInternal();
   }
 
-  private String buffer(long number) {
+  private Path getSegmentPath(long segment) {
+    return new Path(_path, buffer(segment));
+  }
+
+  private static String buffer(long number) {
     String s = Long.toString(number);
     StringBuilder builder = new StringBuilder();
     for (int i = s.length(); i < 12; i++) {
@@ -531,11 +537,17 @@ public class HdfsKeyValueStore implements Store {
   }
 
   private void syncInternal() throws IOException {
+    validateNextSegmentHasNotStarted();
     _output.flush();
     _output.sync();
     _lastWrite.set(System.currentTimeMillis());
-    // System.out.println("Sync Output Path [" + _outputPath + "] Position [" +
-    // _output.getPos() + "]");
+  }
+
+  private void validateNextSegmentHasNotStarted() throws IOException {
+    Path p = getSegmentPath(_currentFileCounter.get() + 1);
+    if (_fileSystem.exists(p)) {
+      throw new IOException("Another HDFS KeyStore has likely taken ownership of this key value store.");
+    }
   }
 
   private void loadIndex(Path path) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4a5f070b/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStoreTest.java b/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStoreTest.java
index 1b24aa5..e2a6255 100644
--- a/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStoreTest.java
+++ b/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStoreTest.java
@@ -16,7 +16,10 @@
  */
 package org.apache.blur.store.hdfs_v2;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.File;
 import java.io.IOException;
@@ -144,7 +147,7 @@ public class HdfsKeyValueStoreTest {
     store.close();
   }
 
-  // @Test
+  @Test
   public void testTwoKeyStoreInstancesWritingAtTheSameTime() throws IOException {
     HdfsKeyValueStore store1 = new HdfsKeyValueStore(false, _timer, _configuration, _path);
     listFiles();
@@ -169,7 +172,14 @@ public class HdfsKeyValueStoreTest {
 
     }
     store2.sync();
-    store1.close();
+
+    try {
+      store1.close();
+      fail();
+    } catch (Exception e) {
+
+    }
+
     store2.close();
 
     HdfsKeyValueStore store3 = new HdfsKeyValueStore(false, _timer, _configuration, _path);
@@ -237,7 +247,7 @@ public class HdfsKeyValueStoreTest {
 
     // Store 1 should still be able to write.
     store1.put(new BytesRef("a2"), new BytesRef(new byte[2000]));
-    
+
     // Store 2 should not be able to find.
     assertFalse(store2.get(new BytesRef("a2"), new BytesRef(new byte[2000])));
 


Mime
View raw message