incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject git commit: Fixed BLUR-142
Date Wed, 19 Jun 2013 01:19:08 GMT
Updated Branches:
  refs/heads/master fd208badc -> c921b55f6


Fixed BLUR-142


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/c921b55f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/c921b55f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/c921b55f

Branch: refs/heads/master
Commit: c921b55f6d3e5ffd44338f77e9822be054ce76a2
Parents: fd208ba
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Tue Jun 18 21:18:28 2013 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Tue Jun 18 21:18:28 2013 -0400

----------------------------------------------------------------------
 .../org/apache/blur/manager/IndexManager.java   |  11 +-
 .../blur/manager/writer/BlurIndexReader.java    |   4 +-
 .../blur/manager/writer/BlurNRTIndex.java       |  20 +-
 .../manager/writer/TransactionRecorder.java     |   8 +-
 .../manager/writer/BlurIndexReaderTest.java     |   4 +-
 .../manager/writer/TransactionRecorderTest.java |   4 +-
 .../org/apache/blur/analysis/BlurAnalyzer.java  |  16 +-
 .../blur/lucene/search/SuperParserTest.java     |   7 +
 .../org/apache/blur/index/ExitableReader.java   | 298 +++++++++++++++++++
 .../java/org/apache/blur/index/IndexWriter.java |  99 ------
 .../apache/lucene/index/BlurIndexWriter.java    | 124 ++++++++
 11 files changed, 466 insertions(+), 129 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c921b55f/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
index 618b4a2..e8bc5f4 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
@@ -47,6 +47,7 @@ import java.util.concurrent.atomic.AtomicLongArray;
 
 import org.apache.blur.analysis.BlurAnalyzer;
 import org.apache.blur.concurrent.Executors;
+import org.apache.blur.index.ExitableReader;
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
 import org.apache.blur.lucene.search.FacetQuery;
@@ -1054,9 +1055,13 @@ public class IndexManager {
       BlurIndex index = entry.getValue();
       IndexSearcherClosable searcher = index.getIndexReader();
       try {
-        // @TODO need to add escapable rewriter
-        // IndexReader escapeReader = EscapeRewrite.wrap(reader, _running);
-        // IndexSearcher searcher = new IndexSearcher(escapeReader);
+        IndexReader indexReader = searcher.getIndexReader();
+        if (indexReader instanceof ExitableReader) {
+          ExitableReader er = (ExitableReader) indexReader;
+          er.setRunning(_running);
+        } else {
+          throw new IOException("IndexReader is not ExitableReader");
+        }
         if (_shardServerContext != null) {
           _shardServerContext.setIndexSearcherClosable(_table, shard, searcher);
         }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c921b55f/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
index bc4c2ee..e2e3bf6 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
@@ -23,7 +23,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.blur.index.IndexWriter;
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
 import org.apache.blur.lucene.warmup.TraceableDirectory;
@@ -31,6 +30,7 @@ import org.apache.blur.server.IndexSearcherClosable;
 import org.apache.blur.server.ShardContext;
 import org.apache.blur.server.TableContext;
 import org.apache.blur.thrift.generated.Row;
+import org.apache.lucene.index.BlurIndexWriter;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.store.Directory;
@@ -64,7 +64,7 @@ public class BlurIndexReader extends BlurIndex {
       // if the directory is empty then create an empty index.
       IndexWriterConfig conf = new IndexWriterConfig(LUCENE_VERSION, _tableContext.getAnalyzer());
       conf.setWriteLockTimeout(TimeUnit.MINUTES.toMillis(5));
-      new IndexWriter(directory, conf).close();
+      new BlurIndexWriter(directory, conf).close();
     }
     _indexReaderRef.set(DirectoryReader.open(directory));
     _refresher.register(this);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c921b55f/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
index 62d2274..2b350fa 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
@@ -27,8 +27,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import org.apache.blur.index.IndexWriter;
-import org.apache.blur.index.IndexWriter.LockOwnerException;
+import org.apache.blur.index.ExitableReader;
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
 import org.apache.blur.lucene.store.refcounter.DirectoryReferenceCounter;
@@ -41,6 +40,8 @@ import org.apache.blur.server.ShardContext;
 import org.apache.blur.server.TableContext;
 import org.apache.blur.thrift.generated.Record;
 import org.apache.blur.thrift.generated.Row;
+import org.apache.lucene.index.BlurIndexWriter;
+import org.apache.lucene.index.BlurIndexWriter.LockOwnerException;
 import org.apache.lucene.index.CorruptIndexException;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexWriterConfig;
@@ -59,7 +60,7 @@ public class BlurNRTIndex extends BlurIndex {
 
   private final AtomicReference<NRTManager> _nrtManagerRef = new AtomicReference<NRTManager>();
   private final AtomicBoolean _isClosed = new AtomicBoolean();
-  private final IndexWriter _writer;
+  private final BlurIndexWriter _writer;
   private final Thread _committer;
   private final SearcherFactory _searcherFactory;
   private final Directory _directory;
@@ -93,7 +94,7 @@ public class BlurNRTIndex extends BlurIndex {
     DirectoryReferenceCounter referenceCounter = new DirectoryReferenceCounter(directory, gc, closer);
     // This directory allows for warm up by adding tracing ability.
     TraceableDirectory dir = new TraceableDirectory(referenceCounter);
-    _writer = new IndexWriter(dir, conf);
+    _writer = new BlurIndexWriter(dir, conf, true);
     _recorder = new TransactionRecorder(shardContext);
     _recorder.replay(_writer);
 
@@ -155,7 +156,16 @@ public class BlurNRTIndex extends BlurIndex {
    */
   @Override
   public IndexSearcherClosable getIndexReader() throws IOException {
-    return (IndexSearcherClosable) getNRTManager().acquire();
+    return resetRunning((IndexSearcherClosable) getNRTManager().acquire());
+  }
+
+  private IndexSearcherClosable resetRunning(IndexSearcherClosable indexSearcherClosable) {
+    IndexReader indexReader = indexSearcherClosable.getIndexReader();
+    if (indexReader instanceof ExitableReader) {
+      ExitableReader er = (ExitableReader) indexReader;
+      er.getRunning().set(true);
+    }
+    return indexSearcherClosable;
   }
 
   private NRTManager getNRTManager() {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c921b55f/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java b/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java
index 6a5cf05..b7453ba 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java
@@ -36,7 +36,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.blur.analysis.BlurAnalyzer;
-import org.apache.blur.index.IndexWriter;
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
 import org.apache.blur.server.ShardContext;
@@ -57,6 +56,7 @@ import org.apache.lucene.document.Field;
 import org.apache.lucene.document.Field.Store;
 import org.apache.lucene.document.FieldType;
 import org.apache.lucene.document.StringField;
+import org.apache.lucene.index.BlurIndexWriter;
 import org.apache.lucene.index.CorruptIndexException;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.NRTManager.TrackingIndexWriter;
@@ -137,7 +137,7 @@ public class TransactionRecorder extends TimerTask implements Closeable {
     _lastSync.set(System.nanoTime());
   }
 
-  public void replay(IndexWriter writer) throws IOException {
+  public void replay(BlurIndexWriter writer) throws IOException {
     if (_fileSystem.exists(_walPath)) {
       FSDataInputStream inputStream = _fileSystem.open(_walPath);
       replay(writer, inputStream);
@@ -148,7 +148,7 @@ public class TransactionRecorder extends TimerTask implements Closeable {
     }
   }
 
-  private void replay(IndexWriter writer, DataInputStream inputStream) throws CorruptIndexException, IOException {
+  private void replay(BlurIndexWriter writer, DataInputStream inputStream) throws CorruptIndexException, IOException {
     long updateCount = 0;
     long deleteCount = 0;
     byte[] buffer;
@@ -340,7 +340,7 @@ public class TransactionRecorder extends TimerTask implements Closeable {
     return writer.deleteDocuments(createRowId(rowId));
   }
 
-  public void commit(IndexWriter writer) throws CorruptIndexException, IOException {
+  public void commit(BlurIndexWriter writer) throws CorruptIndexException, IOException {
     synchronized (_running) {
       long s = System.nanoTime();
       writer.commit();

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c921b55f/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexReaderTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexReaderTest.java b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexReaderTest.java
index 76886e4..42ffb99 100644
--- a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexReaderTest.java
+++ b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexReaderTest.java
@@ -25,7 +25,6 @@ import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.blur.concurrent.Executors;
-import org.apache.blur.index.IndexWriter;
 import org.apache.blur.lucene.store.refcounter.DirectoryReferenceFileGC;
 import org.apache.blur.lucene.store.refcounter.IndexInputCloser;
 import org.apache.blur.server.IndexSearcherClosable;
@@ -36,6 +35,7 @@ import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.lucene.analysis.core.KeywordAnalyzer;
 import org.apache.lucene.document.Document;
+import org.apache.lucene.index.BlurIndexWriter;
 import org.apache.lucene.index.CorruptIndexException;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.store.FSDirectory;
@@ -135,7 +135,7 @@ public class BlurIndexReaderTest {
 
   private void doWrite() throws CorruptIndexException, LockObtainFailedException, IOException {
     IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_43, new KeywordAnalyzer());
-    IndexWriter writer = new IndexWriter(directory, conf);
+    BlurIndexWriter writer = new BlurIndexWriter(directory, conf);
     writer.addDocument(getDoc());
     writer.close();
   }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c921b55f/blur-core/src/test/java/org/apache/blur/manager/writer/TransactionRecorderTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/writer/TransactionRecorderTest.java b/blur-core/src/test/java/org/apache/blur/manager/writer/TransactionRecorderTest.java
index b3eb93e..17870d1 100644
--- a/blur-core/src/test/java/org/apache/blur/manager/writer/TransactionRecorderTest.java
+++ b/blur-core/src/test/java/org/apache/blur/manager/writer/TransactionRecorderTest.java
@@ -31,7 +31,6 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.blur.MiniCluster;
 import org.apache.blur.analysis.BlurAnalyzer;
-import org.apache.blur.index.IndexWriter;
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
 import org.apache.blur.server.ShardContext;
@@ -46,6 +45,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.lucene.analysis.core.KeywordAnalyzer;
+import org.apache.lucene.index.BlurIndexWriter;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexWriterConfig;
@@ -113,7 +113,7 @@ public class TransactionRecorderTest {
 
     RAMDirectory directory = new RAMDirectory();
     IndexWriterConfig conf = new IndexWriterConfig(LUCENE_VERSION, analyzer);
-    IndexWriter writer = new IndexWriter(directory, conf);
+    BlurIndexWriter writer = new BlurIndexWriter(directory, conf);
 
     TransactionRecorder replayTransactionRecorder = new TransactionRecorder(shardContext);
     closeThis.add(replayTransactionRecorder);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c921b55f/blur-query/src/main/java/org/apache/blur/analysis/BlurAnalyzer.java
----------------------------------------------------------------------
diff --git a/blur-query/src/main/java/org/apache/blur/analysis/BlurAnalyzer.java b/blur-query/src/main/java/org/apache/blur/analysis/BlurAnalyzer.java
index 48d0b90..cd7e233 100644
--- a/blur-query/src/main/java/org/apache/blur/analysis/BlurAnalyzer.java
+++ b/blur-query/src/main/java/org/apache/blur/analysis/BlurAnalyzer.java
@@ -29,7 +29,6 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.Reader;
 import java.lang.reflect.Constructor;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -84,13 +83,6 @@ public final class BlurAnalyzer extends AnalyzerWrapper {
 
   private static final String STANDARD = "org.apache.blur.analysis.NoStopWordStandardAnalyzer";
   public static final BlurAnalyzer BLANK_ANALYZER = new BlurAnalyzer(new KeywordAnalyzer());
-
-  private static final Analyzer ERROR_ANALYZER = new Analyzer() {
-    @Override
-    protected TokenStreamComponents createComponents(String field, Reader reader) {
-      throw new RuntimeException("This analyzer should never be used.");
-    }
-  };
   private static Map<String, Class<? extends Analyzer>> aliases = new HashMap<String, Class<? extends Analyzer>>();
 
   private Set<String> _subIndexNames = new HashSet<String>();
@@ -123,10 +115,10 @@ public final class BlurAnalyzer extends AnalyzerWrapper {
     _defaultAnalyzer = getAnalyzerByClassName(defaultDefinition.getAnalyzerClassName(), aliases, null, null,
         _fieldTypes);
     _analyzers = new HashMap<String, Analyzer>();
-    _analyzers.put(ROW_ID, ERROR_ANALYZER);
-    _analyzers.put(RECORD_ID, ERROR_ANALYZER);
-    _analyzers.put(PRIME_DOC, ERROR_ANALYZER);
-    _analyzers.put(FAMILY, ERROR_ANALYZER);
+    _analyzers.put(ROW_ID, new KeywordAnalyzer());
+    _analyzers.put(RECORD_ID, new KeywordAnalyzer());
+    _analyzers.put(PRIME_DOC, new KeywordAnalyzer());
+    _analyzers.put(FAMILY, new KeywordAnalyzer());
     _analyzers.put(SUPER, new WhitespaceAnalyzer(LUCENE_VERSION));
     load(_analyzers, _analyzerDefinition.columnFamilyDefinitions, _fullTextFields, _subIndexNameLookups,
         _subIndexNames, _fullTextColumnFamilies, _typeLookup, _fieldTypes);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c921b55f/blur-query/src/test/java/org/apache/blur/lucene/search/SuperParserTest.java
----------------------------------------------------------------------
diff --git a/blur-query/src/test/java/org/apache/blur/lucene/search/SuperParserTest.java b/blur-query/src/test/java/org/apache/blur/lucene/search/SuperParserTest.java
index 93a52bb..43c5bb9 100644
--- a/blur-query/src/test/java/org/apache/blur/lucene/search/SuperParserTest.java
+++ b/blur-query/src/test/java/org/apache/blur/lucene/search/SuperParserTest.java
@@ -276,6 +276,13 @@ public class SuperParserTest {
         bc(sq(tq("super", "word3"))), bc(sq(tq("super", "word2"))));
     assertQuery(q1, q);
   }
+  
+  @Test
+  public void test27() throws ParseException {
+    Query q = parseSq("rowid:1");
+    Query q1 = sq(tq("rowid", "1"));
+    assertQuery(q1, q);
+  }
 
   public static BooleanClause bc_m(Query q) {
     return new BooleanClause(q, Occur.MUST);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c921b55f/blur-store/src/main/java/org/apache/blur/index/ExitableReader.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/index/ExitableReader.java b/blur-store/src/main/java/org/apache/blur/index/ExitableReader.java
new file mode 100644
index 0000000..dd8bf36
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/index/ExitableReader.java
@@ -0,0 +1,298 @@
+package org.apache.blur.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.lucene.index.AtomicReader;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.DocsAndPositionsEnum;
+import org.apache.lucene.index.DocsEnum;
+import org.apache.lucene.index.Fields;
+import org.apache.lucene.index.FilterAtomicReader;
+import org.apache.lucene.index.FilterDirectoryReader;
+import org.apache.lucene.index.Terms;
+import org.apache.lucene.index.TermsEnum;
+import org.apache.lucene.util.Bits;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.automaton.CompiledAutomaton;
+
+/**
+ * The {@link DirectoryReader} wraps a real index {@link DirectoryReader} and
+ * allows for a {@link AtomicBoolean} to be checked periodically to see if the
+ * thread should exit or not. The exit mechanism is by throw a
+ * {@link ExitingReader} exception.
+ */
+public class ExitableReader extends FilterDirectoryReader {
+
+  @SuppressWarnings("serial")
+  public static class ExitingReader extends RuntimeException {
+
+  }
+
+  public static class ExitableSubReaderWrapper extends SubReaderWrapper {
+    private final AtomicReference<AtomicBoolean> _running;
+
+    public ExitableSubReaderWrapper(AtomicReference<AtomicBoolean> running) {
+      _running = running;
+    }
+
+    @Override
+    public AtomicReader wrap(AtomicReader reader) {
+      return new ExitableFilterAtomicReader(reader, _running);
+    }
+  }
+
+  public static class ExitableFilterAtomicReader extends FilterAtomicReader {
+
+    private final AtomicReference<AtomicBoolean> _running;
+
+    public ExitableFilterAtomicReader(AtomicReader in, AtomicReference<AtomicBoolean> running) {
+      super(in);
+      _running = running;
+    }
+
+    @Override
+    public Fields fields() throws IOException {
+      Fields fields = super.fields();
+      if (fields == null) {
+        return null;
+      }
+      return new ExitableFields(fields, _running);
+    }
+
+  }
+
+  public static class ExitableFields extends Fields {
+
+    private final AtomicReference<AtomicBoolean> _running;
+    private final Fields _fields;
+
+    public ExitableFields(Fields fields, AtomicReference<AtomicBoolean> running) {
+      _fields = fields;
+      _running = running;
+    }
+
+    @Override
+    public Terms terms(String field) throws IOException {
+      Terms terms = _fields.terms(field);
+      if (terms == null) {
+        return null;
+      }
+      return new ExitableTerms(terms, _running);
+    }
+
+    @Override
+    public Iterator<String> iterator() {
+      return _fields.iterator();
+    }
+
+    @Override
+    public int size() {
+      return _fields.size();
+    }
+
+  }
+
+  public static class ExitableTerms extends Terms {
+
+    private final AtomicReference<AtomicBoolean> _running;
+    private final Terms _terms;
+
+    public ExitableTerms(Terms terms, AtomicReference<AtomicBoolean> running) {
+      _terms = terms;
+      _running = running;
+    }
+
+    @Override
+    public TermsEnum intersect(CompiledAutomaton compiled, BytesRef startTerm) throws IOException {
+      return new ExitableTermsEnum(_terms.intersect(compiled, startTerm), _running);
+    }
+
+    @Override
+    public TermsEnum iterator(TermsEnum reuse) throws IOException {
+      return new ExitableTermsEnum(_terms.iterator(reuse), _running);
+    }
+
+    @Override
+    public Comparator<BytesRef> getComparator() {
+      return _terms.getComparator();
+    }
+
+    @Override
+    public long size() throws IOException {
+      return _terms.size();
+    }
+
+    @Override
+    public long getSumTotalTermFreq() throws IOException {
+      return _terms.getSumTotalTermFreq();
+    }
+
+    @Override
+    public long getSumDocFreq() throws IOException {
+      return _terms.getSumDocFreq();
+    }
+
+    @Override
+    public int getDocCount() throws IOException {
+      return _terms.getDocCount();
+    }
+
+    @Override
+    public boolean hasOffsets() {
+      return _terms.hasOffsets();
+    }
+
+    @Override
+    public boolean hasPositions() {
+      return _terms.hasPositions();
+    }
+
+    @Override
+    public boolean hasPayloads() {
+      return _terms.hasPayloads();
+    }
+
+  }
+
+  public static class ExitableTermsEnum extends TermsEnum {
+
+    private static final long CHECK_TIME = TimeUnit.SECONDS.toNanos(1);
+    private final AtomicBoolean _running;
+    private final TermsEnum _termsEnum;
+    private int max = 1000;
+    private long _lastCheck;
+    private int count = 0;
+
+    public ExitableTermsEnum(TermsEnum termsEnum, AtomicReference<AtomicBoolean> running) {
+      _termsEnum = termsEnum;
+      _running = running.get();
+      _lastCheck = System.nanoTime();
+      checkAndThrow();
+    }
+
+    private void checkRunningState() {
+      count++;
+      if (count >= max) {
+        long now = System.nanoTime();
+        if (_lastCheck + CHECK_TIME < now) {
+
+          checkAndThrow();
+        } else {
+          // diff is the actual time between last check and now
+          long diff = now - _lastCheck;
+          // try to re-adjust max count
+          int maxShouldBe = (int) (((float) count / (float) diff) * (float) CHECK_TIME);
+          max = maxShouldBe;
+        }
+        count = 0;
+      }
+    }
+
+    private void checkAndThrow() {
+      if (!_running.get()) {
+        throw new ExitingReader();
+      }
+    }
+
+    @Override
+    public BytesRef next() throws IOException {
+      checkRunningState();
+      return _termsEnum.next();
+    }
+
+    @Override
+    public Comparator<BytesRef> getComparator() {
+      return _termsEnum.getComparator();
+    }
+
+    @Override
+    public SeekStatus seekCeil(BytesRef text, boolean useCache) throws IOException {
+      return _termsEnum.seekCeil(text, useCache);
+    }
+
+    @Override
+    public void seekExact(long ord) throws IOException {
+      _termsEnum.seekExact(ord);
+    }
+
+    @Override
+    public BytesRef term() throws IOException {
+      return _termsEnum.term();
+    }
+
+    @Override
+    public long ord() throws IOException {
+      return _termsEnum.ord();
+    }
+
+    @Override
+    public int docFreq() throws IOException {
+      return _termsEnum.docFreq();
+    }
+
+    @Override
+    public long totalTermFreq() throws IOException {
+      return _termsEnum.totalTermFreq();
+    }
+
+    @Override
+    public DocsEnum docs(Bits liveDocs, DocsEnum reuse, int flags) throws IOException {
+      checkRunningState();
+      return _termsEnum.docs(liveDocs, reuse, flags);
+    }
+
+    @Override
+    public DocsAndPositionsEnum docsAndPositions(Bits liveDocs, DocsAndPositionsEnum reuse, int flags)
+        throws IOException {
+      checkRunningState();
+      return _termsEnum.docsAndPositions(liveDocs, reuse, flags);
+    }
+
+  }
+
+  private final AtomicReference<AtomicBoolean> _running;
+
+  public ExitableReader(DirectoryReader in) {
+    this(in, new AtomicReference<AtomicBoolean>(new AtomicBoolean(true)));
+  }
+
+  private ExitableReader(DirectoryReader in, AtomicReference<AtomicBoolean> running) {
+    super(in, new ExitableSubReaderWrapper(running));
+    _running = running;
+  }
+
+  public AtomicBoolean getRunning() {
+    return _running.get();
+  }
+
+  public void setRunning(AtomicBoolean running) {
+    _running.set(running);
+  }
+
+  @Override
+  protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) {
+    return in;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c921b55f/blur-store/src/main/java/org/apache/blur/index/IndexWriter.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/index/IndexWriter.java b/blur-store/src/main/java/org/apache/blur/index/IndexWriter.java
deleted file mode 100644
index 95f8b05..0000000
--- a/blur-store/src/main/java/org/apache/blur/index/IndexWriter.java
+++ /dev/null
@@ -1,99 +0,0 @@
-package org.apache.blur.index;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-import java.lang.reflect.Field;
-
-import org.apache.lucene.index.CorruptIndexException;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.Lock;
-import org.apache.lucene.store.LockObtainFailedException;
-
-public class IndexWriter extends org.apache.lucene.index.IndexWriter {
-
-  public static class LockOwnerException extends IOException {
-    private static final long serialVersionUID = -8211546713487754992L;
-
-    public LockOwnerException(String msg) {
-      super(msg);
-    }
-  }
-
-  private Lock internalLock;
-
-  public IndexWriter(Directory d, IndexWriterConfig conf) throws CorruptIndexException, LockObtainFailedException,
-      IOException {
-    super(d, conf);
-    try {
-      internalLock = getInternalLock();
-    } catch (Exception e) {
-      throw new RuntimeException("Could not get the write lock instance.", e);
-    }
-  }
-
-  private Lock getInternalLock() throws SecurityException, NoSuchFieldException, IllegalArgumentException,
-      IllegalAccessException {
-    Field field = org.apache.lucene.index.IndexWriter.class.getDeclaredField("writeLock");
-    field.setAccessible(true);
-    return (Lock) field.get(this);
-  }
-
-  @Override
-  protected void doAfterFlush() throws IOException {
-    super.doAfterFlush();
-    if (!internalLock.isLocked()) {
-      throw new LockOwnerException("Lock [" + internalLock + "] no longer has write lock.");
-    }
-  }
-
-  @Override
-  protected void doBeforeFlush() throws IOException {
-    super.doBeforeFlush();
-    if (!internalLock.isLocked()) {
-      throw new LockOwnerException("Lock [" + internalLock + "] no longer has write lock.");
-    }
-  }
-
-  /**
-   * This seems a little iffy, but basically only the writer instance itself can
-   * equal itself.
-   */
-  @Override
-  public boolean equals(Object obj) {
-    if (super.equals(obj)) {
-      return true;
-    } else if (obj == null) {
-      return false;
-    } else if (obj == this) {
-      return true;
-    }
-    return false;
-  }
-
-  @Override
-  public int hashCode() {
-    return super.hashCode();
-  }
-
-  @Override
-  public String toString() {
-    return "IndexWriter with directory [" + getDirectory() + "]";
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c921b55f/blur-store/src/main/java/org/apache/lucene/index/BlurIndexWriter.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/lucene/index/BlurIndexWriter.java b/blur-store/src/main/java/org/apache/lucene/index/BlurIndexWriter.java
new file mode 100644
index 0000000..4de10c0
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/lucene/index/BlurIndexWriter.java
@@ -0,0 +1,124 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.lang.reflect.Field;
+
+import org.apache.blur.index.ExitableReader;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.Lock;
+import org.apache.lucene.store.LockObtainFailedException;
+
+public class BlurIndexWriter extends org.apache.lucene.index.IndexWriter {
+
+  public static class LockOwnerException extends IOException {
+    private static final long serialVersionUID = -8211546713487754992L;
+
+    public LockOwnerException(String msg) {
+      super(msg);
+    }
+  }
+
+  private final Lock internalLock;
+  private final boolean _makeReaderExitable;
+
+  public BlurIndexWriter(Directory d, IndexWriterConfig conf) throws CorruptIndexException, LockObtainFailedException,
+      IOException {
+    this(d, conf, false);
+  }
+
+  public BlurIndexWriter(Directory d, IndexWriterConfig conf, boolean makeReaderExitable) throws CorruptIndexException,
+      LockObtainFailedException, IOException {
+    super(d, conf);
+    try {
+      internalLock = getInternalLock();
+    } catch (Exception e) {
+      throw new RuntimeException("Could not get the write lock instance.", e);
+    }
+    _makeReaderExitable = makeReaderExitable;
+  }
+
+  @Override
+  DirectoryReader getReader() throws IOException {
+    return wrap(super.getReader());
+  }
+
+  @Override
+  DirectoryReader getReader(boolean applyAllDeletes) throws IOException {
+    return wrap(super.getReader(applyAllDeletes));
+  }
+
+  private DirectoryReader wrap(DirectoryReader reader) {
+    if (_makeReaderExitable) {
+      reader = new ExitableReader(reader);
+    }
+    return reader;
+  }
+
+  private Lock getInternalLock() throws SecurityException, NoSuchFieldException, IllegalArgumentException,
+      IllegalAccessException {
+    Field field = org.apache.lucene.index.IndexWriter.class.getDeclaredField("writeLock");
+    field.setAccessible(true);
+    return (Lock) field.get(this);
+  }
+
+  @Override
+  protected void doAfterFlush() throws IOException {
+    super.doAfterFlush();
+    if (!internalLock.isLocked()) {
+      throw new LockOwnerException("Lock [" + internalLock + "] no longer has write lock.");
+    }
+  }
+
+  @Override
+  protected void doBeforeFlush() throws IOException {
+    super.doBeforeFlush();
+    if (!internalLock.isLocked()) {
+      throw new LockOwnerException("Lock [" + internalLock + "] no longer has write lock.");
+    }
+  }
+
+  /**
+   * This seems a little iffy, but basically only the writer instance itself can
+   * equal itself.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (super.equals(obj)) {
+      return true;
+    } else if (obj == null) {
+      return false;
+    } else if (obj == this) {
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return super.hashCode();
+  }
+
+  @Override
+  public String toString() {
+    return "IndexWriter with directory [" + getDirectory() + "]";
+  }
+
+}


Mime
View raw message