incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject git commit: Refactoring some code for BLUR-400, now resolved.
Date Fri, 26 Dec 2014 15:38:11 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/master 69e37c3b9 -> 89cfdb8d4


Refactoring some code for BLUR-400, now resolved.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/89cfdb8d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/89cfdb8d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/89cfdb8d

Branch: refs/heads/master
Commit: 89cfdb8d4f7d9cf9833334495011c9d6fa765dd1
Parents: 69e37c3
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Fri Dec 26 10:38:00 2014 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Fri Dec 26 10:38:00 2014 -0500

----------------------------------------------------------------------
 .../hdfs_v2/FastHdfsKeyValueDirectory.java      |   6 +-
 .../hdfs_v2/FastHdfsKeyValueIndexOutput.java    |   2 +-
 .../blur/store/hdfs_v2/HdfsKeyValueStore.java   | 138 ++++++++++---------
 .../hdfs_v2/FastHdfsKeyValueDirectoryTest.java  |  34 ++---
 4 files changed, 93 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89cfdb8d/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java
b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java
index 5f7442a..a690e18 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java
@@ -65,7 +65,7 @@ public class FastHdfsKeyValueDirectory extends Directory implements LastModified
     BytesRef value = new BytesRef();
     if (_store.get(FILES, value)) {
       String filesString = value.utf8ToString();
-//      System.out.println("Open Files String [" + filesString + "]");
+      // System.out.println("Open Files String [" + filesString + "]");
       String[] files = filesString.split("\\" + SEP);
       for (String file : files) {
         if (file.isEmpty()) {
@@ -136,7 +136,6 @@ public class FastHdfsKeyValueDirectory extends Directory implements LastModified
       }
       builder.append(n);
     }
-//    System.out.println("Writing Files String [" + builder.toString() + "]");
     _store.put(FILES, new BytesRef(builder.toString()));
   }
 
@@ -147,6 +146,9 @@ public class FastHdfsKeyValueDirectory extends Directory implements LastModified
 
   @Override
   public IndexOutput createOutput(final String name, IOContext context) throws IOException
{
+    if (fileExists(name)) {
+      deleteFile(name);
+    }
     return new FastHdfsKeyValueIndexOutput(name, _blockSize, this);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89cfdb8d/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueIndexOutput.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueIndexOutput.java
b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueIndexOutput.java
index 56b3cfe..b341f42 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueIndexOutput.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueIndexOutput.java
@@ -44,7 +44,7 @@ public class FastHdfsKeyValueIndexOutput extends IndexOutput {
 
   @Override
   public void close() throws IOException {
-    _closed=true;
+    _closed = true;
     long blockId;
     if (_bufferPosition == _blockSize) {
       blockId = (_position - 1) / _blockSize;

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89cfdb8d/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
index afbe6d3..b9da75e 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
@@ -67,6 +67,7 @@ import com.yammer.metrics.core.MetricName;
 
 public class HdfsKeyValueStore implements Store {
 
+  private static final String UTF_8 = "UTF-8";
   private static final String BLUR_KEY_VALUE = "blur_key_value";
   private static final String IN = "in";
   private static final String GET_FILE_LENGTH = "getFileLength";
@@ -80,7 +81,7 @@ public class HdfsKeyValueStore implements Store {
 
   static {
     try {
-      MAGIC = BLUR_KEY_VALUE.getBytes("UTF-8");
+      MAGIC = BLUR_KEY_VALUE.getBytes(UTF_8);
     } catch (UnsupportedEncodingException e) {
       throw new RuntimeException(e);
     }
@@ -156,13 +157,14 @@ public class HdfsKeyValueStore implements Store {
   private final WriteLock _writeLock;
   private final ReadLock _readLock;
   private final AtomicLong _size = new AtomicLong();
-
+  private final long _maxAmountAllowedPerFile;
+  private final TimerTask _idleLogTimerTask;
+  private final TimerTask _oldFileCleanerTimerTask;
   private final AtomicLong _lastWrite = new AtomicLong();
+
   private FSDataOutputStream _output;
   private Path _outputPath;
-  private final long _maxAmountAllowedPerFile;
   private boolean _isClosed;
-  private final TimerTask _timerTask;
 
   public HdfsKeyValueStore(Timer hdfsKeyValueTimer, Configuration configuration, Path path)
throws IOException {
     this(hdfsKeyValueTimer, configuration, path, DEFAULT_MAX);
@@ -183,7 +185,11 @@ public class HdfsKeyValueStore implements Store {
     }
     removeAnyTruncatedFiles();
     loadIndexes();
-    _timerTask = addToTimer(hdfsKeyValueTimer);
+    cleanupOldFiles();
+    _idleLogTimerTask = getIdleLogTimer();
+    _oldFileCleanerTimerTask = getOldFileCleanerTimer();
+    hdfsKeyValueTimer.schedule(_idleLogTimerTask, DAEMON_POLL_TIME, DAEMON_POLL_TIME);
+    hdfsKeyValueTimer.schedule(_oldFileCleanerTimerTask, DAEMON_POLL_TIME, DAEMON_POLL_TIME);
     Metrics.newGauge(new MetricName(ORG_APACHE_BLUR, HDFS_KV, SIZE, path.getParent().toString()),
new Gauge<Long>() {
       @Override
       public Long value() {
@@ -207,45 +213,31 @@ public class HdfsKeyValueStore implements Store {
     }
   }
 
-  private TimerTask addToTimer(Timer hdfsKeyValueTimer) {
-    _writeLock.lock();
-    try {
-      try {
-        cleanupOldFiles();
-      } catch (IOException e) {
-        LOG.error("Unknown error while trying to clean up old files.", e);
+  private TimerTask getOldFileCleanerTimer() {
+    return new TimerTask() {
+      @Override
+      public void run() {
+        try {
+          cleanupOldFiles();
+        } catch (IOException e) {
+          LOG.error("Unknown error while trying to clean up old files.", e);
+        }
       }
-    } finally {
-      _writeLock.unlock();
-    }
-    TimerTask timerTask = new TimerTask() {
+    };
+  }
+
+  private TimerTask getIdleLogTimer() {
+    return new TimerTask() {
       @Override
       public void run() {
-        _writeLock.lock();
         try {
-          if (_output != null && _lastWrite.get() + MAX_OPEN_FOR_WRITING < System.currentTimeMillis())
{
-            try {
-              cleanupOldFiles();
-            } catch (IOException e) {
-              LOG.error("Unknown error while trying to clean up old files.", e);
-            }
-            // Close writer
-            LOG.info("Closing KV log due to inactivity [{0}].", _path);
-            try {
-              _output.close();
-            } catch (IOException e) {
-              LOG.error("Unknown error while trying to close output file.", e);
-            } finally {
-              _output = null;
-            }
-          }
-        } finally {
-          _writeLock.unlock();
+          closeLogFileIfIdle();
+        } catch (IOException e) {
+          LOG.error("Unknown error while trying to close output file.", e);
         }
       }
+
     };
-    hdfsKeyValueTimer.schedule(timerTask, DAEMON_POLL_TIME, DAEMON_POLL_TIME);
-    return timerTask;
   }
 
   @Override
@@ -364,30 +356,52 @@ public class HdfsKeyValueStore implements Store {
   }
 
   public void cleanupOldFiles() throws IOException {
-    if (!isOpenForWriting()) {
-      return;
-    }
-    SortedSet<FileStatus> fileStatusSet = getSortedSet(_path);
-    if (fileStatusSet == null || fileStatusSet.size() < 1) {
-      return;
-    }
-    Path newestGen = fileStatusSet.last().getPath();
-    if (!newestGen.equals(_outputPath)) {
-      throw new IOException("No longer the owner of [" + _path + "]");
-    }
-    Set<Path> existingFiles = new HashSet<Path>();
-    for (FileStatus fileStatus : fileStatusSet) {
-      existingFiles.add(fileStatus.getPath());
-    }
-    Set<Entry<BytesRef, Value>> entrySet = _pointers.entrySet();
-    existingFiles.remove(_outputPath);
-    for (Entry<BytesRef, Value> e : entrySet) {
-      Path p = e.getValue()._path;
-      existingFiles.remove(p);
+    _writeLock.lock();
+    try {
+      if (!isOpenForWriting()) {
+        return;
+      }
+      SortedSet<FileStatus> fileStatusSet = getSortedSet(_path);
+      if (fileStatusSet == null || fileStatusSet.size() < 1) {
+        return;
+      }
+      Path newestGen = fileStatusSet.last().getPath();
+      if (!newestGen.equals(_outputPath)) {
+        throw new IOException("No longer the owner of [" + _path + "]");
+      }
+      Set<Path> existingFiles = new HashSet<Path>();
+      for (FileStatus fileStatus : fileStatusSet) {
+        existingFiles.add(fileStatus.getPath());
+      }
+      Set<Entry<BytesRef, Value>> entrySet = _pointers.entrySet();
+      existingFiles.remove(_outputPath);
+      for (Entry<BytesRef, Value> e : entrySet) {
+        Path p = e.getValue()._path;
+        existingFiles.remove(p);
+      }
+      for (Path p : existingFiles) {
+        LOG.info("Removing file no longer referenced [{0}]", p);
+        _fileSystem.delete(p, false);
+      }
+    } finally {
+      _writeLock.unlock();
     }
-    for (Path p : existingFiles) {
-      LOG.info("Removing file no longer referenced [{0}]", p);
-      _fileSystem.delete(p, false);
+  }
+
+  private void closeLogFileIfIdle() throws IOException {
+    _writeLock.lock();
+    try {
+      if (_output != null && _lastWrite.get() + MAX_OPEN_FOR_WRITING < System.currentTimeMillis())
{
+        // Close writer
+        LOG.info("Closing KV log due to inactivity [{0}].", _path);
+        try {
+          _output.close();
+        } finally {
+          _output = null;
+        }
+      }
+    } finally {
+      _writeLock.unlock();
     }
   }
 
@@ -451,7 +465,8 @@ public class HdfsKeyValueStore implements Store {
   public void close() throws IOException {
     if (!_isClosed) {
       _isClosed = true;
-      _timerTask.cancel();
+      _idleLogTimerTask.cancel();
+      _oldFileCleanerTimerTask.cancel();
       _writeLock.lock();
       try {
         if (isOpenForWriting()) {
@@ -521,7 +536,6 @@ public class HdfsKeyValueStore implements Store {
     int version = inputStream.readInt();
     if (version == 1) {
       long fileLength = getFileLength(path, inputStream);
-//      System.out.println("Load Index File [" + path + "] Length [" + fileLength + "]");
       Operation operation = new Operation();
       try {
         while (inputStream.getPos() < fileLength) {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89cfdb8d/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectoryTest.java
----------------------------------------------------------------------
diff --git a/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectoryTest.java
b/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectoryTest.java
index a1c403b..953ae04 100644
--- a/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectoryTest.java
+++ b/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectoryTest.java
@@ -20,7 +20,6 @@ import static org.junit.Assert.assertEquals;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
 import java.util.Set;
@@ -112,7 +111,7 @@ public class FastHdfsKeyValueDirectoryTest {
     System.out.println("Seed:" + seed);
     Random random = new Random(seed);
     int docCount = 0;
-    int passes = 100;
+    int passes = 50;
     for (int run = 0; run < passes; run++) {
       final FastHdfsKeyValueDirectory directory = new FastHdfsKeyValueDirectory(_timer, _configuration,
new Path(_path,
           "test_multiple_commits_reopens"));
@@ -123,16 +122,18 @@ public class FastHdfsKeyValueDirectoryTest {
       for (int i = 0; i < numberOfCommits; i++) {
         assertFiles(fileSet, run, i, directory);
         addDocuments(writer, random.nextInt(100));
-        // System.out.println("Before Commit");
+        // Before Commit
         writer.commit();
-        // System.out.println("After Commit");
-
-        fileSet.clear();
-        List<IndexCommit> listCommits = DirectoryReader.listCommits(directory);
-        assertEquals(1, listCommits.size());
-        IndexCommit indexCommit = listCommits.get(0);
-        fileSet.addAll(indexCommit.getFileNames());
-        // System.out.println("Files after commit " + fileSet);
+        // After Commit
+
+        // Set files after commit
+        {
+          fileSet.clear();
+          List<IndexCommit> listCommits = DirectoryReader.listCommits(directory);
+          assertEquals(1, listCommits.size());
+          IndexCommit indexCommit = listCommits.get(0);
+          fileSet.addAll(indexCommit.getFileNames());
+        }
       }
       docCount = getDocumentCount(directory);
     }
@@ -164,20 +165,9 @@ public class FastHdfsKeyValueDirectoryTest {
     missing.removeAll(actual);
     Set<String> extra = new TreeSet<String>(actual);
     extra.removeAll(expected);
-    // System.out.println("Segment Files [" + getSegmentFiles(actual) + "]");
     assertEquals("Pass [" + run + "] Missing Files " + " Extra Files " + extra + "", expected,
actual);
   }
 
-  private Set<String> getSegmentFiles(Set<String> actual) {
-    Set<String> result = new HashSet<String>();
-    for (String s : actual) {
-      if (s.startsWith("segments_")) {
-        result.add(s);
-      }
-    }
-    return result;
-  }
-
   private void addDocuments(IndexWriter writer, int numberOfDocs) throws IOException {
     for (int i = 0; i < numberOfDocs; i++) {
       addDoc(writer, getDoc(i));


Mime
View raw message