incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject git commit: Adding the index importer to the BlueIndexSimpleWriter class.
Date Tue, 17 Dec 2013 20:24:46 GMT
Updated Branches:
  refs/heads/master 20bd0c7a8 -> abb56992e


Adding the index importer to the BlueIndexSimpleWriter class.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/abb56992
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/abb56992
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/abb56992

Branch: refs/heads/master
Commit: abb56992e46bac019564b6a9fc2aa02a7aafea2d
Parents: 20bd0c7
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Tue Dec 17 15:24:02 2013 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Tue Dec 17 15:24:39 2013 -0500

----------------------------------------------------------------------
 .../blur/manager/writer/BlurIndexNRTSimple.java | 237 -----------------
 .../manager/writer/BlurIndexSimpleWriter.java   | 253 +++++++++++++++++++
 .../blur/manager/writer/BlurNRTIndex.java       |   2 +-
 .../blur/manager/writer/IndexImporter.java      |  24 +-
 .../manager/writer/BlurIndexNRTSimpleTest.java  |  23 +-
 .../blur/manager/writer/IndexImporterTest.java  |   4 +-
 6 files changed, 287 insertions(+), 256 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/abb56992/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexNRTSimple.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexNRTSimple.java
b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexNRTSimple.java
deleted file mode 100644
index f35560a..0000000
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexNRTSimple.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.manager.writer;
-
-import static org.apache.blur.lucene.LuceneVersionConstant.LUCENE_VERSION;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.blur.analysis.FieldManager;
-import org.apache.blur.index.ExitableReader;
-import org.apache.blur.log.Log;
-import org.apache.blur.log.LogFactory;
-import org.apache.blur.lucene.codec.Blur022Codec;
-import org.apache.blur.lucene.store.refcounter.DirectoryReferenceCounter;
-import org.apache.blur.lucene.store.refcounter.DirectoryReferenceFileGC;
-import org.apache.blur.lucene.warmup.TraceableDirectory;
-import org.apache.blur.server.IndexSearcherClosable;
-import org.apache.blur.server.ShardContext;
-import org.apache.blur.server.TableContext;
-import org.apache.blur.thrift.generated.Row;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.document.Field;
-import org.apache.lucene.index.BlurIndexWriter;
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.index.TieredMergePolicy;
-import org.apache.lucene.store.Directory;
-
-public class BlurIndexNRTSimple extends BlurIndex {
-
-  private static final Log LOG = LogFactory.getLog(BlurIndexNRTSimple.class);
-
-  private final AtomicBoolean _isClosed = new AtomicBoolean();
-  private final BlurIndexCloser _indexCloser;
-  private final AtomicReference<DirectoryReader> _indexReader = new AtomicReference<DirectoryReader>();
-  private final ExecutorService _searchThreadPool;
-  private final Directory _directory;
-  private final Thread _writerOpener;
-  private final IndexWriterConfig _conf;
-  private final TableContext _tableContext;
-  private final FieldManager _fieldManager;
-  private final BlurIndexRefresher _refresher;
-  private final ShardContext _shardContext;
-  private final AtomicReference<BlurIndexWriter> _writer = new AtomicReference<BlurIndexWriter>();
-  private final boolean _makeReaderExitable = true;
-
-  public BlurIndexNRTSimple(ShardContext shardContext, Directory directory, SharedMergeScheduler
mergeScheduler,
-      DirectoryReferenceFileGC gc, final ExecutorService searchExecutor, BlurIndexCloser
indexCloser,
-      BlurIndexRefresher refresher) throws IOException {
-    super(shardContext, directory, mergeScheduler, gc, searchExecutor, indexCloser, refresher);
-    _searchThreadPool = searchExecutor;
-    _shardContext = shardContext;
-    _tableContext = _shardContext.getTableContext();
-    _fieldManager = _tableContext.getFieldManager();
-    Analyzer analyzer = _fieldManager.getAnalyzerForIndex();
-    _conf = new IndexWriterConfig(LUCENE_VERSION, analyzer);
-    _conf.setWriteLockTimeout(TimeUnit.MINUTES.toMillis(5));
-    _conf.setCodec(new Blur022Codec(_tableContext.getBlurConfiguration()));
-    _conf.setSimilarity(_tableContext.getSimilarity());
-    AtomicBoolean stop = new AtomicBoolean();
-    _conf.setMergedSegmentWarmer(new FieldBasedWarmer(shardContext, stop, _isClosed));
-    TieredMergePolicy mergePolicy = (TieredMergePolicy) _conf.getMergePolicy();
-    mergePolicy.setUseCompoundFile(false);
-    _conf.setMergeScheduler(mergeScheduler.getMergeScheduler());
-
-    if (!DirectoryReader.indexExists(directory)) {
-      new BlurIndexWriter(directory, _conf).close();
-    }
-    DirectoryReferenceCounter referenceCounter = new DirectoryReferenceCounter(directory,
gc);
-    // This directory allows for warm up by adding tracing ability.
-    TraceableDirectory dir = new TraceableDirectory(referenceCounter);
-    _directory = dir;
-
-    // _directory = directory;
-
-    _indexCloser = indexCloser;
-    _indexReader.set(wrap(DirectoryReader.open(_directory)));
-    _refresher = refresher;
-
-    _writerOpener = getWriterOpener(shardContext);
-    _writerOpener.start();
-    _refresher.register(this);
-  }
-
-  private DirectoryReader wrap(DirectoryReader reader) {
-    if (_makeReaderExitable) {
-      reader = new ExitableReader(reader);
-    }
-    return reader;
-  }
-
-  private Thread getWriterOpener(ShardContext shardContext) {
-    Thread thread = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          _writer.set(new BlurIndexWriter(_directory, _conf.clone()));
-          synchronized (_writer) {
-            _writer.notify();
-          }
-        } catch (IOException e) {
-          LOG.error("Unknown error on index writer open.", e);
-        }
-      }
-    });
-    thread.setName("Writer Opener for Table [" + shardContext.getTableContext().getTable()
+ "] Shard ["
-        + shardContext.getShard() + "]");
-    thread.setDaemon(true);
-    return thread;
-  }
-
-  @Override
-  public IndexSearcherClosable getIndexSearcher() throws IOException {
-    final IndexReader indexReader = _indexReader.get();
-    while (!indexReader.tryIncRef()) {
-      // keep trying to increment the ref
-    }
-    return new IndexSearcherClosable(indexReader, _searchThreadPool) {
-      @Override
-      public Directory getDirectory() {
-        return _directory;
-      }
-
-      @Override
-      public void close() throws IOException {
-        indexReader.decRef();
-      }
-    };
-  }
-
-  @Override
-  public void replaceRow(boolean waitToBeVisible, boolean wal, Row row) throws IOException
{
-    waitUntilNotNull(_writer);
-    BlurIndexWriter writer = _writer.get();
-    List<List<Field>> docs = TransactionRecorder.getDocs(row, _fieldManager);
-    writer.updateDocuments(TransactionRecorder.createRowId(row.getId()), docs);
-    waitToBeVisible(waitToBeVisible);
-  }
-
-  @Override
-  public void deleteRow(boolean waitToBeVisible, boolean wal, String rowId) throws IOException
{
-    waitUntilNotNull(_writer);
-    BlurIndexWriter writer = _writer.get();
-    writer.deleteDocuments(TransactionRecorder.createRowId(rowId));
-    waitToBeVisible(waitToBeVisible);
-  }
-
-  private void waitUntilNotNull(AtomicReference<?> ref) {
-    while (true) {
-      Object object = ref.get();
-      if (object != null) {
-        return;
-      }
-      synchronized (ref) {
-        try {
-          ref.wait(TimeUnit.SECONDS.toMillis(1));
-        } catch (InterruptedException e) {
-          return;
-        }
-      }
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    _isClosed.set(true);
-    IOUtils.cleanup(LOG, _writer.get());
-    IOUtils.cleanup(LOG, _indexReader.get());
-  }
-
-  @Override
-  public void refresh() throws IOException {
-    DirectoryReader currentReader = _indexReader.get();
-    DirectoryReader newReader = DirectoryReader.openIfChanged(currentReader);
-    if (newReader != null) {
-      LOG.info("Refreshing index for table [{0}] shard [{1}].", _tableContext.getTable(),
_shardContext.getShard());
-      _indexReader.set(wrap(newReader));
-      _indexCloser.close(currentReader);
-    }
-  }
-
-  @Override
-  public AtomicBoolean isClosed() {
-    return _isClosed;
-  }
-
-  @Override
-  public void optimize(int numberOfSegmentsPerShard) throws IOException {
-    throw new RuntimeException("not impl");
-  }
-
-  @Override
-  public void createSnapshot(String name) throws IOException {
-    throw new RuntimeException("not impl");
-  }
-
-  @Override
-  public void removeSnapshot(String name) throws IOException {
-    throw new RuntimeException("not impl");
-  }
-
-  @Override
-  public List<String> getSnapshots() throws IOException {
-    throw new RuntimeException("not impl");
-  }
-
-  private void waitToBeVisible(boolean waitToBeVisible) throws IOException {
-    if (waitToBeVisible) {
-      waitUntilNotNull(_writer);
-      BlurIndexWriter writer = _writer.get();
-      writer.commit();
-      refresh();
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/abb56992/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
new file mode 100644
index 0000000..e3b1058
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.manager.writer;
+
+import static org.apache.blur.lucene.LuceneVersionConstant.LUCENE_VERSION;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.blur.analysis.FieldManager;
+import org.apache.blur.index.ExitableReader;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.lucene.codec.Blur022Codec;
+import org.apache.blur.lucene.store.refcounter.DirectoryReferenceCounter;
+import org.apache.blur.lucene.store.refcounter.DirectoryReferenceFileGC;
+import org.apache.blur.lucene.warmup.TraceableDirectory;
+import org.apache.blur.server.IndexSearcherClosable;
+import org.apache.blur.server.ShardContext;
+import org.apache.blur.server.TableContext;
+import org.apache.blur.thrift.generated.Row;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.index.BlurIndexWriter;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.TieredMergePolicy;
+import org.apache.lucene.store.Directory;
+
+public class BlurIndexSimpleWriter extends BlurIndex {
+
+  private static final Log LOG = LogFactory.getLog(BlurIndexSimpleWriter.class);
+
+  private final AtomicBoolean _isClosed = new AtomicBoolean();
+  private final BlurIndexCloser _indexCloser;
+  private final AtomicReference<DirectoryReader> _indexReader = new AtomicReference<DirectoryReader>();
+  private final ExecutorService _searchThreadPool;
+  private final Directory _directory;
+  private final Thread _writerOpener;
+  private final IndexWriterConfig _conf;
+  private final TableContext _tableContext;
+  private final FieldManager _fieldManager;
+  private final BlurIndexRefresher _refresher;
+  private final ShardContext _shardContext;
+  private final AtomicReference<BlurIndexWriter> _writer = new AtomicReference<BlurIndexWriter>();
+  private final boolean _makeReaderExitable = true;
+  private IndexImporter _indexImporter;
+  private final ReadWriteLock _lock = new ReentrantReadWriteLock();
+  private final Lock _readLock = _lock.readLock();
+
+  public BlurIndexSimpleWriter(ShardContext shardContext, Directory directory, SharedMergeScheduler
mergeScheduler,
+      DirectoryReferenceFileGC gc, final ExecutorService searchExecutor, BlurIndexCloser
indexCloser,
+      BlurIndexRefresher refresher) throws IOException {
+    super(shardContext, directory, mergeScheduler, gc, searchExecutor, indexCloser, refresher);
+    _searchThreadPool = searchExecutor;
+    _shardContext = shardContext;
+    _tableContext = _shardContext.getTableContext();
+    _fieldManager = _tableContext.getFieldManager();
+    Analyzer analyzer = _fieldManager.getAnalyzerForIndex();
+    _conf = new IndexWriterConfig(LUCENE_VERSION, analyzer);
+    _conf.setWriteLockTimeout(TimeUnit.MINUTES.toMillis(5));
+    _conf.setCodec(new Blur022Codec(_tableContext.getBlurConfiguration()));
+    _conf.setSimilarity(_tableContext.getSimilarity());
+    AtomicBoolean stop = new AtomicBoolean();
+    _conf.setMergedSegmentWarmer(new FieldBasedWarmer(shardContext, stop, _isClosed));
+    TieredMergePolicy mergePolicy = (TieredMergePolicy) _conf.getMergePolicy();
+    mergePolicy.setUseCompoundFile(false);
+    _conf.setMergeScheduler(mergeScheduler.getMergeScheduler());
+
+    if (!DirectoryReader.indexExists(directory)) {
+      new BlurIndexWriter(directory, _conf).close();
+    }
+    DirectoryReferenceCounter referenceCounter = new DirectoryReferenceCounter(directory,
gc);
+    // This directory allows for warm up by adding tracing ability.
+    TraceableDirectory dir = new TraceableDirectory(referenceCounter);
+    _directory = dir;
+
+    // _directory = directory;
+
+    _indexCloser = indexCloser;
+    _indexReader.set(wrap(DirectoryReader.open(_directory)));
+    _refresher = refresher;
+
+    _writerOpener = getWriterOpener(shardContext);
+    _writerOpener.start();
+    _refresher.register(this);
+  }
+
+  private DirectoryReader wrap(DirectoryReader reader) {
+    if (_makeReaderExitable) {
+      reader = new ExitableReader(reader);
+    }
+    return reader;
+  }
+
+  private Thread getWriterOpener(ShardContext shardContext) {
+    Thread thread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          _writer.set(new BlurIndexWriter(_directory, _conf.clone()));
+          synchronized (_writer) {
+            _writer.notify();
+          }
+          _indexImporter = new IndexImporter(_writer.get(), _lock, _shardContext, TimeUnit.SECONDS,
10);
+        } catch (IOException e) {
+          LOG.error("Unknown error on index writer open.", e);
+        }
+      }
+    });
+    thread.setName("Writer Opener for Table [" + shardContext.getTableContext().getTable()
+ "] Shard ["
+        + shardContext.getShard() + "]");
+    thread.setDaemon(true);
+    return thread;
+  }
+
+  @Override
+  public IndexSearcherClosable getIndexSearcher() throws IOException {
+    final IndexReader indexReader = _indexReader.get();
+    while (!indexReader.tryIncRef()) {
+      // keep trying to increment the ref
+    }
+    return new IndexSearcherClosable(indexReader, _searchThreadPool) {
+      @Override
+      public Directory getDirectory() {
+        return _directory;
+      }
+
+      @Override
+      public void close() throws IOException {
+        indexReader.decRef();
+      }
+    };
+  }
+
+  @Override
+  public void replaceRow(boolean waitToBeVisible, boolean wal, Row row) throws IOException
{
+    _readLock.lock();
+    try {
+      waitUntilNotNull(_writer);
+      BlurIndexWriter writer = _writer.get();
+      List<List<Field>> docs = TransactionRecorder.getDocs(row, _fieldManager);
+      writer.updateDocuments(TransactionRecorder.createRowId(row.getId()), docs);
+      waitToBeVisible(waitToBeVisible);
+    } finally {
+      _readLock.unlock();
+    }
+  }
+
+  @Override
+  public void deleteRow(boolean waitToBeVisible, boolean wal, String rowId) throws IOException
{
+    _readLock.lock();
+    try {
+      waitUntilNotNull(_writer);
+      BlurIndexWriter writer = _writer.get();
+      writer.deleteDocuments(TransactionRecorder.createRowId(rowId));
+      waitToBeVisible(waitToBeVisible);
+    } finally {
+      _readLock.unlock();
+    }
+  }
+
+  private void waitUntilNotNull(AtomicReference<?> ref) {
+    while (true) {
+      Object object = ref.get();
+      if (object != null) {
+        return;
+      }
+      synchronized (ref) {
+        try {
+          ref.wait(TimeUnit.SECONDS.toMillis(1));
+        } catch (InterruptedException e) {
+          return;
+        }
+      }
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    _isClosed.set(true);
+    IOUtils.cleanup(LOG, _indexImporter, _writer.get(), _indexReader.get());
+  }
+
+  @Override
+  public void refresh() throws IOException {
+    DirectoryReader currentReader = _indexReader.get();
+    DirectoryReader newReader = DirectoryReader.openIfChanged(currentReader);
+    if (newReader != null) {
+      LOG.debug("Refreshing index for table [{0}] shard [{1}].", _tableContext.getTable(),
_shardContext.getShard());
+      _indexReader.set(wrap(newReader));
+      _indexCloser.close(currentReader);
+    }
+  }
+
+  @Override
+  public AtomicBoolean isClosed() {
+    return _isClosed;
+  }
+
+  @Override
+  public void optimize(int numberOfSegmentsPerShard) throws IOException {
+    throw new RuntimeException("not impl");
+  }
+
+  @Override
+  public void createSnapshot(String name) throws IOException {
+    throw new RuntimeException("not impl");
+  }
+
+  @Override
+  public void removeSnapshot(String name) throws IOException {
+    throw new RuntimeException("not impl");
+  }
+
+  @Override
+  public List<String> getSnapshots() throws IOException {
+    throw new RuntimeException("not impl");
+  }
+
+  private void waitToBeVisible(boolean waitToBeVisible) throws IOException {
+    if (waitToBeVisible) {
+      waitUntilNotNull(_writer);
+      BlurIndexWriter writer = _writer.get();
+      writer.commit();
+      refresh();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/abb56992/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
index 2b9d38e..ad50599 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
@@ -147,7 +147,7 @@ public class BlurNRTIndex extends BlurIndex {
     };
 
     _trackingWriter = new TrackingIndexWriter(_writer);
-    _indexImporter = new IndexImporter(_trackingWriter, _lock, _shardContext, TimeUnit.SECONDS,
10);
+    _indexImporter = new IndexImporter(_trackingWriter.getIndexWriter(), _lock, _shardContext,
TimeUnit.SECONDS, 10);
     _nrtManagerRef.set(new NRTManager(_trackingWriter, _searcherFactory, APPLY_ALL_DELETES));
     // start commiter
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/abb56992/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java b/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java
index 1d66f3a..30adf17 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java
@@ -50,21 +50,20 @@ import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.index.Terms;
 import org.apache.lucene.index.TermsEnum;
-import org.apache.lucene.search.NRTManager.TrackingIndexWriter;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.BytesRef;
 
 public class IndexImporter extends TimerTask implements Closeable {
   private final static Log LOG = LogFactory.getLog(IndexImporter.class);
 
-  private final TrackingIndexWriter _trackingWriter;
+  private final IndexWriter _indexWriter;
   private final ReadWriteLock _lock;
   private final ShardContext _shardContext;
   private final Timer _timer;
 
-  public IndexImporter(TrackingIndexWriter trackingWriter, ReadWriteLock lock, ShardContext
shardContext,
+  public IndexImporter(IndexWriter indexWriter, ReadWriteLock lock, ShardContext shardContext,
       TimeUnit refreshUnit, long refreshAmount) {
-    _trackingWriter = trackingWriter;
+    _indexWriter = indexWriter;
     _lock = lock;
     _shardContext = shardContext;
     _timer = new Timer("IndexImporter [" + shardContext.getShard() + "/" + shardContext.getTableContext().getTable()
@@ -130,15 +129,14 @@ public class IndexImporter extends TimerTask implements Closeable {
       LOG.info("Obtaining lock on [{0}/{1}]", shard, table);
       _lock.writeLock().lock();
       try {
-        IndexWriter indexWriter = _trackingWriter.getIndexWriter();
         for (HdfsDirectory directory : indexesToImport) {
           LOG.info("Starting import [{0}], commiting on [{1}/{2}]", directory, shard, table);
-          indexWriter.commit();
+          _indexWriter.commit();
           boolean isSuccess = true;
           boolean isRollbackDueToException = false;
-          boolean emitDeletes = indexWriter.numDocs() != 0;
+          boolean emitDeletes = _indexWriter.numDocs() != 0;
           try {
-            isSuccess = applyDeletes(directory, indexWriter, shard, emitDeletes);
+            isSuccess = applyDeletes(directory, _indexWriter, shard, emitDeletes);
           } catch (IOException e) {
             LOG.error("Some issue with deleting the old index on [{0}/{1}]", e, shard, table);
             isSuccess = false;
@@ -147,12 +145,12 @@ public class IndexImporter extends TimerTask implements Closeable {
           Path dirPath = directory.getPath();
           if (isSuccess) {
             LOG.info("Add index [{0}] [{1}/{2}]", directory, shard, table);
-            indexWriter.addIndexes(directory);
+            _indexWriter.addIndexes(directory);
             LOG.info("Removing delete markers [{0}] on [{1}/{2}]", directory, shard, table);
-            indexWriter.deleteDocuments(new Term(BlurConstants.DELETE_MARKER, BlurConstants.DELETE_MARKER_VALUE));
+            _indexWriter.deleteDocuments(new Term(BlurConstants.DELETE_MARKER, BlurConstants.DELETE_MARKER_VALUE));
             LOG.info("Finishing import [{0}], commiting on [{1}/{2}]", directory, shard,
table);
-            indexWriter.commit();
-            indexWriter.maybeMerge();
+            _indexWriter.commit();
+            _indexWriter.maybeMerge();
             LOG.info("Cleaning up old directory [{0}] for [{1}/{2}]", dirPath, shard, table);
             fileSystem.delete(dirPath, true);
             LOG.info("Import complete on [{0}/{1}]", shard, table);
@@ -163,7 +161,7 @@ public class IndexImporter extends TimerTask implements Closeable {
                   shard, table, directory);
             }
             LOG.info("Starting rollback on [{0}/{1}]", shard, table);
-            indexWriter.rollback();
+            _indexWriter.rollback();
             LOG.info("Finished rollback on [{0}/{1}]", shard, table);
             String name = dirPath.getName();
             int lastIndexOf = name.lastIndexOf('.');

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/abb56992/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexNRTSimpleTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexNRTSimpleTest.java
b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexNRTSimpleTest.java
index cd528cb..faf83eb 100644
--- a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexNRTSimpleTest.java
+++ b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexNRTSimpleTest.java
@@ -25,6 +25,7 @@ import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 
+import org.apache.blur.BlurConfiguration;
 import org.apache.blur.concurrent.Executors;
 import org.apache.blur.lucene.store.refcounter.DirectoryReferenceFileGC;
 import org.apache.blur.server.IndexSearcherClosable;
@@ -34,6 +35,10 @@ import org.apache.blur.thrift.generated.Column;
 import org.apache.blur.thrift.generated.Record;
 import org.apache.blur.thrift.generated.Row;
 import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.trace.BaseTraceStorage;
+import org.apache.blur.trace.Trace;
+import org.apache.blur.trace.TraceCollector;
+import org.apache.blur.trace.TraceStorage;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.store.FSDirectory;
@@ -48,7 +53,7 @@ public class BlurIndexNRTSimpleTest {
 
   private static final File TMPDIR = new File("./target/tmp");
 
-  private BlurIndexNRTSimple writer;
+  private BlurIndexSimpleWriter writer;
   private Random random = new Random();
   private ExecutorService service;
   private File base;
@@ -97,7 +102,7 @@ public class BlurIndexNRTSimpleTest {
     path.mkdirs();
     FSDirectory directory = FSDirectory.open(path);
     ShardContext shardContext = ShardContext.create(tableContext, "test-shard-" + uuid);
-    writer = new BlurIndexNRTSimple(shardContext, directory, mergeScheduler, gc, service,
_closer, _refresher);
+    writer = new BlurIndexSimpleWriter(shardContext, directory, mergeScheduler, gc, service,
_closer, _refresher);
   }
 
   @After
@@ -127,6 +132,18 @@ public class BlurIndexNRTSimpleTest {
     setupWriter(configuration, 5, false);
     long s = System.nanoTime();
     int total = 0;
+    TraceStorage oldStorage = Trace.getStorage();
+    Trace.setStorage(new BaseTraceStorage(new BlurConfiguration()) {
+      @Override
+      public void close() throws IOException {
+        
+      }
+      @Override
+      public void store(TraceCollector collector) {
+        System.out.println(collector.toJson());
+      }
+    });
+    Trace.setupTrace("test");
     for (int i = 0; i < TEST_NUMBER_WAIT_VISIBLE; i++) {
       writer.replaceRow(true, true, genRow());
       IndexSearcherClosable searcher = writer.getIndexSearcher();
@@ -135,6 +152,7 @@ public class BlurIndexNRTSimpleTest {
       searcher.close();
       total++;
     }
+    Trace.tearDownTrace();
     long e = System.nanoTime();
     double seconds = (e - s) / 1000000000.0;
     double rate = total / seconds;
@@ -143,6 +161,7 @@ public class BlurIndexNRTSimpleTest {
     IndexReader reader = searcher.getIndexReader();
     assertEquals(TEST_NUMBER_WAIT_VISIBLE, reader.numDocs());
     searcher.close();
+    Trace.setStorage(oldStorage);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/abb56992/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
b/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
index ac09d89..c5cf3a8 100644
--- a/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
+++ b/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
@@ -43,7 +43,6 @@ import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.document.Field;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.search.NRTManager.TrackingIndexWriter;
 import org.apache.lucene.store.Directory;
 import org.junit.After;
 import org.junit.Before;
@@ -102,8 +101,7 @@ public class IndexImporterTest {
     mainWriter = new IndexWriter(mainDirectory, conf);
     BufferStore.initNewBuffer(128, 128 * 128);
 
-    indexImporter = new IndexImporter(new TrackingIndexWriter(mainWriter), new ReentrantReadWriteLock(),
shardContext,
-        TimeUnit.MINUTES, 10);
+    indexImporter = new IndexImporter(mainWriter, new ReentrantReadWriteLock(), shardContext,
TimeUnit.MINUTES, 10);
   }
 
   @After


Mime
View raw message