incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From twilli...@apache.org
Subject git commit: bring over the primedoc overflow fix from hadoop1 libs
Date Fri, 23 Jan 2015 12:19:17 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/master 5dfe14be2 -> fad57102e


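Bring over the prime-doc overflow fix from the hadoop1 libs: when a row overflows to the local temp index, the prime-doc marker is now added to document 0 of that index as it is merged (via PrimeDocOverFlowHelper) instead of to the first buffered document.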


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/fad57102
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/fad57102
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/fad57102

Branch: refs/heads/master
Commit: fad57102e7318d6ba23f527d376f2e6b611fc87e
Parents: 5dfe14b
Author: twilliams <twilliams@apache.org>
Authored: Fri Jan 23 07:14:45 2015 -0500
Committer: twilliams <twilliams@apache.org>
Committed: Fri Jan 23 07:14:45 2015 -0500

----------------------------------------------------------------------
 .../mapreduce/lib/GenericBlurRecordWriter.java  | 579 ++++++++++---------
 .../mapreduce/lib/PrimeDocOverFlowHelper.java   |  94 +++
 2 files changed, 385 insertions(+), 288 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/fad57102/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java
b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java
index 1211988..5779d47 100644
--- a/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java
+++ b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java
@@ -46,10 +46,12 @@ import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field;
 import org.apache.lucene.document.Field.Store;
 import org.apache.lucene.document.StringField;
+import org.apache.lucene.index.AtomicReader;
 import org.apache.lucene.index.CorruptIndexException;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.SlowCompositeReaderWrapper;
 import org.apache.lucene.index.TieredMergePolicy;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
@@ -58,317 +60,318 @@ import org.apache.lucene.store.SimpleFSDirectory;
 
 public class GenericBlurRecordWriter {
 
-  private static final Log LOG = LogFactory.getLog(GenericBlurRecordWriter.class);
-  private static final String JAVA_IO_TMPDIR = "java.io.tmpdir";
-  private static final Counter NULL_COUNTER = new NullCounter();
+	  private static final Log LOG = LogFactory.getLog(GenericBlurRecordWriter.class);
+	  private static final String JAVA_IO_TMPDIR = "java.io.tmpdir";
+	  private static final Counter NULL_COUNTER = new NullCounter();
 
-  private final Text _prevKey = new Text();
+	  private final Text _prevKey = new Text(); // key of the row currently being buffered
 
-  private final IndexWriter _writer;
-  private final FieldManager _fieldManager;
-  private final Directory _finalDir;
-  private final Directory _localDir;
-  private final File _localPath;
-  // private final int _maxDocumentBufferSize;
-  private final IndexWriterConfig _conf;
-  private final IndexWriterConfig _overFlowConf;
-  private final Path _newIndex;
-  private final boolean _indexLocally;
-  private final boolean _optimizeInFlight;
-  private final DocumentBufferStrategy _documentBufferStrategy;
-  private Counter _columnCount = NULL_COUNTER;
-  private Counter _fieldCount = NULL_COUNTER;
-  private Counter _recordCount = NULL_COUNTER;
-  private Counter _rowCount = NULL_COUNTER;
-  private Counter _recordDuplicateCount = NULL_COUNTER;
-  private Counter _rowOverFlowCount = NULL_COUNTER;
-  private Counter _rowDeleteCount = NULL_COUNTER;
-  private RateCounter _recordRateCounter = new RateCounter(NULL_COUNTER);
-  private RateCounter _rowRateCounter = new RateCounter(NULL_COUNTER);
-  private RateCounter _copyRateCounter = new RateCounter(NULL_COUNTER);
-  private boolean _countersSetup = false;
-  private IndexWriter _localTmpWriter;
-  private boolean _usingLocalTmpindex;
-  private File _localTmpPath;
-  private ProgressableDirectory _localTmpDir;
-  private String _deletedRowId;
-  private Configuration _configuration;
+	  private final IndexWriter _writer; // targets _localDir when indexing locally, otherwise _finalDir
+	  private final FieldManager _fieldManager;
+	  private final Directory _finalDir;
+	  private final Directory _localDir;
+	  private final File _localPath;
+	  // private final int _maxDocumentBufferSize;
+	  private final IndexWriterConfig _conf;
+	  private final IndexWriterConfig _overFlowConf; // clone of _conf used for the per-row overflow (tmp) index writer
+	  private final Path _newIndex;
+	  private final boolean _indexLocally;
+	  private final boolean _optimizeInFlight;
+	  private final DocumentBufferStrategy _documentBufferStrategy; // buffers the current row's documents in memory
+	  private Counter _columnCount = NULL_COUNTER;
+	  private Counter _fieldCount = NULL_COUNTER;
+	  private Counter _recordCount = NULL_COUNTER;
+	  private Counter _rowCount = NULL_COUNTER;
+	  private Counter _recordDuplicateCount = NULL_COUNTER;
+	  private Counter _rowOverFlowCount = NULL_COUNTER;
+	  private Counter _rowDeleteCount = NULL_COUNTER;
+	  private RateCounter _recordRateCounter = new RateCounter(NULL_COUNTER);
+	  private RateCounter _rowRateCounter = new RateCounter(NULL_COUNTER);
+	  private RateCounter _copyRateCounter = new RateCounter(NULL_COUNTER);
+	  private boolean _countersSetup = false;
+	  private IndexWriter _localTmpWriter; // created lazily when the current row overflows the buffer
+	  private boolean _usingLocalTmpindex; // true once the current row has spilled to the tmp index
+	  private File _localTmpPath;
+	  private ProgressableDirectory _localTmpDir;
+	  private String _deletedRowId; // row id of a DELETE seen for the current row; reset by flush()
+	  private Configuration _configuration;
 
-  public GenericBlurRecordWriter(Configuration configuration, int attemptId, String tmpDirName)
throws IOException {
-    _configuration = configuration;
-    _documentBufferStrategy = BlurOutputFormat.getDocumentBufferStrategy(_configuration);
-    _indexLocally = BlurOutputFormat.isIndexLocally(_configuration);
-    _optimizeInFlight = BlurOutputFormat.isOptimizeInFlight(_configuration);
+	  public GenericBlurRecordWriter(Configuration configuration, int attemptId, String tmpDirName)
throws IOException {
+	    _configuration = configuration;
+	    _documentBufferStrategy = BlurOutputFormat.getDocumentBufferStrategy(_configuration);
+	    _indexLocally = BlurOutputFormat.isIndexLocally(_configuration);
+	    _optimizeInFlight = BlurOutputFormat.isOptimizeInFlight(_configuration);
 
-    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(_configuration);
-    int shardCount = tableDescriptor.getShardCount();
-    int shardId = attemptId % shardCount;
+	    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(_configuration);
+	    int shardCount = tableDescriptor.getShardCount();
+	    int shardId = attemptId % shardCount; // shard chosen by attempt id modulo the table's shard count
 
-    Path tableOutput = BlurOutputFormat.getOutputPath(_configuration);
-    String shardName = ShardUtil.getShardName(BlurConstants.SHARD_PREFIX, shardId);
-    Path indexPath = new Path(tableOutput, shardName);
-    _newIndex = new Path(indexPath, tmpDirName);
-    _finalDir = new ProgressableDirectory(new HdfsDirectory(_configuration, _newIndex), getProgressable());
-    _finalDir.setLockFactory(NoLockFactory.getNoLockFactory());
+	    Path tableOutput = BlurOutputFormat.getOutputPath(_configuration);
+	    String shardName = ShardUtil.getShardName(BlurConstants.SHARD_PREFIX, shardId);
+	    Path indexPath = new Path(tableOutput, shardName);
+	    _newIndex = new Path(indexPath, tmpDirName);
+	    _finalDir = new ProgressableDirectory(new HdfsDirectory(_configuration, _newIndex),
getProgressable());
+	    _finalDir.setLockFactory(NoLockFactory.getNoLockFactory());
 
-    TableContext tableContext = TableContext.create(tableDescriptor);
-    _fieldManager = tableContext.getFieldManager();
-    Analyzer analyzer = _fieldManager.getAnalyzerForIndex();
+	    TableContext tableContext = TableContext.create(tableDescriptor);
+	    _fieldManager = tableContext.getFieldManager();
+	    Analyzer analyzer = _fieldManager.getAnalyzerForIndex();
 
-    _conf = new IndexWriterConfig(LuceneVersionConstant.LUCENE_VERSION, analyzer);
-    _conf.setCodec(new Blur024Codec(tableContext.getBlurConfiguration()));
-    _conf.setSimilarity(tableContext.getSimilarity());
-    TieredMergePolicy mergePolicy = (TieredMergePolicy) _conf.getMergePolicy();
-    mergePolicy.setUseCompoundFile(false);
+	    _conf = new IndexWriterConfig(LuceneVersionConstant.LUCENE_VERSION, analyzer);
+	    _conf.setCodec(new Blur024Codec()); // NOTE(review): previous revision passed tableContext.getBlurConfiguration(); confirm dropping it is intended
+	    _conf.setSimilarity(tableContext.getSimilarity());
+	    TieredMergePolicy mergePolicy = (TieredMergePolicy) _conf.getMergePolicy();
+	    mergePolicy.setUseCompoundFile(false);
 
-    _overFlowConf = _conf.clone();
+	    _overFlowConf = _conf.clone(); // NOTE(review): the clone keeps the TieredMergePolicy, so the overflow writer still merges
 
-    if (_indexLocally) {
-      String localDirPath = System.getProperty(JAVA_IO_TMPDIR);
-      _localPath = new File(localDirPath, UUID.randomUUID().toString() + ".tmp");
-      SimpleFSDirectory directory = new SimpleFSDirectory(_localPath);
-      _localDir = new ProgressableDirectory(directory, getProgressable());
-      _writer = new IndexWriter(_localDir, _conf.clone());
-    } else {
-      _localPath = null;
-      _localDir = null;
-      _writer = new IndexWriter(_finalDir, _conf.clone());
-    }
-  }
+	    if (_indexLocally) { // build under java.io.tmpdir; close() later copies the result to _finalDir
+	      String localDirPath = System.getProperty(JAVA_IO_TMPDIR);
+	      _localPath = new File(localDirPath, UUID.randomUUID().toString() + ".tmp");
+	      SimpleFSDirectory directory = new SimpleFSDirectory(_localPath);
+	      _localDir = new ProgressableDirectory(directory, getProgressable());
+	      _writer = new IndexWriter(_localDir, _conf.clone());
+	    } else {
+	      _localPath = null;
+	      _localDir = null;
+	      _writer = new IndexWriter(_finalDir, _conf.clone());
+	    }
+	  }
 
-  private Progressable getProgressable() {
-    final Progressable prg = BlurOutputFormat.getProgressable();
-    return new Progressable() {
+	  private Progressable getProgressable() { // wraps BlurOutputFormat's Progressable; re-fetches it lazily and warns at most every 10s while absent
+	    final Progressable prg = BlurOutputFormat.getProgressable();
+	    return new Progressable() {
 
-      private Progressable _progressable = prg;
-      private long _lastWarn = 0;
-      private boolean _progressSetupLogged = false;
+	      private Progressable _progressable = prg;
+	      private long _lastWarn = 0;
+	      private boolean _progressSetupLogged = false;
 
-      @Override
-      public void progress() {
-        if (_progressable != null) {
-          _progressable.progress();
-          if (!_progressSetupLogged) {
-            LOG.info("Progress has been setup correctly.");
-            _progressSetupLogged = true;
-          }
-        } else {
-          Progressable progressable = BlurOutputFormat.getProgressable();
-          if (progressable != null) {
-            _progressable = progressable;
-          } else {
-            long now = System.nanoTime();
-            if (_lastWarn + TimeUnit.SECONDS.toNanos(10) < now) {
-              LOG.warn("Progress not being reported.");
-              _lastWarn = System.nanoTime();
-            }
-          }
-        }
-      }
-    };
-  }
+	      @Override
+	      public void progress() {
+	        if (_progressable != null) {
+	          _progressable.progress();
+	          if (!_progressSetupLogged) {
+	            LOG.info("Progress has been setup correctly.");
+	            _progressSetupLogged = true;
+	          }
+	        } else {
+	          Progressable progressable = BlurOutputFormat.getProgressable();
+	          if (progressable != null) {
+	            _progressable = progressable;
+	          } else {
+	            long now = System.nanoTime();
+	            if (_lastWarn + TimeUnit.SECONDS.toNanos(10) < now) { // throttle the warning to once per 10 seconds
+	              LOG.warn("Progress not being reported.");
+	              _lastWarn = System.nanoTime();
+	            }
+	          }
+	        }
+	      }
+	    };
+	  }
 
-  public void write(Text key, BlurMutate value) throws IOException {
-    if (!_countersSetup) {
-      setupCounter();
-      _countersSetup = true;
-    }
-    if (!_prevKey.equals(key)) {
-      flush();
-      _prevKey.set(key);
-    }
-    add(value);
-  }
+	  public void write(Text key, BlurMutate value) throws IOException { // buffers one mutation; a key change first flushes the previous row
+	    if (!_countersSetup) {
+	      setupCounter();
+	      _countersSetup = true;
+	    }
+	    if (!_prevKey.equals(key)) {
+	      flush();
+	      _prevKey.set(key);
+	    }
+	    add(value);
+	  }
 
-  private void setupCounter() {
-    GetCounter getCounter = BlurOutputFormat.getGetCounter();
-    if (getCounter != null) {
-      _fieldCount = getCounter.getCounter(BlurCounters.LUCENE_FIELD_COUNT);
-      _columnCount = getCounter.getCounter(BlurCounters.COLUMN_COUNT);
-      _recordCount = getCounter.getCounter(BlurCounters.RECORD_COUNT);
-      _recordDuplicateCount = getCounter.getCounter(BlurCounters.RECORD_DUPLICATE_COUNT);
-      _rowCount = getCounter.getCounter(BlurCounters.ROW_COUNT);
-      _rowDeleteCount = getCounter.getCounter(BlurCounters.ROW_DELETE_COUNT);
-      _rowOverFlowCount = getCounter.getCounter(BlurCounters.ROW_OVERFLOW_COUNT);
-      _recordRateCounter = new RateCounter(getCounter.getCounter(BlurCounters.RECORD_RATE));
-      _rowRateCounter = new RateCounter(getCounter.getCounter(BlurCounters.ROW_RATE));
-      _copyRateCounter = new RateCounter(getCounter.getCounter(BlurCounters.COPY_RATE));
-    }
-  }
+	  private void setupCounter() { // binds real counters when BlurOutputFormat provides a GetCounter; otherwise keeps NULL_COUNTER
+	    GetCounter getCounter = BlurOutputFormat.getGetCounter();
+	    if (getCounter != null) {
+	      _fieldCount = getCounter.getCounter(BlurCounters.LUCENE_FIELD_COUNT);
+	      _columnCount = getCounter.getCounter(BlurCounters.COLUMN_COUNT);
+	      _recordCount = getCounter.getCounter(BlurCounters.RECORD_COUNT);
+	      _recordDuplicateCount = getCounter.getCounter(BlurCounters.RECORD_DUPLICATE_COUNT);
+	      _rowCount = getCounter.getCounter(BlurCounters.ROW_COUNT);
+	      _rowDeleteCount = getCounter.getCounter(BlurCounters.ROW_DELETE_COUNT);
+	      _rowOverFlowCount = getCounter.getCounter(BlurCounters.ROW_OVERFLOW_COUNT);
+	      _recordRateCounter = new RateCounter(getCounter.getCounter(BlurCounters.RECORD_RATE));
+	      _rowRateCounter = new RateCounter(getCounter.getCounter(BlurCounters.ROW_RATE));
+	      _copyRateCounter = new RateCounter(getCounter.getCounter(BlurCounters.COPY_RATE));
+	    }
+	  }
 
-  private void add(BlurMutate value) throws IOException {
-    BlurRecord blurRecord = value.getRecord();
-    Record record = getRecord(blurRecord);
-    String recordId = record.getRecordId();
-    if (value.getMutateType() == MUTATE_TYPE.DELETE) {
-      _deletedRowId = blurRecord.getRowId();
-      return;
-    }
-    if (_countersSetup) {
-      _columnCount.increment(record.getColumns().size());
-    }
-    List<Field> document = RowDocumentUtil.getDoc(_fieldManager, blurRecord.getRowId(),
record);
-    List<Field> dup = _documentBufferStrategy.add(recordId, document);
-    if (_countersSetup) {
-      if (dup != null) {
-        _recordDuplicateCount.increment(1);
-      } else {
-        _fieldCount.increment(document.size());
-        _recordCount.increment(1);
-      }
-    }
-    flushToTmpIndexIfNeeded();
-  }
+	  private void add(BlurMutate value) throws IOException { // DELETE only remembers the row id; other mutations are buffered as documents
+	    BlurRecord blurRecord = value.getRecord();
+	    Record record = getRecord(blurRecord);
+	    String recordId = record.getRecordId();
+	    if (value.getMutateType() == MUTATE_TYPE.DELETE) {
+	      _deletedRowId = blurRecord.getRowId();
+	      return;
+	    }
+	    if (_countersSetup) {
+	      _columnCount.increment(record.getColumns().size());
+	    }
+	    List<Field> document = RowDocumentUtil.getDoc(_fieldManager, blurRecord.getRowId(),
record);
+	    List<Field> dup = _documentBufferStrategy.add(recordId, document); // a non-null result is counted as a duplicate record below
+	    if (_countersSetup) {
+	      if (dup != null) {
+	        _recordDuplicateCount.increment(1);
+	      } else {
+	        _fieldCount.increment(document.size());
+	        _recordCount.increment(1);
+	      }
+	    }
+	    flushToTmpIndexIfNeeded();
+	  }
 
-  private void flushToTmpIndexIfNeeded() throws IOException {
-    if (_documentBufferStrategy.isFull()) {
-      LOG.info("Document Buffer is full overflow to disk.");
-      flushToTmpIndex();
-    }
-  }
+	  private void flushToTmpIndexIfNeeded() throws IOException { // spills the buffered row to the local tmp index once the buffer reports full
+	    if (_documentBufferStrategy.isFull()) {
+	      LOG.info("Document Buffer is full overflow to disk.");
+	      flushToTmpIndex();
+	    }
+	  }
 
-  private void flushToTmpIndex() throws IOException {
-    if (_documentBufferStrategy.isEmpty()) {
-      return;
-    }
-    _usingLocalTmpindex = true;
-    if (_localTmpWriter == null) {
-      String localDirPath = System.getProperty(JAVA_IO_TMPDIR);
-      _localTmpPath = new File(localDirPath, UUID.randomUUID().toString() + ".tmp");
-      SimpleFSDirectory directory = new SimpleFSDirectory(_localTmpPath);
-      _localTmpDir = new ProgressableDirectory(directory, getProgressable());
-      _localTmpWriter = new IndexWriter(_localTmpDir, _overFlowConf.clone());
-      // The local tmp writer has merging disabled so the first document in is
-      // going to be doc 0.
-      // Therefore the first document added is the prime doc
-      List<List<Field>> docs = _documentBufferStrategy.getAndClearBuffer();
-      docs.get(0).add(new StringField(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE,
Store.NO));
-      for (List<Field> doc : docs) {
-        _localTmpWriter.addDocument(doc);
-      }
-    } else {
-      List<List<Field>> docs = _documentBufferStrategy.getAndClearBuffer();
-      for (List<Field> doc : docs) {
-        _localTmpWriter.addDocument(doc);
-      }
-    }
-  }
+	  private void flushToTmpIndex() throws IOException { // appends the buffered docs to the (lazily created) local tmp index
+	    if (_documentBufferStrategy.isEmpty()) {
+	      return;
+	    }
+	    _usingLocalTmpindex = true;
+	    if (_localTmpWriter == null) {
+	      String localDirPath = System.getProperty(JAVA_IO_TMPDIR);
+	      _localTmpPath = new File(localDirPath, UUID.randomUUID().toString() + ".tmp");
+	      SimpleFSDirectory directory = new SimpleFSDirectory(_localTmpPath);
+	      _localTmpDir = new ProgressableDirectory(directory, getProgressable());
+	      _localTmpWriter = new IndexWriter(_localTmpDir, _overFlowConf.clone());
+	      // No prime-doc marker is added here: flush() adds it to doc 0 of the tmp
+	      // index via PrimeDocOverFlowHelper. (_overFlowConf is a clone of _conf and
+	      // keeps its TieredMergePolicy, so merging is NOT disabled for this writer.)
+	      List<List<Field>> docs = _documentBufferStrategy.getAndClearBuffer();
+	      for (List<Field> doc : docs) {
+	        _localTmpWriter.addDocument(doc);
+	      }
+	    } else {
+	      List<List<Field>> docs = _documentBufferStrategy.getAndClearBuffer();
+	      for (List<Field> doc : docs) {
+	        _localTmpWriter.addDocument(doc);
+	      }
+	    }
+	  }
 
-  private void resetLocalTmp() {
-    _usingLocalTmpindex = false;
-    _localTmpWriter = null;
-    _localTmpDir = null;
-    rm(_localTmpPath);
-    _localTmpPath = null;
-  }
+	  private void resetLocalTmp() { // forgets the tmp writer/dir and deletes the tmp index files from disk
+	    _usingLocalTmpindex = false;
+	    _localTmpWriter = null;
+	    _localTmpDir = null;
+	    rm(_localTmpPath);
+	    _localTmpPath = null;
+	  }
 
-  private Record getRecord(BlurRecord value) {
-    Record record = new Record();
-    record.setRecordId(value.getRecordId());
-    record.setFamily(value.getFamily());
-    for (BlurColumn col : value.getColumns()) {
-      record.addToColumns(new Column(col.getName(), col.getValue()));
-    }
-    return record;
-  }
+	  public static Record getRecord(BlurRecord value) { // copies id, family and columns from a BlurRecord (widened to public static here)
+	    Record record = new Record();
+	    record.setRecordId(value.getRecordId());
+	    record.setFamily(value.getFamily());
+	    for (BlurColumn col : value.getColumns()) {
+	      record.addToColumns(new Column(col.getName(), col.getValue()));
+	    }
+	    return record;
+	  }
 
-  private void flush() throws CorruptIndexException, IOException {
-    if (_usingLocalTmpindex) {
-      // since we have flushed to disk then we do not need to index the
-      // delete.
-      flushToTmpIndex();
-      _localTmpWriter.close(false);
-      DirectoryReader reader = DirectoryReader.open(_localTmpDir);
-      if (_countersSetup) {
-        _recordRateCounter.mark(reader.numDocs());
-      }
-      _writer.addIndexes(reader);
-      reader.close();
-      resetLocalTmp();
-      _writer.maybeMerge();
-      if (_countersSetup) {
-        _rowOverFlowCount.increment(1);
-      }
-    } else {
-      if (_documentBufferStrategy.isEmpty()) {
-        if (_deletedRowId != null) {
-          _writer.addDocument(getDeleteDoc());
-          if (_countersSetup) {
-            _rowDeleteCount.increment(1);
-          }
-        }
-      } else {
-        List<List<Field>> docs = _documentBufferStrategy.getAndClearBuffer();
-        docs.get(0).add(new StringField(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE,
Store.NO));
-        _writer.addDocuments(docs);
-        if (_countersSetup) {
-          _recordRateCounter.mark(docs.size());
-        }
-      }
-    }
-    _deletedRowId = null;
-    if (_countersSetup) {
-      _rowRateCounter.mark();
-      _rowCount.increment(1);
-    }
-  }
+	  private void flush() throws CorruptIndexException, IOException { // writes the buffered row (or its delete marker) and resets per-row state
+	    if (_usingLocalTmpindex) {
+	      // The row already spilled to the tmp index, so a pending delete for it is
+	      // not indexed as a separate marker document.
+	      flushToTmpIndex();
+	      _localTmpWriter.close(false);
+	      DirectoryReader reader = DirectoryReader.open(_localTmpDir);
+	      AtomicReader atomicReader = SlowCompositeReaderWrapper.wrap(reader); // single atomic view over all tmp-index segments
+	      AtomicReader primeDocAtomicReader= PrimeDocOverFlowHelper.addPrimeDoc(atomicReader); // same docs, with PRIME_DOC on doc 0
+	      if (_countersSetup) {
+	        _recordRateCounter.mark(reader.numDocs());
+	      }
+	      _writer.addIndexes(primeDocAtomicReader);
+	      primeDocAtomicReader.close(); // presumably also closes reader (closeSubReaders=true); NOTE(review): not in a finally, readers leak if addIndexes throws
+	      resetLocalTmp();
+	      _writer.maybeMerge();
+	      if (_countersSetup) {
+	        _rowOverFlowCount.increment(1);
+	      }
+	    } else {
+	      if (_documentBufferStrategy.isEmpty()) {
+	        if (_deletedRowId != null) {
+	          _writer.addDocument(getDeleteDoc());
+	          if (_countersSetup) {
+	            _rowDeleteCount.increment(1);
+	          }
+	        }
+	      } else {
+	        List<List<Field>> docs = _documentBufferStrategy.getAndClearBuffer();
+	        docs.get(0).add(new StringField(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE,
Store.NO)); // the row's first document carries the prime-doc marker
+	        _writer.addDocuments(docs);
+	        if (_countersSetup) {
+	          _recordRateCounter.mark(docs.size());
+	        }
+	      }
+	    }
+	    _deletedRowId = null;
+	    if (_countersSetup) {
+	      _rowRateCounter.mark();
+	      _rowCount.increment(1);
+	    }
+	  }
 
-  private Document getDeleteDoc() {
-    Document document = new Document();
-    document.add(new StringField(BlurConstants.ROW_ID, _deletedRowId, Store.NO));
-    document.add(new StringField(BlurConstants.DELETE_MARKER, BlurConstants.DELETE_MARKER_VALUE,
Store.NO));
-    return document;
-  }
+	  private Document getDeleteDoc() { // unstored ROW_ID + DELETE_MARKER document for _deletedRowId
+	    Document document = new Document();
+	    document.add(new StringField(BlurConstants.ROW_ID, _deletedRowId, Store.NO));
+	    document.add(new StringField(BlurConstants.DELETE_MARKER, BlurConstants.DELETE_MARKER_VALUE,
Store.NO));
+	    return document;
+	  }
 
-  public void close() throws IOException {
-    flush();
-    _writer.close();
-    if (_countersSetup) {
-      _recordRateCounter.close();
-      _rowRateCounter.close();
-    }
-    if (_indexLocally) {
-      if (_optimizeInFlight) {
-        copyAndOptimizeInFlightDir();
-      } else {
-        copyDir();
-      }
-    }
-    if (_countersSetup) {
-      _copyRateCounter.close();
-    }
-  }
+	  public void close() throws IOException { // flushes the last row, closes the writer, then copies the local index to _finalDir if indexing locally
+	    flush();
+	    _writer.close();
+	    if (_countersSetup) {
+	      _recordRateCounter.close();
+	      _rowRateCounter.close();
+	    }
+	    if (_indexLocally) {
+	      if (_optimizeInFlight) {
+	        copyAndOptimizeInFlightDir();
+	      } else {
+	        copyDir();
+	      }
+	    }
+	    if (_countersSetup) {
+	      _copyRateCounter.close();
+	    }
+	  }
 
-  private void copyAndOptimizeInFlightDir() throws IOException {
-    CopyRateDirectory copyRateDirectory = new CopyRateDirectory(_finalDir, _copyRateCounter);
-    copyRateDirectory.setLockFactory(NoLockFactory.getNoLockFactory());
-    DirectoryReader reader = DirectoryReader.open(_localDir);
-    IndexWriter writer = new IndexWriter(copyRateDirectory, _conf.clone());
-    writer.addIndexes(reader);
-    writer.close();
-    rm(_localPath);
-  }
+	  private void copyAndOptimizeInFlightDir() throws IOException { // re-adds the local index into _finalDir via a new IndexWriter, then deletes it
+	    CopyRateDirectory copyRateDirectory = new CopyRateDirectory(_finalDir, _copyRateCounter);
+	    copyRateDirectory.setLockFactory(NoLockFactory.getNoLockFactory());
+	    DirectoryReader reader = DirectoryReader.open(_localDir); // NOTE(review): this reader is never closed
+	    IndexWriter writer = new IndexWriter(copyRateDirectory, _conf.clone());
+	    writer.addIndexes(reader);
+	    writer.close();
+	    rm(_localPath);
+	  }
 
-  private void copyDir() throws IOException {
-    CopyRateDirectory copyRateDirectory = new CopyRateDirectory(_finalDir, _copyRateCounter);
-    String[] fileNames = _localDir.listAll();
-    for (String fileName : fileNames) {
-      LOG.info("Copying [{0}] to [{1}]", fileName, _newIndex);
-      _localDir.copy(copyRateDirectory, fileName, fileName, IOContext.DEFAULT);
-    }
-    rm(_localPath);
-  }
+	  private void copyDir() throws IOException { // copies the local index files verbatim to _finalDir, then deletes them
+	    CopyRateDirectory copyRateDirectory = new CopyRateDirectory(_finalDir, _copyRateCounter);
+	    String[] fileNames = _localDir.listAll();
+	    for (String fileName : fileNames) {
+	      LOG.info("Copying [{0}] to [{1}]", fileName, _newIndex);
+	      _localDir.copy(copyRateDirectory, fileName, fileName, IOContext.DEFAULT);
+	    }
+	    rm(_localPath);
+	  }
 
-  private void rm(File file) {
-    if (!file.exists()) {
-      return;
-    }
-    if (file.isDirectory()) {
-      for (File f : file.listFiles()) {
-        rm(f);
-      }
-    }
-    file.delete();
-  }
+	  private void rm(File file) { // recursive delete; File.delete() failures are ignored
+	    if (!file.exists()) {
+	      return;
+	    }
+	    if (file.isDirectory()) {
+	      for (File f : file.listFiles()) { // NOTE(review): listFiles() may return null on an I/O error
+	        rm(f);
+	      }
+	    }
+	    file.delete();
+	  }
 
-}
+	}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/fad57102/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/PrimeDocOverFlowHelper.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/PrimeDocOverFlowHelper.java
b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/PrimeDocOverFlowHelper.java
new file mode 100644
index 0000000..672a1c1
--- /dev/null
+++ b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/PrimeDocOverFlowHelper.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.mapreduce.lib;
+
+import java.io.IOException;
+
+import org.apache.blur.utils.BlurConstants;
+import org.apache.lucene.analysis.core.KeywordAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.index.AtomicReader;
+import org.apache.lucene.index.AtomicReaderContext;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.FilterAtomicReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.ParallelAtomicReader;
+import org.apache.lucene.index.StoredFieldVisitor;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.RAMDirectory;
+import org.apache.lucene.util.Bits;
+import org.apache.lucene.util.Version;
+/** Overlays Blur's prime-doc marker onto document 0 of an overflow index reader. */
+public class PrimeDocOverFlowHelper {
+  // One-document RAM index whose only doc holds PRIME_DOC (indexed, not stored); built at class load.
+  private static Directory _directory;
+
+  static {
+    try {
+      _directory = new RAMDirectory();
+      IndexWriter writer = new IndexWriter(_directory, new IndexWriterConfig(Version.LUCENE_43,
new KeywordAnalyzer())); // NOTE(review): hard-coded LUCENE_43; GenericBlurRecordWriter uses LuceneVersionConstant.LUCENE_VERSION
+      Document document = new Document();
+      document.add(new StringField(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE,
Store.NO));
+      writer.addDocument(document);
+      writer.close();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+  /** Returns {@code atomicReader} with PRIME_DOC added to doc 0; closing the result also closes {@code atomicReader}. */
+  public static AtomicReader addPrimeDoc(AtomicReader atomicReader) throws IOException {
+    AtomicReaderContext context = DirectoryReader.open(_directory).leaves().get(0); // NOTE(review): a new DirectoryReader per call that is never closed itself
+    return new ParallelAtomicReader(true, setDocSize(context.reader(), atomicReader.maxDoc()),
atomicReader); // closeSubReaders=true
+  }
+  /** Pads {@code reader} to {@code count} docs (all live, no stored fields), since ParallelAtomicReader needs equal maxDoc. */
+  private static AtomicReader setDocSize(AtomicReader reader, final int count) {
+    return new FilterAtomicReader(reader) {
+      @Override
+      public Bits getLiveDocs() { // NOTE(review): ParallelAtomicReader appears to take deletions from its first reader (this one), so overflow-reader deletions would be ignored
+        return new Bits() {
+          @Override
+          public boolean get(int index) {
+            return true;
+          }
+
+          @Override
+          public int length() {
+            return count;
+          }
+        };
+      }
+
+      @Override
+      public int numDocs() {
+        return count;
+      }
+
+      @Override
+      public int maxDoc() {
+        return count;
+      }
+
+      @Override
+      public void document(int docID, StoredFieldVisitor visitor) throws IOException {
+        // Intentional no-op: padded docs past doc 0 do not exist in the wrapped reader, and PRIME_DOC is Store.NO.
+      }
+    };
+  }
+}


Mime
View raw message