incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject git commit: Adding some more checks for the fast directory and hdfs key value store.
Date Wed, 26 Feb 2014 01:46:47 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/apache-blur-0.2 e71d44ab7 -> e8ca8ecb7


Adding some more checks for the fast directory and hdfs key value store.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/e8ca8ecb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/e8ca8ecb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/e8ca8ecb

Branch: refs/heads/apache-blur-0.2
Commit: e8ca8ecb7fb6cf47db442befa2a32716446124bb
Parents: e71d44a
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Tue Feb 25 20:46:13 2014 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Tue Feb 25 20:46:13 2014 -0500

----------------------------------------------------------------------
 .../hdfs_v2/FastHdfsKeyValueDirectory.java      |  1 +
 .../blur/store/hdfs_v2/HdfsKeyValueStore.java   | 64 +++++++++----
 .../hdfs_v2/FastHdfsKeyValueDirectoryTest.java  | 96 ++++++++++++++++++++
 .../store/hdfs_v2/HdfsKeyValueStoreTest.java    | 19 ++--
 4 files changed, 153 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e8ca8ecb/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java
index ee1f15c..c87e4ef 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java
@@ -44,6 +44,7 @@ public class FastHdfsKeyValueDirectory extends Directory implements LastModified
   private static final String LENGTH = "/length";
   private static final BytesRef FILES = new BytesRef("FILES");
   private static final String SEP = "|";
+  
   private final Map<String, Long> _files = new ConcurrentHashMap<String, Long>();
   private final HdfsKeyValueStore _store;
   private final int _blockSize = 4096;

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e8ca8ecb/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
index 1841140..7d6d6ea 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
@@ -20,6 +20,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.FilterInputStream;
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.util.Arrays;
@@ -55,10 +56,21 @@ import org.apache.lucene.util.BytesRef;
 
 public class HdfsKeyValueStore implements Store {
 
+  private static final String BLUR_KEY_VALUE = "blur_key_value";
   private static final String IN = "in";
   private static final String GET_FILE_LENGTH = "getFileLength";
   private static final int DEFAULT_MAX = 64 * 1024 * 1024;
-  private final Log LOG = LogFactory.getLog(HdfsKeyValueStore.class);
+  private static final Log LOG = LogFactory.getLog(HdfsKeyValueStore.class);
+  private static final byte[] MAGIC;
+  private static final int VERSION = 1;
+
+  static {
+    try {
+      MAGIC = BLUR_KEY_VALUE.getBytes("UTF-8");
+    } catch (UnsupportedEncodingException e) {
+      throw new RuntimeException(e);
+    }
+  }
 
   static enum OperationType {
     PUT, DELETE
@@ -148,7 +160,7 @@ public class HdfsKeyValueStore implements Store {
     _readWriteLock = new ReentrantReadWriteLock();
     _writeLock = _readWriteLock.writeLock();
     _readLock = _readWriteLock.readLock();
-    _fileStatus.set(getList(_path));
+    _fileStatus.set(getSortedSet(_path));
     if (!_fileStatus.get().isEmpty()) {
       _currentFileCounter.set(Long.parseLong(_fileStatus.get().last().getPath().getName()));
     }
@@ -261,9 +273,13 @@ public class HdfsKeyValueStore implements Store {
   }
 
   public void cleanupOldFiles() throws IOException {
-    FileStatus[] listStatus = _fileSystem.listStatus(_path);
+    SortedSet<FileStatus> fileStatusSet = getSortedSet(_path);
+    Path newestGen = fileStatusSet.last().getPath();
+    if (!newestGen.equals(_outputPath)) {
+      throw new IOException("No longer the owner of [" + _path + "]");
+    }
     Set<Path> existingFiles = new HashSet<Path>();
-    for (FileStatus fileStatus : listStatus) {
+    for (FileStatus fileStatus : fileStatusSet) {
       existingFiles.add(fileStatus.getPath());
     }
     Set<Entry<BytesRef, Value>> entrySet = _pointers.entrySet();
@@ -272,7 +288,6 @@ public class HdfsKeyValueStore implements Store {
       Path p = e.getValue()._path;
       existingFiles.remove(p);
     }
-
     for (Path p : existingFiles) {
       LOG.info("Removing file no longer referenced [{0}]", p);
       _fileSystem.delete(p, false);
@@ -347,6 +362,9 @@ public class HdfsKeyValueStore implements Store {
     String name = buffer(nextSegment);
     _outputPath = new Path(_path, name);
     _output = _fileSystem.create(_outputPath, false);
+    _output.write(MAGIC);
+    _output.writeInt(VERSION);
+    _output.sync();
   }
 
   private String buffer(long number) {
@@ -383,20 +401,30 @@ public class HdfsKeyValueStore implements Store {
 
   private void loadIndex(Path path) throws IOException {
     FSDataInputStream inputStream = _fileSystem.open(path);
-    long fileLength = getFileLength(path, inputStream);
-    Operation operation = new Operation();
-    try {
-      while (inputStream.getPos() < fileLength) {
-        try {
-          operation.readFields(inputStream);
-        } catch (IOException e) {
-          // End of sync point found
-          return;
+    byte[] buf = new byte[MAGIC.length];
+    inputStream.readFully(buf);
+    if (!Arrays.equals(MAGIC, buf)) {
+      throw new IOException("File [" + path + "] not a " + BLUR_KEY_VALUE + " file.");
+    }
+    int version = inputStream.readInt();
+    if (version == 1) {
+      long fileLength = getFileLength(path, inputStream);
+      Operation operation = new Operation();
+      try {
+        while (inputStream.getPos() < fileLength) {
+          try {
+            operation.readFields(inputStream);
+          } catch (IOException e) {
+            // End of sync point found
+            return;
+          }
+          loadIndex(path, operation);
         }
-        loadIndex(path, operation);
+      } finally {
+        inputStream.close();
       }
-    } finally {
-      inputStream.close();
+    } else {
+      throw new IOException("Unknown version [" + version + "]");
     }
   }
 
@@ -418,7 +446,7 @@ public class HdfsKeyValueStore implements Store {
     return new BytesRef(key.getBytes(), 0, key.getLength());
   }
 
-  private SortedSet<FileStatus> getList(Path p) throws IOException {
+  private SortedSet<FileStatus> getSortedSet(Path p) throws IOException {
     if (_fileSystem.exists(p)) {
       FileStatus[] listStatus = _fileSystem.listStatus(p);
       if (listStatus != null) {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e8ca8ecb/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectoryTest.java
----------------------------------------------------------------------
diff --git a/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectoryTest.java b/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectoryTest.java
new file mode 100644
index 0000000..3dc046d
--- /dev/null
+++ b/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectoryTest.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.store.hdfs_v2;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.lucene.analysis.core.KeywordAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.util.Version;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class FastHdfsKeyValueDirectoryTest {
+
+  private static final File TMPDIR = new File(System.getProperty("blur.tmp.dir", "./target/tmp_HdfsKeyValueStoreTest"));
+
+  private Configuration _configuration = new Configuration();
+  private static MiniDFSCluster _cluster;
+  private Path _path;
+
+  @BeforeClass
+  public static void startCluster() {
+    Configuration conf = new Configuration();
+    _cluster = HdfsKeyValueStoreTest.startDfs(conf, true, TMPDIR.getAbsolutePath());
+  }
+
+  @AfterClass
+  public static void stopCluster() {
+    HdfsKeyValueStoreTest.shutdownDfs(_cluster);
+  }
+
+  @Before
+  public void setup() throws IOException {
+    FileSystem fileSystem = _cluster.getFileSystem();
+    _path = new Path("/test").makeQualified(fileSystem);
+    fileSystem.delete(_path, true);
+  }
+
+  @Test
+  public void testMultipleWritersOpenOnSameDirectory() throws IOException {
+    IndexWriterConfig config = new IndexWriterConfig(Version.LUCENE_43, new KeywordAnalyzer());
+    FastHdfsKeyValueDirectory directory = new FastHdfsKeyValueDirectory(_configuration,
+        new Path(_path, "test_multiple"));
+    IndexWriter writer1 = new IndexWriter(directory, config.clone());
+    addDoc(writer1, getDoc(1));
+    IndexWriter writer2 = new IndexWriter(directory, config.clone());
+    addDoc(writer2, getDoc(2));
+    writer1.close();
+    writer2.close();
+
+    DirectoryReader reader = DirectoryReader.open(directory);
+    int maxDoc = reader.maxDoc();
+    assertEquals(1, maxDoc);
+    Document document = reader.document(0);
+    assertEquals("2", document.get("id"));
+    reader.close();
+  }
+
+  private void addDoc(IndexWriter writer, Document doc) throws IOException {
+    writer.addDocument(doc);
+  }
+
+  private Document getDoc(int i) {
+    Document document = new Document();
+    document.add(new StringField("id", Integer.toString(i), Field.Store.YES));
+    return document;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e8ca8ecb/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStoreTest.java b/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStoreTest.java
index b9ca41b..5ca86db 100644
--- a/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStoreTest.java
+++ b/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStoreTest.java
@@ -55,7 +55,7 @@ public class HdfsKeyValueStoreTest {
 
   @AfterClass
   public static void stopCluster() {
-    shutdownDfs();
+    shutdownDfs(_cluster);
   }
 
   @Before
@@ -141,7 +141,7 @@ public class HdfsKeyValueStoreTest {
     store.close();
   }
 
-//  @Test
+  // @Test
   public void testTwoKeyStoreInstancesWritingAtTheSameTime() throws IOException {
     HdfsKeyValueStore store1 = new HdfsKeyValueStore(_configuration, _path);
     listFiles();
@@ -214,7 +214,7 @@ public class HdfsKeyValueStoreTest {
     return new BytesRef(s);
   }
 
-  public static void startDfs(Configuration conf, boolean format, String path) {
+  public static MiniDFSCluster startDfs(Configuration conf, boolean format, String path) {
     String perm;
     Path p = new Path(new File("./target").getAbsolutePath());
     try {
@@ -230,26 +230,27 @@ public class HdfsKeyValueStoreTest {
     conf.set("dfs.datanode.data.dir.perm", perm);
     System.setProperty("test.build.data", path);
     try {
-      _cluster = new MiniDFSCluster(conf, 1, true, (String[]) null);
-      _cluster.waitActive();
+      MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, (String[]) null);
+      cluster.waitActive();
+      return cluster;
     } catch (Exception e) {
       LOG.error("error opening file system", e);
       throw new RuntimeException(e);
     }
   }
 
-  public static void shutdownDfs() {
-    if (_cluster != null) {
+  public static void shutdownDfs(MiniDFSCluster cluster) {
+    if (cluster != null) {
       LOG.info("Shutting down Mini DFS ");
       try {
-        _cluster.shutdown();
+        cluster.shutdown();
       } catch (Exception e) {
         // / Can get a java.lang.reflect.UndeclaredThrowableException thrown
         // here because of an InterruptedException. Don't let exceptions in
         // here be cause of test failure.
       }
       try {
-        FileSystem fs = _cluster.getFileSystem();
+        FileSystem fs = cluster.getFileSystem();
         if (fs != null) {
           LOG.info("Shutting down FileSystem");
           fs.close();


Mime
View raw message