incubator-blur-commits mailing list archives

From amccu...@apache.org
Subject [1/2] git commit: Fixing BLUR-375.
Date Tue, 09 Sep 2014 21:20:47 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/master 77d24fcf9 -> c7a934a9f


Fixing BLUR-375.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/004a5630
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/004a5630
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/004a5630

Branch: refs/heads/master
Commit: 004a56303320a5877eb0273080dd320bf7b28ff0
Parents: 3e3ed54
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Tue Sep 9 17:18:32 2014 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Tue Sep 9 17:18:32 2014 -0400

----------------------------------------------------------------------
 .../mapreduce/lib/GenericBlurRecordWriter.java  |  9 +-
 .../mapreduce/lib/PrimeDocOverFlowHelper.java   | 94 ++++++++++++++++++++
 .../mapreduce/lib/BlurOutputFormatTest.java     | 41 +++++++++
 3 files changed, 141 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/004a5630/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java
index 5b7c3ee..0a62e37 100644
--- a/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java
+++ b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java
@@ -46,10 +46,12 @@ import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field;
 import org.apache.lucene.document.Field.Store;
 import org.apache.lucene.document.StringField;
+import org.apache.lucene.index.AtomicReader;
 import org.apache.lucene.index.CorruptIndexException;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.SlowCompositeReaderWrapper;
 import org.apache.lucene.index.TieredMergePolicy;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
@@ -242,7 +244,6 @@ public class GenericBlurRecordWriter {
       // going to be doc 0.
       // Therefore the first document added is the prime doc
       List<List<Field>> docs = _documentBufferStrategy.getAndClearBuffer();
-      docs.get(0).add(new StringField(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE, Store.NO));
       for (List<Field> doc : docs) {
         _localTmpWriter.addDocument(doc);
       }
@@ -279,11 +280,13 @@ public class GenericBlurRecordWriter {
       flushToTmpIndex();
       _localTmpWriter.close(false);
       DirectoryReader reader = DirectoryReader.open(_localTmpDir);
+      AtomicReader atomicReader = SlowCompositeReaderWrapper.wrap(reader);
+      AtomicReader primeDocAtomicReader = PrimeDocOverFlowHelper.addPrimeDoc(atomicReader);
       if (_countersSetup) {
         _recordRateCounter.mark(reader.numDocs());
       }
-      _writer.addIndexes(reader);
-      reader.close();
+      _writer.addIndexes(primeDocAtomicReader);
+      primeDocAtomicReader.close();
       resetLocalTmp();
       _writer.maybeMerge();
       if (_countersSetup) {

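For readers following the hunk above: flushToTmpIndex() used to mark the first buffered document as the prime doc on every flush, and a row large enough to overflow the document buffer flushes more than once, so such a row appears to end up with one prime-doc marker per flush instead of exactly one per row. The new code instead defers the marking until the whole row is copied into the real index. The self-contained snippet below reproduces that failure mode with plain Lucene 4.3 APIs; the "_prime_"/"true" literals mirror BlurConstants.PRIME_DOC and BlurConstants.PRIME_DOC_VALUE as used in the test further down, the class name is invented, and the two-flush loop is a deliberate simplification of the real buffer strategy.

import org.apache.lucene.analysis.core.KeywordAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.util.Version;

public class DoubleMarkingSketch {
  public static void main(String[] args) throws Exception {
    RAMDirectory dir = new RAMDirectory();
    IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Version.LUCENE_43, new KeywordAnalyzer()));
    // One logical row whose records arrive in two buffer flushes, each
    // flush marking its first document the way the removed line did.
    for (int flush = 0; flush < 2; flush++) {
      for (int i = 0; i < 3; i++) {
        Document doc = new Document();
        if (i == 0) {
          doc.add(new StringField("_prime_", "true", Store.NO));
        }
        writer.addDocument(doc);
      }
    }
    writer.close();
    DirectoryReader reader = DirectoryReader.open(dir);
    // Prints 2: two prime docs for what is logically a single row.
    System.out.println(reader.docFreq(new Term("_prime_", "true")));
    reader.close();
  }
}
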
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/004a5630/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/PrimeDocOverFlowHelper.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/PrimeDocOverFlowHelper.java b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/PrimeDocOverFlowHelper.java
new file mode 100644
index 0000000..672a1c1
--- /dev/null
+++ b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/PrimeDocOverFlowHelper.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.mapreduce.lib;
+
+import java.io.IOException;
+
+import org.apache.blur.utils.BlurConstants;
+import org.apache.lucene.analysis.core.KeywordAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.index.AtomicReader;
+import org.apache.lucene.index.AtomicReaderContext;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.FilterAtomicReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.ParallelAtomicReader;
+import org.apache.lucene.index.StoredFieldVisitor;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.RAMDirectory;
+import org.apache.lucene.util.Bits;
+import org.apache.lucene.util.Version;
+
+public class PrimeDocOverFlowHelper {
+
+  private static Directory _directory;
+
+  static {
+    try {
+      _directory = new RAMDirectory();
+      IndexWriter writer = new IndexWriter(_directory, new IndexWriterConfig(Version.LUCENE_43, new KeywordAnalyzer()));
+      Document document = new Document();
+      document.add(new StringField(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE, Store.NO));
+      writer.addDocument(document);
+      writer.close();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static AtomicReader addPrimeDoc(AtomicReader atomicReader) throws IOException {
+    AtomicReaderContext context = DirectoryReader.open(_directory).leaves().get(0);
+    return new ParallelAtomicReader(true, setDocSize(context.reader(), atomicReader.maxDoc()), atomicReader);
+  }
+
+  private static AtomicReader setDocSize(AtomicReader reader, final int count) {
+    return new FilterAtomicReader(reader) {
+      @Override
+      public Bits getLiveDocs() {
+        return new Bits() {
+          @Override
+          public boolean get(int index) {
+            return true;
+          }
+
+          @Override
+          public int length() {
+            return count;
+          }
+        };
+      }
+
+      @Override
+      public int numDocs() {
+        return count;
+      }
+
+      @Override
+      public int maxDoc() {
+        return count;
+      }
+
+      @Override
+      public void document(int docID, StoredFieldVisitor visitor) throws IOException {
+        // Do nothing
+      }
+    };
+  }
+}

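To make the helper's trick concrete: ParallelAtomicReader aligns its sub-readers by docID and insists that they agree on maxDoc, which is exactly what setDocSize() fakes. The static one-document index therefore contributes _prime_:true to doc 0 of the row's index and to no other document, and the overridden document() presumably keeps the merge from asking the one-document reader for stored fields beyond doc 0. Below is a hedged usage sketch, a stand-alone main() assuming the helper class above is on the classpath, with RAMDirectory standing in for the real temporary and output directories and an invented class name:

import org.apache.lucene.analysis.core.KeywordAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.AtomicReader;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.SlowCompositeReaderWrapper;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.util.Version;

public class PrimeDocOverlaySketch {
  public static void main(String[] args) throws Exception {
    // Stand-in for the per-row temporary index: three documents, none of
    // them carrying the prime-doc marker yet.
    RAMDirectory tmp = new RAMDirectory();
    IndexWriter tmpWriter = new IndexWriter(tmp, new IndexWriterConfig(Version.LUCENE_43, new KeywordAnalyzer()));
    for (int i = 0; i < 3; i++) {
      tmpWriter.addDocument(new Document());
    }
    tmpWriter.close();

    // Mirror the GenericBlurRecordWriter change: wrap, overlay, addIndexes.
    AtomicReader atomic = SlowCompositeReaderWrapper.wrap(DirectoryReader.open(tmp));
    AtomicReader withPrime = PrimeDocOverFlowHelper.addPrimeDoc(atomic);

    RAMDirectory dest = new RAMDirectory();
    IndexWriter destWriter = new IndexWriter(dest, new IndexWriterConfig(Version.LUCENE_43, new KeywordAnalyzer()));
    destWriter.addIndexes(withPrime);
    withPrime.close(); // closeSubReaders=true, so the wrapped readers close too
    destWriter.close();

    DirectoryReader reader = DirectoryReader.open(dest);
    // Prints 1: exactly one document (doc 0) carries the marker.
    System.out.println(reader.docFreq(new Term("_prime_", "true")));
    reader.close();
  }
}
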
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/004a5630/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java b/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
index b3875d8..d097e6c 100644
--- a/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
+++ b/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
@@ -19,6 +19,7 @@ package org.apache.blur.mapreduce.lib;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.BufferedReader;
 import java.io.DataInputStream;
@@ -28,6 +29,7 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.PrintWriter;
 import java.util.Collection;
+import java.util.List;
 import java.util.TreeSet;
 
 import org.apache.blur.server.TableContext;
@@ -45,7 +47,13 @@ import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.lucene.index.AtomicReader;
+import org.apache.lucene.index.AtomicReaderContext;
 import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.DocsEnum;
+import org.apache.lucene.index.Terms;
+import org.apache.lucene.index.TermsEnum;
+import org.apache.lucene.util.BytesRef;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -138,6 +146,7 @@ public class BlurOutputFormatTest {
     assertEquals(1, commitedTasks.size());
     DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(conf, commitedTasks.iterator().next()));
     assertEquals(2, reader.numDocs());
+    validatePrimeDocs(reader);
     reader.close();
   }
 
@@ -189,9 +198,39 @@ public class BlurOutputFormatTest {
 
     DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(conf, commitedTasks.iterator().next()));
     assertEquals(80000, reader.numDocs());
+    validatePrimeDocs(reader);
     reader.close();
   }
 
+  private void validatePrimeDocs(DirectoryReader reader) throws IOException {
+    List<AtomicReaderContext> leaves = reader.leaves();
+    for (AtomicReaderContext context : leaves) {
+      AtomicReader atomicReader = context.reader();
+      Terms rowIdTerms = atomicReader.fields().terms("rowid");
+
+      TermsEnum rowIdTermsEnum = rowIdTerms.iterator(null);
+      BytesRef rowId;
+      while ((rowId = rowIdTermsEnum.next()) != null) {
+        DocsEnum rowIdDocsEnum = rowIdTermsEnum.docs(atomicReader.getLiveDocs(), null);
+        int nextDoc = rowIdDocsEnum.nextDoc();
+        checkPrimeDoc(atomicReader, nextDoc, rowId);
+      }
+    }
+  }
+
+  private void checkPrimeDoc(AtomicReader atomicReader, int docId, BytesRef rowId) throws IOException {
+    Terms primeDocTerms = atomicReader.fields().terms("_prime_");
+    TermsEnum primeDocTermsEnum = primeDocTerms.iterator(null);
+    if (!primeDocTermsEnum.seekExact(new BytesRef("true"), false)) {
+      fail("No Prime Docs...");
+    }
+    DocsEnum primeDocDocsEnum = primeDocTermsEnum.docs(atomicReader.getLiveDocs(), null);
+    int advance;
+    if ((advance = primeDocDocsEnum.advance(docId)) != docId) {
+      fail("FAIL:" + rowId.utf8ToString() + " " + advance + " " + docId);
+    }
+  }
+
   @Test
   public void testBlurOutputFormatOverFlowMultipleReducersTest() throws IOException, InterruptedException,
       ClassNotFoundException {
@@ -231,6 +270,7 @@ public class BlurOutputFormatTest {
 
       DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(conf, commitedTasks.iterator().next()));
       total += reader.numDocs();
+      validatePrimeDocs(reader);
       reader.close();
     }
     assertEquals(80000, total);
@@ -275,6 +315,7 @@ public class BlurOutputFormatTest {
       for (Path p : commitedTasks) {
         DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(conf, p));
         total += reader.numDocs();
+        validatePrimeDocs(reader);
         reader.close();
       }
     }

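On the validation added above: rows are written contiguously and the prime doc is the row's first document, so for every rowid term the lowest live posting must also appear in the _prime_:true posting list; DocsEnum.advance(target) returns the first docID at or beyond the target, so an exact match confirms the marker sits on that document. The sketch below lifts the same check out of JUnit so it can be pointed at an index directory on local disk; the path argument is the only input, the field literals are unchanged from the test, and the class name is invented.

import java.io.File;
import org.apache.lucene.index.AtomicReader;
import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.DocsEnum;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.BytesRef;

public class PrimeDocCheck {
  public static void main(String[] args) throws Exception {
    DirectoryReader reader = DirectoryReader.open(FSDirectory.open(new File(args[0])));
    for (AtomicReaderContext context : reader.leaves()) {
      AtomicReader atomic = context.reader();
      Terms rowIdTerms = atomic.fields().terms("rowid");
      if (rowIdTerms == null) {
        continue; // segment without row data
      }
      Terms primeDocTerms = atomic.fields().terms("_prime_");
      if (primeDocTerms == null) {
        throw new IllegalStateException("no _prime_ field in segment");
      }
      TermsEnum primeDocs = primeDocTerms.iterator(null);
      if (!primeDocs.seekExact(new BytesRef("true"), false)) {
        throw new IllegalStateException("no prime docs in segment");
      }
      TermsEnum rowIds = rowIdTerms.iterator(null);
      BytesRef rowId;
      while ((rowId = rowIds.next()) != null) {
        // The first live posting of a rowid term is the row's first document.
        int first = rowIds.docs(atomic.getLiveDocs(), null).nextDoc();
        DocsEnum prime = primeDocs.docs(atomic.getLiveDocs(), null);
        if (prime.advance(first) != first) {
          throw new IllegalStateException(rowId.utf8ToString() + " does not start with a prime doc");
        }
      }
    }
    reader.close();
  }
}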
