incubator-blur-commits mailing list archives

From amccu...@apache.org
Subject git commit: Working on BLUR-400. I believe I have fixed the data loss problem. The issue was that deletes were not being synced after the commit occurred. Therefore there were 2 segment files and the writer and reader were defaulting to the older segment upon reopen and the new segment was removed.
Date Mon, 22 Dec 2014 15:41:25 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/master 3c164d62f -> 69e37c3b9


Working on BLUR-400.  I believe I have fixed the data loss problem.  The issue was that deletes
were not being synced after the commit occurred.  As a result there were 2 segment files, and
on reopen the writer and reader were defaulting to the older segment while the new segment
was removed.  Once I have performed some tests I will close BLUR-400.
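
For readers skimming the diff below: the essence of the change is that the directory's
file-name list is now persisted and synced after deletes as well as after commits, via the
shared writeFileNamesAndSync() helper. The following is a minimal sketch of that pattern,
assuming a simplified Store interface; the class name DirectorySketch, the string-valued
put(), and the '|' separator are illustrative stand-ins, not the actual Blur API.

    // Minimal sketch of the fix pattern (illustrative names, not the actual Blur API).
    import java.io.IOException;
    import java.util.Map;
    import java.util.TreeSet;
    import java.util.concurrent.ConcurrentHashMap;

    interface Store {
      void put(String key, String value) throws IOException;
      void delete(String key) throws IOException;
      void sync() throws IOException; // flush and hsync to durable storage
    }

    class DirectorySketch {
      private static final String FILES = "FILES";
      private static final char SEP = '|';

      private final Store _store;
      private final Map<String, Long> _files = new ConcurrentHashMap<String, Long>();

      DirectorySketch(Store store) {
        _store = store;
      }

      // Called by Lucene when it commits: the file list must be durable
      // before the commit is considered complete.
      public void sync() throws IOException {
        writeFileNamesAndSync();
      }

      // The bug: a delete updated the in-memory map but never persisted the
      // new file list, so a reopen could resolve to the older segments file.
      // The fix is to sync after deletes as well.
      public void deleteFile(String name) throws IOException {
        _files.remove(name);
        _store.delete(name);
        writeFileNamesAndSync(); // the step that was missing
      }

      private void writeFileNamesAndSync() throws IOException {
        StringBuilder builder = new StringBuilder();
        // TreeSet yields a deterministic, sorted file list.
        for (String n : new TreeSet<String>(_files.keySet())) {
          if (builder.length() != 0) {
            builder.append(SEP);
          }
          builder.append(n);
        }
        _store.put(FILES, builder.toString());
        _store.sync(); // make the new list durable before returning
      }
    }

Sorting the names with a TreeSet also makes the persisted list deterministic across runs,
which makes before/after file states easier to compare.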


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/69e37c3b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/69e37c3b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/69e37c3b

Branch: refs/heads/master
Commit: 69e37c3b928a666a9f26038f9cbf0d72974fd810
Parents: 3c164d6
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Mon Dec 22 10:41:21 2014 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Mon Dec 22 10:41:21 2014 -0500

----------------------------------------------------------------------
 .../hdfs_v2/FastHdfsKeyValueDirectory.java      | 24 +++--
 .../blur/store/hdfs_v2/HdfsKeyValueStore.java   |  3 +
 .../hdfs_v2/FastHdfsKeyValueDirectoryTest.java  | 93 ++++++++++++++++++++
 3 files changed, 112 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/69e37c3b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java
index 0e0bb8c..5f7442a 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java
@@ -23,9 +23,10 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Timer;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.Timer;
+import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
@@ -64,17 +65,17 @@ public class FastHdfsKeyValueDirectory extends Directory implements LastModified
     BytesRef value = new BytesRef();
     if (_store.get(FILES, value)) {
       String filesString = value.utf8ToString();
+//      System.out.println("Open Files String [" + filesString + "]");
       String[] files = filesString.split("\\" + SEP);
       for (String file : files) {
         if (file.isEmpty()) {
-          continue;
+          throw new IOException("Empty file names should not occur [" + filesString + "]");
         }
         BytesRef key = new BytesRef(file + LENGTH);
         if (_store.get(key, value)) {
           _files.put(file, Long.parseLong(value.utf8ToString()));
         } else {
-          // _files.put(file, 0L);
-          LOG.warn("Missing meta data for file [{0}], setting length to '0'.", file);
+          throw new IOException("Missing meta data for file [" + file + "], setting length
to '0'.");
         }
       }
     }
@@ -128,12 +129,14 @@ public class FastHdfsKeyValueDirectory extends Directory implements LastModified
 
   private void writeFilesNames() throws IOException {
     StringBuilder builder = new StringBuilder();
-    for (String n : _files.keySet()) {
+    Set<String> fileNames = new TreeSet<String>(_files.keySet());
+    for (String n : fileNames) {
       if (builder.length() != 0) {
         builder.append(SEP);
       }
       builder.append(n);
     }
+//    System.out.println("Writing Files String [" + builder.toString() + "]");
     _store.put(FILES, new BytesRef(builder.toString()));
   }
 
@@ -150,7 +153,6 @@ public class FastHdfsKeyValueDirectory extends Directory implements LastModified
   @Override
   public String[] listAll() throws IOException {
     Set<String> fileNames = new HashSet<String>(_files.keySet());
-    fileNames.remove(null);
     return fileNames.toArray(new String[fileNames.size()]);
   }
 
@@ -172,6 +174,7 @@ public class FastHdfsKeyValueDirectory extends Directory implements LastModified
       for (long l = 0; l <= blocks; l++) {
         _store.delete(new BytesRef(name + "/" + l));
       }
+      writeFileNamesAndSync();
     }
   }
 
@@ -185,13 +188,17 @@ public class FastHdfsKeyValueDirectory extends Directory implements LastModified
 
   @Override
   public void sync(Collection<String> names) throws IOException {
-    writeFilesNames();
-    _store.sync();
+    writeFileNamesAndSync();
     if (shouldPerformGC()) {
       gc();
     }
   }
 
+  private void writeFileNamesAndSync() throws IOException {
+    writeFilesNames();
+    _store.sync();
+  }
+
   private boolean shouldPerformGC() {
     if (_lastGc + GC_DELAY < System.currentTimeMillis()) {
       return true;
@@ -212,4 +219,5 @@ public class FastHdfsKeyValueDirectory extends Directory implements LastModified
     }
     throw new FileNotFoundException(name);
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/69e37c3b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
index 81616eb..afbe6d3 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
@@ -507,6 +507,8 @@ public class HdfsKeyValueStore implements Store {
     _output.flush();
     _output.sync();
     _lastWrite.set(System.currentTimeMillis());
+    // System.out.println("Sync Output Path [" + _outputPath + "] Position [" +
+    // _output.getPos() + "]");
   }
 
   private void loadIndex(Path path) throws IOException {
@@ -519,6 +521,7 @@ public class HdfsKeyValueStore implements Store {
     int version = inputStream.readInt();
     if (version == 1) {
       long fileLength = getFileLength(path, inputStream);
+//      System.out.println("Load Index File [" + path + "] Length [" + fileLength + "]");
       Operation operation = new Operation();
       try {
         while (inputStream.getPos() < fileLength) {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/69e37c3b/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectoryTest.java
----------------------------------------------------------------------
diff --git a/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectoryTest.java b/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectoryTest.java
index 26d75a4..a1c403b 100644
--- a/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectoryTest.java
+++ b/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectoryTest.java
@@ -20,7 +20,12 @@ import static org.junit.Assert.assertEquals;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
 import java.util.Timer;
+import java.util.TreeSet;
 
 import org.apache.blur.HdfsMiniClusterUtil;
 import org.apache.hadoop.conf.Configuration;
@@ -32,8 +37,12 @@ import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field;
 import org.apache.lucene.document.StringField;
 import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.SerialMergeScheduler;
+import org.apache.lucene.index.TieredMergePolicy;
+import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.Version;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -91,6 +100,90 @@ public class FastHdfsKeyValueDirectoryTest {
     reader.close();
   }
 
+  @Test
+  public void testMulipleCommitsAndReopens() throws IOException {
+    IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_43, new KeywordAnalyzer());
+    conf.setMergeScheduler(new SerialMergeScheduler());
+    TieredMergePolicy mergePolicy = (TieredMergePolicy) conf.getMergePolicy();
+    mergePolicy.setUseCompoundFile(false);
+
+    Set<String> fileSet = new TreeSet<String>();
+    long seed = new Random().nextLong();
+    System.out.println("Seed:" + seed);
+    Random random = new Random(seed);
+    int docCount = 0;
+    int passes = 100;
+    for (int run = 0; run < passes; run++) {
+      final FastHdfsKeyValueDirectory directory = new FastHdfsKeyValueDirectory(_timer, _configuration, new Path(_path,
+          "test_multiple_commits_reopens"));
+      assertFiles(fileSet, run, -1, directory);
+      assertEquals(docCount, getDocumentCount(directory));
+      IndexWriter writer = new IndexWriter(directory, conf.clone());
+      int numberOfCommits = random.nextInt(100);
+      for (int i = 0; i < numberOfCommits; i++) {
+        assertFiles(fileSet, run, i, directory);
+        addDocuments(writer, random.nextInt(100));
+        // System.out.println("Before Commit");
+        writer.commit();
+        // System.out.println("After Commit");
+
+        fileSet.clear();
+        List<IndexCommit> listCommits = DirectoryReader.listCommits(directory);
+        assertEquals(1, listCommits.size());
+        IndexCommit indexCommit = listCommits.get(0);
+        fileSet.addAll(indexCommit.getFileNames());
+        // System.out.println("Files after commit " + fileSet);
+      }
+      docCount = getDocumentCount(directory);
+    }
+  }
+
+  private int getDocumentCount(Directory directory) throws IOException {
+    if (DirectoryReader.indexExists(directory)) {
+      DirectoryReader reader = DirectoryReader.open(directory);
+      int maxDoc = reader.maxDoc();
+      reader.close();
+      return maxDoc;
+    }
+    return 0;
+  }
+
+  private void assertFiles(Set<String> expected, int run, int commit, FastHdfsKeyValueDirectory directory)
+      throws IOException {
+    Set<String> actual;
+    if (DirectoryReader.indexExists(directory)) {
+      List<IndexCommit> listCommits = DirectoryReader.listCommits(directory);
+      assertEquals(1, listCommits.size());
+      IndexCommit indexCommit = listCommits.get(0);
+      actual = new TreeSet<String>(indexCommit.getFileNames());
+    } else {
+      actual = new TreeSet<String>();
+    }
+
+    Set<String> missing = new TreeSet<String>(expected);
+    missing.removeAll(actual);
+    Set<String> extra = new TreeSet<String>(actual);
+    extra.removeAll(expected);
+    // System.out.println("Segment Files [" + getSegmentFiles(actual) + "]");
+    assertEquals("Pass [" + run + "] Missing Files " + " Extra Files " + extra + "", expected,
actual);
+  }
+
+  private Set<String> getSegmentFiles(Set<String> actual) {
+    Set<String> result = new HashSet<String>();
+    for (String s : actual) {
+      if (s.startsWith("segments_")) {
+        result.add(s);
+      }
+    }
+    return result;
+  }
+
+  private void addDocuments(IndexWriter writer, int numberOfDocs) throws IOException {
+    for (int i = 0; i < numberOfDocs; i++) {
+      addDoc(writer, getDoc(i));
+    }
+  }
+
   private void addDoc(IndexWriter writer, Document doc) throws IOException {
     writer.addDocument(doc);
   }

