incubator-blur-commits mailing list archives

From amccu...@apache.org
Subject [1/8] git commit: Fixing an issue where a new HDFS key value store file is created but the header is not synced before the process dies or the shard is moved.
Date Sun, 29 Jun 2014 18:25:23 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/apache-blur-0.2 e0da5865e -> ff82c14b3


Fixing an issue where a new HDFS key value store file is created but the header is not
synced before the process dies or the shard is moved.
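
The failure mode: HdfsKeyValueStore writes a small header (its MAGIC marker followed by a
4-byte version) at the start of every segment file, and a process can die, or a shard can
move, after a file has been created but before that header reaches HDFS, leaving a file too
short to ever be read back. The commit handles this by scanning the store directory when the
store is opened and deleting any file shorter than the header. A minimal sketch of that check,
using a hypothetical MAGIC value and class name (neither is part of the commit):

    import java.io.IOException;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class TruncatedFileCheck {

      // Hypothetical values standing in for the store's real header constants.
      private static final byte[] MAGIC = "BLUR".getBytes();
      private static final int VERSION_LENGTH = 4;

      /**
       * Deletes any file under storeDir that is too short to contain the header,
       * i.e. a file whose writer died before the header was synced.
       */
      public static void removeTruncatedFiles(FileSystem fileSystem, Path storeDir) throws IOException {
        for (FileStatus fileStatus : fileSystem.listStatus(storeDir)) {
          if (fileStatus.getLen() < MAGIC.length + VERSION_LENGTH) {
            // Not even the header made it to HDFS; the file can never be read, so drop it.
            fileSystem.delete(fileStatus.getPath(), false);
          }
        }
      }
    }

Unlike this sketch, the commit measures each file's length through an open stream rather than
FileStatus.getLen(); see the note after the HdfsKeyValueStore.java diff below.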


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/4e5199d1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/4e5199d1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/4e5199d1

Branch: refs/heads/apache-blur-0.2
Commit: 4e5199d1697f65cc965c737219a8dbcb35fc9927
Parents: a75506c
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Sun Jun 29 13:58:21 2014 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Sun Jun 29 13:58:21 2014 -0400

----------------------------------------------------------------------
 .../blur/store/hdfs_v2/HdfsKeyValueStore.java   | 41 ++++++++++++++------
 .../store/hdfs_v2/HdfsKeyValueStoreTest.java    |  3 ++
 2 files changed, 32 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4e5199d1/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
index 71fd10e..526c12e 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
@@ -75,6 +75,7 @@ public class HdfsKeyValueStore implements Store {
   private static final int VERSION = 1;
   private static final long MAX_OPEN_FOR_WRITING = TimeUnit.MINUTES.toMillis(1);
   private static final long DAEMON_POLL_TIME = TimeUnit.SECONDS.toMillis(5);
+  private static final int VERSION_LENGTH = 4;
 
   static {
     try {
@@ -133,7 +134,7 @@ public class HdfsKeyValueStore implements Store {
     public int compare(BytesRef b1, BytesRef b2) {
       return WritableComparator.compareBytes(b1.bytes, b1.offset, b1.length, b2.bytes, b2.offset, b2.length);
     }
-  };
+  };;
 
   static class Value {
     Value(BytesRef bytesRef, Path path) {
@@ -174,6 +175,7 @@ public class HdfsKeyValueStore implements Store {
     _path = path;
     _configuration.setBoolean("fs.hdfs.impl.disable.cache", true);
     _fileSystem = FileSystem.get(_path.toUri(), _configuration);
+    _fileSystem.mkdirs(_path);
     _readWriteLock = new ReentrantReadWriteLock();
     _writeLock = _readWriteLock.writeLock();
     _readLock = _readWriteLock.readLock();
@@ -181,10 +183,9 @@ public class HdfsKeyValueStore implements Store {
     if (!_fileStatus.get().isEmpty()) {
       _currentFileCounter.set(Long.parseLong(_fileStatus.get().last().getPath().getName()));
     }
+    removeAnyTruncatedFiles();
     loadIndexes();
-    openWriter();
     _daemon = startDaemon();
-    cleanupOldFiles();
     Metrics.newGauge(new MetricName(ORG_APACHE_BLUR, HDFS_KV, SIZE, path.getParent().toString()), new Gauge<Long>() {
       @Override
       public Long value() {
@@ -193,10 +194,35 @@ public class HdfsKeyValueStore implements Store {
     });
   }
 
+  private void removeAnyTruncatedFiles() throws IOException {
+    for (FileStatus fileStatus : _fileStatus.get()) {
+      Path path = fileStatus.getPath();
+      FSDataInputStream inputStream = _fileSystem.open(path);
+      long len = getFileLength(path, inputStream);
+      inputStream.close();
+      if (len < MAGIC.length + VERSION_LENGTH) {
+        // Remove invalid file
+        LOG.warn("Removing file [{0}] because length of [{1}] is less than MAGIC plus version length of [{2}]", path,
+            len, MAGIC.length + VERSION_LENGTH);
+        _fileSystem.delete(path, false);
+      }
+    }
+  }
+
   private Thread startDaemon() {
     Thread thread = new Thread(new Runnable() {
       @Override
       public void run() {
+        _writeLock.lock();
+        try {
+          try {
+            cleanupOldFiles();
+          } catch (IOException e) {
+            LOG.error("Unknown error while trying to clean up old files.", e);
+          }
+        } finally {
+          _writeLock.unlock();
+        }
         while (_running.get()) {
           _writeLock.lock();
           try {
@@ -317,9 +343,6 @@ public class HdfsKeyValueStore implements Store {
       if (old != null) {
         _size.addAndGet(-old._bytesRef.bytes.length);
       }
-      if (!path.equals(_outputPath)) {
-        cleanupOldFiles();
-      }
     } catch (RemoteException e) {
       throw new IOException("Another HDFS KeyStore has likely taken ownership of this key value store.", e);
     } catch (LeaseExpiredException e) {
@@ -345,16 +368,11 @@ public class HdfsKeyValueStore implements Store {
   }
 
   private void rollFile() throws IOException {
-    logSize();
     LOG.info("Rolling file [" + _outputPath + "]");
     _output.close();
     openWriter();
   }
 
-  private void logSize() {
-
-  }
-
   public void cleanupOldFiles() throws IOException {
     SortedSet<FileStatus> fileStatusSet = getSortedSet(_path);
     if (fileStatusSet == null || fileStatusSet.size() < 1) {
@@ -452,7 +470,6 @@ public class HdfsKeyValueStore implements Store {
   }
 
   private void openWriter() throws IOException {
-    logSize();
     long nextSegment = _currentFileCounter.incrementAndGet();
     String name = buffer(nextSegment);
     _outputPath = new Path(_path, name);
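
A note on the length check in removeAnyTruncatedFiles above: it measures each file through an
open FSDataInputStream (getFileLength(path, inputStream)) rather than FileStatus.getLen(),
presumably because the NameNode-reported length of a file that was never closed can lag behind
what is actually readable. One possible shape for such a helper, shown as a hedged sketch (the
commit does not include getFileLength in this diff, so the real implementation may differ):

    import java.io.IOException;

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.client.HdfsDataInputStream;

    public class FileLengthSketch {
      static long getFileLength(FileSystem fileSystem, Path path, FSDataInputStream inputStream)
          throws IOException {
        // On HDFS the stream returned by open() is an HdfsDataInputStream, whose
        // getVisibleLength() reports the bytes that are actually readable, even
        // for a file whose writer never closed it.
        if (inputStream instanceof HdfsDataInputStream) {
          return ((HdfsDataInputStream) inputStream).getVisibleLength();
        }
        // Fall back to the NameNode-reported length for other file systems.
        return fileSystem.getFileStatus(path).getLen();
      }
    }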

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4e5199d1/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStoreTest.java b/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStoreTest.java
index 363f592..2031017 100644
--- a/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStoreTest.java
+++ b/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStoreTest.java
@@ -120,6 +120,7 @@ public class HdfsKeyValueStoreTest {
   public void testFileRolling() throws IOException {
     HdfsKeyValueStore store = new HdfsKeyValueStore(_configuration, _path, 1000);
     FileSystem fileSystem = _path.getFileSystem(_configuration);
+    store.put(new BytesRef("a"), new BytesRef(""));
     assertEquals(1, fileSystem.listStatus(_path).length);
     store.put(new BytesRef("a"), new BytesRef(new byte[2000]));
     assertEquals(2, fileSystem.listStatus(_path).length);
@@ -129,11 +130,13 @@ public class HdfsKeyValueStoreTest {
   @Test
   public void testFileGC() throws IOException {
     HdfsKeyValueStore store = new HdfsKeyValueStore(_configuration, _path, 1000);
+    store.put(new BytesRef("a"), new BytesRef(""));
     FileSystem fileSystem = _path.getFileSystem(_configuration);
     assertEquals(1, fileSystem.listStatus(_path).length);
     store.put(new BytesRef("a"), new BytesRef(new byte[2000]));
     assertEquals(2, fileSystem.listStatus(_path).length);
     store.put(new BytesRef("a"), new BytesRef(new byte[2000]));
+    store.cleanupOldFiles();
     assertEquals(2, fileSystem.listStatus(_path).length);
     store.close();
   }
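
The test changes above follow from two behavioral shifts in the constructor: openWriter() is no
longer called eagerly, so no segment file exists until the first put, and cleanupOldFiles() is no
longer triggered from the put path, so tests that count files call it explicitly. A hedged usage
sketch of that pattern (the store location is hypothetical; this is not code from the commit):

    import org.apache.blur.store.hdfs_v2.HdfsKeyValueStore;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.lucene.util.BytesRef;

    public class ExplicitCleanupSketch {
      public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        Path path = new Path("hdfs://localhost/tmp/kv-store"); // hypothetical location
        HdfsKeyValueStore store = new HdfsKeyValueStore(configuration, path, 1000);
        // No segment file exists yet; the writer is opened lazily on the first put.
        store.put(new BytesRef("a"), new BytesRef(""));             // creates the first segment file
        store.put(new BytesRef("a"), new BytesRef(new byte[2000])); // writes past the 1000-byte threshold, rolling a new file
        store.cleanupOldFiles();                                    // reclaim segment files made obsolete by the roll
        store.close();
      }
    }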

