incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject git commit: Added in overflow indexing BLUR-80 and finished the bluroutputformat BLUR-83.
Date Fri, 17 May 2013 19:08:16 GMT
Updated Branches:
  refs/heads/0.1.5 5d49585c9 -> 7282e7a26


Added in overflow indexing BLUR-80 and finished the bluroutputformat BLUR-83.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/7282e7a2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/7282e7a2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/7282e7a2

Branch: refs/heads/0.1.5
Commit: 7282e7a260fa264aa602b7a157cfc88e69140852
Parents: 5d49585
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Fri May 17 15:07:28 2013 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Fri May 17 15:07:28 2013 -0400

----------------------------------------------------------------------
 .../apache/blur/mapreduce/lib/BlurCounters.java    |    2 +-
 .../blur/mapreduce/lib/BlurOutputFormat.java       |  107 +++++++++++++--
 .../blur/mapreduce/lib/BlurOutputFormatTest.java   |   78 ++++++++++-
 3 files changed, 163 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7282e7a2/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurCounters.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurCounters.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurCounters.java
index efcd645..ba8da67 100644
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurCounters.java
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurCounters.java
@@ -21,6 +21,6 @@ package org.apache.blur.mapreduce.lib;
  * The enum class used for all the internal counters during map reduce jobs.
  */
 public enum BlurCounters {
-  RECORD_COUNT, FIELD_COUNT, ROW_COUNT, RECORD_RATE, COPY_RATE, ROW_RATE
+  RECORD_COUNT, FIELD_COUNT, ROW_COUNT, RECORD_RATE, COPY_RATE, ROW_RATE, RECORD_DUPLICATE_COUNT, ROW_OVERFLOW_COUNT
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7282e7a2/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
index 141cee7..77ee8bc 100644
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
@@ -20,8 +20,8 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
 import java.util.UUID;
 
 import org.apache.blur.analysis.BlurAnalyzer;
@@ -52,8 +52,10 @@ import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field.Store;
 import org.apache.lucene.document.StringField;
 import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.TieredMergePolicy;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FSDirectory;
 import org.apache.lucene.store.IOContext;
@@ -64,6 +66,7 @@ import org.apache.thrift.transport.TIOStreamTransport;
 
 public class BlurOutputFormat extends OutputFormat<Text, BlurMutate> {
 
+  public static final String BLUR_OUTPUT_MAX_DOCUMENT_BUFFER_SIZE = "blur.output.max.document.buffer.size";
   public static final String BLUR_TABLE_DESCRIPTOR = "blur.table.descriptor";
   public static final String BLUR_OUTPUT_PATH = "blur.output.path";
 
@@ -153,6 +156,18 @@ public class BlurOutputFormat extends OutputFormat<Text, BlurMutate> {
     setOutputPath(job, new Path(tableDescriptor.getTableUri()));
   }
 
+  public static void setMaxDocumentBufferSize(Job job, int maxDocumentBufferSize) {
+    setMaxDocumentBufferSize(job.getConfiguration(), maxDocumentBufferSize);
+  }
+
+  public static void setMaxDocumentBufferSize(Configuration configuration, int maxDocumentBufferSize) {
+    configuration.setInt(BLUR_OUTPUT_MAX_DOCUMENT_BUFFER_SIZE, maxDocumentBufferSize);
+  }
+
+  public static int getMaxDocumentBufferSize(Configuration configuration) {
+    return configuration.getInt(BLUR_OUTPUT_MAX_DOCUMENT_BUFFER_SIZE, 1000);
+  }
+
   public static void setOutputPath(Job job, Path path) {
     Configuration configuration = job.getConfiguration();
     configuration.set(BLUR_OUTPUT_PATH, path.toString());
@@ -168,23 +183,35 @@ public class BlurOutputFormat extends OutputFormat<Text, BlurMutate> {
     private static final Log LOG = LogFactory.getLog(BlurRecordWriter.class);
 
     private final Text _prevKey = new Text();
-    private final List<Document> _documents = new ArrayList<Document>();
+    private final Map<String, Document> _documents = new TreeMap<String, Document>();
     private final IndexWriter _writer;
     private final BlurAnalyzer _analyzer;
     private final StringBuilder _builder = new StringBuilder();
     private final Directory _finalDir;
     private final Directory _localDir;
     private final File _localPath;
+    private final int _maxDocumentBufferSize;
+    private final IndexWriterConfig _conf;
     private Counter _fieldCount;
     private Counter _recordCount;
     private Counter _rowCount;
+    private Counter _recordDuplicateCount;
     private boolean _countersSetup = false;
     private RateCounter _recordRateCounter;
     private RateCounter _rowRateCounter;
     private RateCounter _copyRateCounter;
+    private IndexWriter _localTmpWriter;
+    private boolean _usingLocalTmpindex;
+
+    private File _localTmpPath;
+
+    private ProgressableDirectory _localTmpDir;
+
+    private Counter _rowOverFlowCount;
 
    public BlurRecordWriter(Configuration configuration, BlurAnalyzer blurAnalyzer, int shardId, String tmpDirName)
         throws IOException {
+      _maxDocumentBufferSize = BlurOutputFormat.getMaxDocumentBufferSize(configuration);
       Path tableOutput = BlurOutputFormat.getOutputPath(configuration);
       String shardName = BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, shardId);
       Path indexPath = new Path(tableOutput, shardName);
@@ -195,12 +222,14 @@ public class BlurOutputFormat extends OutputFormat<Text, BlurMutate> {
 
       TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
       _analyzer = new BlurAnalyzer(tableDescriptor.getAnalyzerDefinition());
-      IndexWriterConfig conf = new IndexWriterConfig(LuceneVersionConstant.LUCENE_VERSION, _analyzer);
+      _conf = new IndexWriterConfig(LuceneVersionConstant.LUCENE_VERSION, _analyzer);
+      TieredMergePolicy mergePolicy = (TieredMergePolicy) _conf.getMergePolicy();
+      mergePolicy.setUseCompoundFile(false);
 
       String localDirPath = System.getProperty(JAVA_IO_TMPDIR);
       _localPath = new File(localDirPath, UUID.randomUUID().toString() + ".tmp");
       _localDir = new ProgressableDirectory(FSDirectory.open(_localPath), BlurOutputFormat.getProgressable());
-      _writer = new IndexWriter(_localDir, conf);
+      _writer = new IndexWriter(_localDir, _conf.clone());
     }
 
     @Override
@@ -220,22 +249,59 @@ public class BlurOutputFormat extends OutputFormat<Text, BlurMutate> {
       GetCounter getCounter = BlurOutputFormat.getGetCounter();
       _fieldCount = getCounter.getCounter(BlurCounters.FIELD_COUNT);
       _recordCount = getCounter.getCounter(BlurCounters.RECORD_COUNT);
+      _recordDuplicateCount = getCounter.getCounter(BlurCounters.RECORD_DUPLICATE_COUNT);
       _rowCount = getCounter.getCounter(BlurCounters.ROW_COUNT);
+      _rowOverFlowCount = getCounter.getCounter(BlurCounters.ROW_OVERFLOW_COUNT);
       _recordRateCounter = new RateCounter(getCounter.getCounter(BlurCounters.RECORD_RATE));
       _rowRateCounter = new RateCounter(getCounter.getCounter(BlurCounters.ROW_RATE));
       _copyRateCounter = new RateCounter(getCounter.getCounter(BlurCounters.COPY_RATE));
     }
 
-    private void add(BlurMutate value) {
+    private void add(BlurMutate value) throws IOException {
       BlurRecord blurRecord = value.getRecord();
       Record record = getRecord(blurRecord);
+      String recordId = record.getRecordId();
       Document document = TransactionRecorder.convert(blurRecord.getRowId(), record, _builder, _analyzer);
       if (_documents.size() == 0) {
         document.add(new StringField(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE, Store.NO));
       }
-      _documents.add(document);
-      _fieldCount.increment(document.getFields().size());
-      _recordCount.increment(1);
+      Document dup = _documents.put(recordId, document);
+      if (dup != null) {
+        _recordDuplicateCount.increment(1);
+      } else {
+        _fieldCount.increment(document.getFields().size());
+        _recordCount.increment(1);
+      }
+      flushToTmpIndexIfNeeded();
+    }
+
+    private void flushToTmpIndexIfNeeded() throws IOException {
+      if (_documents.size() > _maxDocumentBufferSize) {
+        flushToTmpIndex();
+      }
+    }
+
+    private void flushToTmpIndex() throws IOException {
+      if (_documents.isEmpty()) {
+        return;
+      }
+      _usingLocalTmpindex = true;
+      if (_localTmpWriter == null) {
+        String localDirPath = System.getProperty(JAVA_IO_TMPDIR);
+        _localTmpPath = new File(localDirPath, UUID.randomUUID().toString() + ".tmp");
+        _localTmpDir = new ProgressableDirectory(FSDirectory.open(_localTmpPath), BlurOutputFormat.getProgressable());
+        _localTmpWriter = new IndexWriter(_localTmpDir, _conf.clone());
+      }
+      _localTmpWriter.addDocuments(_documents.values());
+      _documents.clear();
+    }
+
+    private void resetLocalTmp() {
+      _usingLocalTmpindex = false;
+      _localTmpWriter = null;
+      _localTmpDir = null;
+      rm(_localTmpPath);
+      _localTmpPath = null;
     }
 
     private Record getRecord(BlurRecord value) {
@@ -249,14 +315,25 @@ public class BlurOutputFormat extends OutputFormat<Text, BlurMutate> {
     }
 
     private void flush() throws CorruptIndexException, IOException {
-      if (_documents.isEmpty()) {
-        return;
+      if (_usingLocalTmpindex) {
+        flushToTmpIndex();
+        _localTmpWriter.close(false);
+        DirectoryReader reader = DirectoryReader.open(_localTmpDir);
+        _recordRateCounter.mark(reader.numDocs());
+        _writer.addIndexes(reader);
+        reader.close();
+        resetLocalTmp();
+        _rowOverFlowCount.increment(1);
+      } else {
+        if (_documents.isEmpty()) {
+          return;
+        }
+        _writer.addDocuments(_documents.values());
+        _recordRateCounter.mark(_documents.size());
+        _documents.clear();
       }
-      _writer.addDocuments(_documents);
-      _rowCount.increment(1);
-      _recordRateCounter.mark(_documents.size());
       _rowRateCounter.mark();
-      _documents.clear();
+      _rowCount.increment(1);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7282e7a2/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java b/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
index 1048eef..56186fe 100644
--- a/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
+++ b/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
@@ -16,6 +16,7 @@ package org.apache.blur.mapreduce.lib;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.BufferedReader;
@@ -23,9 +24,13 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.PrintWriter;
 
+import org.apache.blur.store.buffer.BufferStore;
+import org.apache.blur.store.hdfs.HdfsDirectory;
 import org.apache.blur.thrift.generated.AnalyzerDefinition;
 import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.BlurUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -35,6 +40,7 @@ import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.TestMapReduceLocal.TrackingTextInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.lucene.index.DirectoryReader;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -57,6 +63,7 @@ public class BlurOutputFormatTest {
       throw new RuntimeException("problem getting local fs", io);
     }
     mr = new MiniMRCluster(2, "file:///", 3);
+    BufferStore.init(128, 128);
   }
 
   @AfterClass
@@ -70,9 +77,9 @@ public class BlurOutputFormatTest {
   public void testBlurOutputFormat() throws IOException, InterruptedException, ClassNotFoundException {
     localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
     localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
-    writeFile("in/part1", "1,1,cf1,val1");
-    writeFile("in/part2", "1,2,cf1,val2");
-    
+    writeRecordsFile("in/part1", 1, 1, 1, 1, "cf1");
+    writeRecordsFile("in/part2", 1, 1, 2, 1, "cf1");
+
     Job job = new Job(conf, "blur index");
     job.setJarByClass(BlurOutputFormatTest.class);
     job.setMapperClass(CsvBlurMapper.class);
@@ -84,17 +91,61 @@ public class BlurOutputFormatTest {
     job.setOutputFormatClass(BlurOutputFormat.class);
 
     FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+    String tableUri = new Path(TEST_ROOT_DIR + "/out").toString();
     CsvBlurMapper.addColumns(job, "cf1", "col");
-    
+
     TableDescriptor tableDescriptor = new TableDescriptor();
     tableDescriptor.setShardCount(1);
     tableDescriptor.setAnalyzerDefinition(new AnalyzerDefinition());
-    tableDescriptor.setTableUri(new Path(TEST_ROOT_DIR + "/out").toString());
+    tableDescriptor.setTableUri(tableUri);
     BlurOutputFormat.setTableDescriptor(job, tableDescriptor);
 
     assertTrue(job.waitForCompletion(true));
     Counters ctrs = job.getCounters();
     System.out.println("Counters: " + ctrs);
+
+    DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(conf, new Path(new Path(tableUri, BlurUtil
+        .getShardName(0)), "attempt_local_0001_r_000000_0.commit")));
+    assertEquals(2, reader.numDocs());
+    reader.close();
+  }
+
+  @Test
+  public void testBlurOutputFormatOverFlowTest() throws IOException, InterruptedException, ClassNotFoundException {
+    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
+    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
+
+    writeRecordsFile("in/part1", 1, 50, 1, 1500, "cf1"); // 1500 * 50 = 75,000
+    writeRecordsFile("in/part2", 1, 50, 2000, 100, "cf1"); // 100 * 50 = 5,000
+
+    Job job = new Job(conf, "blur index");
+    job.setJarByClass(BlurOutputFormatTest.class);
+    job.setMapperClass(CsvBlurMapper.class);
+    job.setReducerClass(DefaultBlurReducer.class);
+    job.setNumReduceTasks(1);
+    job.setInputFormatClass(TrackingTextInputFormat.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(BlurMutate.class);
+    job.setOutputFormatClass(BlurOutputFormat.class);
+
+    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+    String tableUri = new Path(TEST_ROOT_DIR + "/out").toString();
+    CsvBlurMapper.addColumns(job, "cf1", "col");
+
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setShardCount(1);
+    tableDescriptor.setAnalyzerDefinition(new AnalyzerDefinition());
+    tableDescriptor.setTableUri(tableUri);
+    BlurOutputFormat.setTableDescriptor(job, tableDescriptor);
+
+    assertTrue(job.waitForCompletion(true));
+    Counters ctrs = job.getCounters();
+    System.out.println("Counters: " + ctrs);
+
+    DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(conf, new Path(new Path(tableUri, BlurUtil
+        .getShardName(0)), "attempt_local_0002_r_000000_0.commit")));
+    assertEquals(80000, reader.numDocs());
+    reader.close();
   }
 
   public static String readFile(String name) throws IOException {
@@ -111,12 +162,23 @@ public class BlurOutputFormatTest {
     return result.toString();
   }
 
-  public static Path writeFile(String name, String data) throws IOException {
+  private Path writeRecordsFile(String name, int starintgRowId, int numberOfRows, int startRecordId,
+      int numberOfRecords, String family) throws IOException {
+    // "1,1,cf1,val1"
     Path file = new Path(TEST_ROOT_DIR + "/" + name);
     localFs.delete(file, false);
     DataOutputStream f = localFs.create(file);
-    f.write(data.getBytes());
-    f.close();
+    PrintWriter writer = new PrintWriter(f);
+    for (int row = 0; row < numberOfRows; row++) {
+      for (int record = 0; record < numberOfRecords; record++) {
+        writer.println(getRecord(row + starintgRowId, record + startRecordId, family));
+      }
+    }
+    writer.close();
     return file;
   }
+
+  private String getRecord(int rowId, int recordId, String family) {
+    return rowId + "," + recordId + "," + family + ",valuetoindex";
+  }
 }


Mime
View raw message