incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [2/2] git commit: Fixing issue with an empty fast kv directory.
Date Thu, 19 Mar 2015 12:25:55 GMT
Fixing issue with an empty fast kv directory.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/74c6928d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/74c6928d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/74c6928d

Branch: refs/heads/master
Commit: 74c6928d27d460be468414416699707285d334a6
Parents: 20d1ce5
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Thu Mar 19 08:25:50 2015 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Thu Mar 19 08:25:50 2015 -0400

----------------------------------------------------------------------
 .../hdfs_v2/FastHdfsKeyValueDirectory.java      | 32 ++++++++++++--------
 .../blur/store/hdfs_v2/HdfsKeyValueStore.java   | 24 +++++++--------
 .../hdfs_v2/FastHdfsKeyValueDirectoryTest.java  | 11 +++++++
 3 files changed, 43 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/74c6928d/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java
index 8610c8d..02c7d5c 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java
@@ -63,28 +63,36 @@ public class FastHdfsKeyValueDirectory extends Directory implements LastModified
   private long _lastGc;
 
   public FastHdfsKeyValueDirectory(Timer hdfsKeyValueTimer, Configuration configuration, Path path) throws IOException {
+    this(hdfsKeyValueTimer, configuration, path, HdfsKeyValueStore.DEFAULT_MAX_AMOUNT_ALLOWED_PER_FILE,
+        HdfsKeyValueStore.DEFAULT_MAX_OPEN_FOR_WRITING);
+  }
+
+  public FastHdfsKeyValueDirectory(Timer hdfsKeyValueTimer, Configuration configuration, Path path,
+      long maxAmountAllowedPerFile, long maxOpenForWriting) throws IOException {
     _path = path;
-    _store = new HdfsKeyValueStore(hdfsKeyValueTimer, configuration, path);
+    _store = new HdfsKeyValueStore(hdfsKeyValueTimer, configuration, path, maxAmountAllowedPerFile, maxOpenForWriting);
     MemoryLeakDetector.record(_store, "HdfsKeyValueStore", path.toString());
     BytesRef value = new BytesRef();
     if (_store.get(FILES, value)) {
       String filesString = value.utf8ToString();
       // System.out.println("Open Files String [" + filesString + "]");
-      String[] files = filesString.split("\\" + SEP);
-      for (String file : files) {
-        if (file.isEmpty()) {
-          throw new IOException("Empty file names should not occur [" + filesString + "]");
-        }
-        BytesRef key = new BytesRef(file + LENGTH);
-        if (_store.get(key, value)) {
-          _files.put(file, Long.parseLong(value.utf8ToString()));
-        } else {
-          LOG.warn(MISSING_METADATA_MESSAGE, file);
+      if (!filesString.isEmpty()) {
+        String[] files = filesString.split("\\" + SEP);
+        for (String file : files) {
+          if (file.isEmpty()) {
+            throw new IOException("Empty file names should not occur [" + filesString + "]");
+          }
+          BytesRef key = new BytesRef(file + LENGTH);
+          if (_store.get(key, value)) {
+            _files.put(file, Long.parseLong(value.utf8ToString()));
+          } else {
+            LOG.warn(MISSING_METADATA_MESSAGE, file);
+          }
         }
       }
     }
     setLockFactory(NoLockFactory.getNoLockFactory());
-    writeFilesNames();
+    writeFileNamesAndSync();
     gc();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/74c6928d/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
index abbaae5..40600ac 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
@@ -16,10 +16,6 @@
  */
 package org.apache.blur.store.hdfs_v2;
 
-import static org.apache.blur.metrics.MetricsConstants.HDFS_KV;
-import static org.apache.blur.metrics.MetricsConstants.ORG_APACHE_BLUR;
-import static org.apache.blur.metrics.MetricsConstants.SIZE;
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.FilterInputStream;
@@ -61,21 +57,18 @@ import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.lucene.util.BytesRef;
 
-import com.yammer.metrics.Metrics;
-import com.yammer.metrics.core.Gauge;
-import com.yammer.metrics.core.MetricName;
-
 public class HdfsKeyValueStore implements Store {
 
+  public static final int DEFAULT_MAX_AMOUNT_ALLOWED_PER_FILE = 64 * 1024 * 1024;
+  public static final long DEFAULT_MAX_OPEN_FOR_WRITING = TimeUnit.MINUTES.toMillis(1);
+  
   private static final String UTF_8 = "UTF-8";
   private static final String BLUR_KEY_VALUE = "blur_key_value";
   private static final String IN = "in";
   private static final String GET_FILE_LENGTH = "getFileLength";
-  private static final int DEFAULT_MAX = 64 * 1024 * 1024;
   private static final Log LOG = LogFactory.getLog(HdfsKeyValueStore.class);
   private static final byte[] MAGIC;
   private static final int VERSION = 1;
-  private static final long MAX_OPEN_FOR_WRITING = TimeUnit.MINUTES.toMillis(1);
   private static final long DAEMON_POLL_TIME = TimeUnit.SECONDS.toMillis(5);
   private static final int VERSION_LENGTH = 4;
 
@@ -162,17 +155,24 @@ public class HdfsKeyValueStore implements Store {
   private final TimerTask _oldFileCleanerTimerTask;
   private final AtomicLong _lastWrite = new AtomicLong();
   private final Timer _hdfsKeyValueTimer;
+  private final long _maxOpenForWriting;
 
   private FSDataOutputStream _output;
   private Path _outputPath;
   private boolean _isClosed;
 
   public HdfsKeyValueStore(Timer hdfsKeyValueTimer, Configuration configuration, Path path) throws IOException {
-    this(hdfsKeyValueTimer, configuration, path, DEFAULT_MAX);
+    this(hdfsKeyValueTimer, configuration, path, DEFAULT_MAX_AMOUNT_ALLOWED_PER_FILE, DEFAULT_MAX_OPEN_FOR_WRITING);
   }
 
   public HdfsKeyValueStore(Timer hdfsKeyValueTimer, Configuration configuration, Path path, long maxAmountAllowedPerFile)
       throws IOException {
+    this(hdfsKeyValueTimer, configuration, path, maxAmountAllowedPerFile, DEFAULT_MAX_OPEN_FOR_WRITING);
+  }
+
+  public HdfsKeyValueStore(Timer hdfsKeyValueTimer, Configuration configuration, Path path,
+      long maxAmountAllowedPerFile, long maxOpenForWriting) throws IOException {
+    _maxOpenForWriting = maxOpenForWriting;
     _maxAmountAllowedPerFile = maxAmountAllowedPerFile;
     _path = path;
     _fileSystem = _path.getFileSystem(configuration);
@@ -394,7 +394,7 @@ public class HdfsKeyValueStore implements Store {
   private void closeLogFileIfIdle() throws IOException {
     _writeLock.lock();
     try {
-      if (_output != null && _lastWrite.get() + MAX_OPEN_FOR_WRITING < System.currentTimeMillis()) {
+      if (_output != null && _lastWrite.get() + _maxOpenForWriting < System.currentTimeMillis()) {
         // Close writer
         LOG.info("Closing KV log due to inactivity [{0}].", _path);
         try {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/74c6928d/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectoryTest.java
----------------------------------------------------------------------
diff --git a/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectoryTest.java b/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectoryTest.java
index 1a89992..99d1eee 100644
--- a/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectoryTest.java
+++ b/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectoryTest.java
@@ -148,6 +148,17 @@ public class FastHdfsKeyValueDirectoryTest {
     }
   }
 
+  @SuppressWarnings("resource")
+  @Test
+  public void testOpenDirectoryAndReopenEmptyDirectory() throws IOException, InterruptedException {
+    FastHdfsKeyValueDirectory directory1 = new FastHdfsKeyValueDirectory(_timer, _configuration, new Path(_path,
+        "testOpenDirectoryAndReopenEmptyDirectory"), HdfsKeyValueStore.DEFAULT_MAX_AMOUNT_ALLOWED_PER_FILE, 5000L);
+    assertTrue(Arrays.equals(new String[] {}, directory1.listAll()));
+    FastHdfsKeyValueDirectory directory2 = new FastHdfsKeyValueDirectory(_timer, _configuration, new Path(_path,
+        "testOpenDirectoryAndReopenEmptyDirectory"));
+    assertTrue(Arrays.equals(new String[] {}, directory2.listAll()));
+  }
+
   private byte[] readSegmentsGen(FastHdfsKeyValueDirectory directory) throws IOException {
     boolean fileExists = directory.fileExists("segments.gen");
     if (!fileExists) {


Mime
View raw message