incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [1/4] git commit: BLUR-ID:77 Readonly version of BlurIndex
Date Tue, 30 Apr 2013 03:17:23 GMT
Updated Branches:
  refs/heads/0.1.5 53cf9b125 -> d37ded638


BLUR-ID:77 Readonly version of BlurIndex

Signed-off-by: Aaron McCurry <amccurry@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/1bcec181
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/1bcec181
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/1bcec181

Branch: refs/heads/0.1.5
Commit: 1bcec18136c13c5fde1d8276312fb7f63faedc0b
Parents: 53cf9b1
Author: Gagan <gagandeepjuneja@gmail.com>
Authored: Sun Apr 28 00:57:35 2013 +0530
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Mon Apr 29 23:07:05 2013 -0400

----------------------------------------------------------------------
 .../blur/manager/writer/BlurIndexReader.java       |  144 ++++++++++----
 .../blur/manager/writer/BlurIndexReaderTest.java   |  143 ++++++++++++++
 2 files changed, 246 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1bcec181/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
b/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
index 28e393a..ffa64f0 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
@@ -19,55 +19,117 @@ package org.apache.blur.manager.writer;
 import static org.apache.blur.lucene.LuceneVersionConstant.LUCENE_VERSION;
 
 import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.blur.index.IndexWriter;
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
+import org.apache.blur.server.IndexSearcherClosable;
+import org.apache.blur.server.ShardContext;
+import org.apache.blur.server.TableContext;
 import org.apache.blur.thrift.generated.Row;
-import org.apache.lucene.analysis.core.KeywordAnalyzer;
 import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.TieredMergePolicy;
+import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.store.Directory;
 
-public class BlurIndexReader extends AbstractBlurIndex {
-
-  private static final Log LOG = LogFactory.getLog(BlurIndexReader.class);
-
-  public void init() throws IOException {
-    initIndexWriterConfig();
-    Directory directory = getDirectory();
-    if (!DirectoryReader.indexExists(directory)) {
-      IndexWriterConfig conf = new IndexWriterConfig(LUCENE_VERSION, new KeywordAnalyzer());
-      new IndexWriter(directory, conf).close();
-    }
-    initIndexReader(DirectoryReader.open(directory));
-  }
-
-  @Override
-  public synchronized void refresh() throws IOException {
-    // Override so that we can call within synchronized method
-    super.refresh();
-  }
-
-  @Override
-  public void close() throws IOException {
-    super.close();
-    LOG.info("Reader for table [{0}] shard [{1}] closed.", getTable(), getShard());
-  }
-
-  @Override
-  public void replaceRow(boolean waitToBeVisible, boolean wal, Row row) throws IOException
{
-    throw new RuntimeException("Read-only shard");
-  }
-
-  @Override
-  public void deleteRow(boolean waitToBeVisible, boolean wal, String rowId) throws IOException
{
-    throw new RuntimeException("Read-only shard");
-  }
-
-  @Override
-  public void optimize(int numberOfSegmentsPerShard) throws IOException {
-    // Do nothing
-  }
+public class BlurIndexReader extends BlurIndex {
+
+	private static final Log LOG = LogFactory.getLog(BlurIndexReader.class);
+
+	private BlurIndexCloser _closer;
+	private Directory _directory;
+	private AtomicReference<DirectoryReader> _indexReaderRef = new AtomicReference<DirectoryReader>();
+	private AtomicBoolean _isClosed = new AtomicBoolean(false);
+	private AtomicBoolean _open = new AtomicBoolean();
+	private BlurIndexRefresher _refresher;
+	private final TableContext _tableContext;
+	private final ShardContext _shardContext;
+
+	public BlurIndexReader(ShardContext shardContext,
+			SharedMergeScheduler mergeScheduler, Directory directory,
+			BlurIndexRefresher refresher, BlurIndexCloser closer)
+			throws IOException {
+		_tableContext = shardContext.getTableContext();
+		_directory = directory;
+		_shardContext = shardContext;
+		_refresher = refresher;
+		_closer = closer;
+		IndexWriterConfig conf = new IndexWriterConfig(LUCENE_VERSION,
+				_tableContext.getAnalyzer());
+		conf.setWriteLockTimeout(TimeUnit.MINUTES.toMillis(5));
+		conf.setIndexDeletionPolicy(_tableContext.getIndexDeletionPolicy());
+		conf.setSimilarity(_tableContext.getSimilarity());
+		TieredMergePolicy mergePolicy = (TieredMergePolicy) conf
+				.getMergePolicy();
+		mergePolicy.setUseCompoundFile(false);
+
+		_open.set(true);
+
+		if (!DirectoryReader.indexExists(directory)) {
+			new IndexWriter(directory, conf).close();
+		}
+		_indexReaderRef.set(DirectoryReader.open(directory));
+		_refresher.register(this);
+	}
+
+	@Override
+	public void refresh() throws IOException {
+		if (!_open.get()) {
+			return;
+		}
+		DirectoryReader oldReader = _indexReaderRef.get();
+		DirectoryReader reader = DirectoryReader.openIfChanged(oldReader);
+		if (reader != null) {
+			_indexReaderRef.set(reader);
+			_closer.close(oldReader);
+		}
+	}
+
+	@Override
+	public void close() throws IOException {
+		_open.set(false);
+	    _refresher.unregister(this);
+	    _directory.close();
+	    _isClosed.set(true);
+		LOG.info("Reader for table [{0}] shard [{1}] closed.", _tableContext.getTable(),_shardContext.getShard());
+	}
+
+	@Override
+	public void replaceRow(boolean waitToBeVisible, boolean wal, Row row)
+			throws IOException {
+		throw new RuntimeException("Read-only shard");
+	}
+
+	@Override
+	public void deleteRow(boolean waitToBeVisible, boolean wal, String rowId)
+			throws IOException {
+		throw new RuntimeException("Read-only shard");
+	}
+
+	@Override
+	public void optimize(int numberOfSegmentsPerShard) throws IOException {
+		// Do nothing
+	}
+
+	@Override
+	public IndexSearcherClosable getIndexReader() throws IOException {
+		throw new RuntimeException("not implemented");
+	}
+
+	@Override
+	public AtomicBoolean isClosed() {
+		 return _isClosed;
+	}
+	
+	public IndexSearcher getSearcher(){
+		IndexReader indexReader = _indexReaderRef.get();
+		indexReader.incRef();
+		return new IndexSearcher(indexReader);
+	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1bcec181/src/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexReaderTest.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexReaderTest.java
b/src/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexReaderTest.java
new file mode 100644
index 0000000..68bbd36
--- /dev/null
+++ b/src/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexReaderTest.java
@@ -0,0 +1,143 @@
+package org.apache.blur.manager.writer;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.blur.concurrent.Executors;
+import org.apache.blur.lucene.store.refcounter.DirectoryReferenceFileGC;
+import org.apache.blur.lucene.store.refcounter.IndexInputCloser;
+import org.apache.blur.server.ShardContext;
+import org.apache.blur.server.TableContext;
+import org.apache.blur.thrift.generated.AnalyzerDefinition;
+import org.apache.blur.thrift.generated.Column;
+import org.apache.blur.thrift.generated.Record;
+import org.apache.blur.thrift.generated.Row;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.store.FSDirectory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class BlurIndexReaderTest {
+
+  private static final File TMPDIR = new File("./target/tmp");
+
+  private BlurNRTIndex writer;
+  private Random random = new Random();
+  private ExecutorService service;
+  private File base;
+  private Configuration configuration;
+
+  private DirectoryReferenceFileGC gc;
+  private IndexInputCloser closer;
+  private SharedMergeScheduler mergeScheduler;
+  private BlurIndexReader reader;
+
+  @Before
+  public void setup() throws IOException {
+    base = new File(TMPDIR, "blur-index-reader-test");
+    rm(base);
+    base.mkdirs();
+
+    mergeScheduler = new SharedMergeScheduler();
+    gc = new DirectoryReferenceFileGC();
+    gc.init();
+    closer = new IndexInputCloser();
+    closer.init();
+
+    configuration = new Configuration();
+    service = Executors.newThreadPool("test", 1);
+    
+  }
+
+  private void setupWriter(Configuration configuration, long refresh) throws IOException
{
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setName("test-table");
+    tableDescriptor.setTableUri(new File(base, "table-store").toURI().toString());
+    tableDescriptor.setAnalyzerDefinition(new AnalyzerDefinition());
+    tableDescriptor.putToTableProperties("blur.shard.time.between.refreshs", Long.toString(refresh));
+    tableDescriptor.putToTableProperties("blur.shard.time.between.commits", Long.toString(1000));
+    
+    TableContext tableContext = TableContext.create(tableDescriptor);
+    FSDirectory directory = FSDirectory.open(new File(base, "index"));
+
+    ShardContext shardContext = ShardContext.create(tableContext, "test-shard");
+
+    writer = new BlurNRTIndex(shardContext, mergeScheduler, closer, directory, gc, service);
+    BlurIndexRefresher refresher = new BlurIndexRefresher();
+    BlurIndexCloser indexCloser = new BlurIndexCloser();
+    refresher.init();
+    indexCloser.init();
+    reader = new BlurIndexReader(shardContext, mergeScheduler, directory, refresher, indexCloser);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    writer.close();
+    mergeScheduler.close();
+    closer.close();
+    gc.close();
+    service.shutdownNow();
+    rm(base);
+  }
+
+  private void rm(File file) {
+    if (!file.exists()) {
+      return;
+    }
+    if (file.isDirectory()) {
+      for (File f : file.listFiles()) {
+        rm(f);
+      }
+    }
+    file.delete();
+  }
+
+  @Test
+  public void testBlurIndexWriter() throws IOException, InterruptedException {
+    setupWriter(configuration, 1);
+    IndexSearcher searcher = reader.getSearcher();
+    writer.replaceRow(true, true, genRow());
+    Thread.sleep(1500);
+    assertEquals(0,searcher.getIndexReader().numDocs());
+    reader.refresh();
+    assertEquals(1,reader.getSearcher().getIndexReader().numDocs());
+  }
+  
+  private Row genRow() {
+    Row row = new Row();
+    row.setId(Long.toString(random.nextLong()));
+    Record record = new Record();
+    record.setFamily("testing");
+    record.setRecordId(Long.toString(random.nextLong()));
+    for (int i = 0; i < 10; i++) {
+      record.addToColumns(new Column("col" + i, Long.toString(random.nextLong())));
+    }
+    row.addToRecords(record);
+    return row;
+  }
+
+}


Mime
View raw message