incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [2/4] git commit: Making the BlurIndexSimpleWriter the default, still not great but should allow for large clusters to import index data from MR without crashing. The mixing of NRT and WAL with a lot of indexes online proves to be too much for a typical Blur table to stay online.
Date Thu, 02 Jan 2014 15:45:40 GMT
Making the BlurIndexSimpleWriter the default. It is still not great, but it should allow large clusters
to import index data from MapReduce (MR) without crashing. Mixing NRT and the WAL with many indexes
online proves to be too much for a typical Blur table to stay online. Resolving this properly will
likely require a new WAL implementation as well as a custom NRT system.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/f8ba394c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/f8ba394c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/f8ba394c

Branch: refs/heads/apache-blur-0.2
Commit: f8ba394c9dee5f1c721bc1d084a59257d7845369
Parents: e171c4e
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Thu Jan 2 09:17:10 2014 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Thu Jan 2 09:17:10 2014 -0500

----------------------------------------------------------------------
 .../manager/writer/BlurIndexReaderWarmer.java   |   2 +-
 .../manager/writer/BlurIndexSimpleWriter.java   |  93 ++++++-
 .../org/apache/blur/server/TableContext.java    |   3 +-
 .../manager/writer/BlurIndexNRTSimpleTest.java  | 271 -------------------
 .../writer/BlurIndexSimpleWriterTest.java       | 271 +++++++++++++++++++
 5 files changed, 356 insertions(+), 284 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f8ba394c/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReaderWarmer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReaderWarmer.java
b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReaderWarmer.java
index aa3b8d2..f2a0881 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReaderWarmer.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReaderWarmer.java
@@ -48,7 +48,7 @@ public class BlurIndexReaderWarmer extends IndexReaderWarmer {
 
   @Override
   public void warm(AtomicReader reader) throws IOException {
-    LOG.info("Warming reader [{0}]", reader);
+    LOG.debug("Warming reader [{0}]", reader);
     ReleaseReader releaseReader = new ReleaseReader() {
       @Override
       public void release() throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f8ba394c/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
index 25b9117..e6cbba9 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
@@ -17,6 +17,8 @@
 package org.apache.blur.manager.writer;
 
 import static org.apache.blur.lucene.LuceneVersionConstant.LUCENE_VERSION;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_TIME_BETWEEN_COMMITS;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_TIME_BETWEEN_REFRESHS;
 
 import java.io.IOException;
 import java.util.List;
@@ -64,14 +66,18 @@ public class BlurIndexSimpleWriter extends BlurIndex {
   private final IndexWriterConfig _conf;
   private final TableContext _tableContext;
   private final FieldManager _fieldManager;
-  private final BlurIndexRefresher _refresher;
   private final ShardContext _shardContext;
   private final AtomicReference<BlurIndexWriter> _writer = new AtomicReference<BlurIndexWriter>();
   private final boolean _makeReaderExitable = true;
   private IndexImporter _indexImporter;
   private final ReadWriteLock _lock = new ReentrantReadWriteLock();
   private final Lock _readLock = _lock.readLock();
+  private final Thread _refresherThread;
+  private final Thread _commitThread;
+  private final long _commitTime;
+  private final long _refreshTime;
 
+  private volatile boolean _dirty;
   private Thread _optimizeThread;
 
   public BlurIndexSimpleWriter(ShardContext shardContext, Directory directory, SharedMergeScheduler
mergeScheduler,
@@ -82,6 +88,8 @@ public class BlurIndexSimpleWriter extends BlurIndex {
     _shardContext = shardContext;
     _tableContext = _shardContext.getTableContext();
     _fieldManager = _tableContext.getFieldManager();
+    _commitTime = _tableContext.getBlurConfiguration().getLong(BLUR_SHARD_TIME_BETWEEN_COMMITS,
30000);
+    _refreshTime = _tableContext.getBlurConfiguration().getLong(BLUR_SHARD_TIME_BETWEEN_REFRESHS,
1000);
     Analyzer analyzer = _fieldManager.getAnalyzerForIndex();
     _conf = new IndexWriterConfig(LUCENE_VERSION, analyzer);
     _conf.setWriteLockTimeout(TimeUnit.MINUTES.toMillis(5));
@@ -100,15 +108,96 @@ public class BlurIndexSimpleWriter extends BlurIndex {
     TraceableDirectory dir = new TraceableDirectory(referenceCounter);
     _directory = dir;

-    // _directory = directory;
-
     _indexCloser = indexCloser;
     _indexReader.set(wrap(DirectoryReader.open(_directory)));
-    _refresher = refresher;

     _writerOpener = getWriterOpener(shardContext);
     _writerOpener.start();
-    _refresher.register(this);
+
+    _refresherThread = getRefresherThread();
+    _refresherThread.start();
+
+    _commitThread = getCommitThread();
+    _commitThread.start();
+  }
+
+  /**
+   * Creates, but does not start, the daemon thread that commits the index writer,
+   * waiting {@code _commitTime} ms between commits until the index is closed. After
+   * a failed commit the pause is multiplied by 10 and capped at one minute.
+   */
+  private Thread getCommitThread() {
+    String table = _tableContext.getTable();
+    String shard = _shardContext.getShard();
+    Thread thread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        long pause = _commitTime;
+        while (!_isClosed.get()) {
+          synchronized (this) {
+            try {
+              wait(pause);
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+              return;
+            }
+          }
+          // _writer can still be null here (other paths waitUntilNotNull(_writer) first);
+          // dereferencing it directly would kill this thread with an NPE.
+          BlurIndexWriter writer = _writer.get();
+          if (writer == null) {
+            continue;
+          }
+          try {
+            writer.commit();
+            pause = _commitTime;
+          } catch (IOException e) {
+            LOG.error("Unknown error during commit.", e);
+            pause = Math.min(pause * 10, TimeUnit.MINUTES.toMillis(1));
+          }
+        }
+      }
+    });
+    thread.setName("Commiter for table [" + table + "] shard [" + shard + "]");
+    thread.setDaemon(true);
+    return thread;
+  }
+
+  /**
+   * Creates, but does not start, the daemon thread that calls
+   * {@code waitToBeVisible(true)}, waiting {@code _refreshTime} ms between calls
+   * until the index is closed. After a failed refresh the pause is multiplied by 10
+   * and capped at one minute.
+   */
+  private Thread getRefresherThread() {
+    String table = _tableContext.getTable();
+    String shard = _shardContext.getShard();
+    Thread thread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        long pause = _refreshTime;
+        while (!_isClosed.get()) {
+          synchronized (this) {
+            try {
+              wait(pause);
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+              return;
+            }
+          }
+          try {
+            waitToBeVisible(true);
+            pause = _refreshTime;
+          } catch (IOException e) {
+            LOG.error("Unknown error during refresh.", e);
+            pause = Math.min(pause * 10, TimeUnit.MINUTES.toMillis(1));
+          }
+        }
+      }
+    });
+    thread.setName("Refresh for table [" + table + "] shard [" + shard + "]");
+    thread.setDaemon(true);
+    return thread;
   }
 
   private DirectoryReader wrap(DirectoryReader reader) {
@@ -166,6 +236,7 @@ public class BlurIndexSimpleWriter extends BlurIndex {
       BlurIndexWriter writer = _writer.get();
       List<List<Field>> docs = TransactionRecorder.getDocs(row, _fieldManager);
       writer.updateDocuments(TransactionRecorder.createRowId(row.getId()), docs);
+      _dirty = true;
       waitToBeVisible(waitToBeVisible);
     } finally {
       _readLock.unlock();
@@ -179,6 +250,7 @@ public class BlurIndexSimpleWriter extends BlurIndex {
       waitUntilNotNull(_writer);
       BlurIndexWriter writer = _writer.get();
       writer.deleteDocuments(TransactionRecorder.createRowId(rowId));
+      _dirty = true;
       waitToBeVisible(waitToBeVisible);
     } finally {
       _readLock.unlock();
@@ -208,13 +280,31 @@ public class BlurIndexSimpleWriter extends BlurIndex {
   }
 
+  /**
+   * Swaps in a new near-real-time reader opened from the current writer when
+   * changes have been flagged in {@code _dirty} since the last refresh, and passes
+   * the previous reader to the index closer. Synchronized so the background
+   * refresher and callers waiting for visibility never refresh concurrently.
+   */
   @Override
-  public void refresh() throws IOException {
-    DirectoryReader currentReader = _indexReader.get();
-    DirectoryReader newReader = DirectoryReader.openIfChanged(currentReader);
-    if (newReader != null) {
+  public synchronized void refresh() throws IOException {
+    if (_dirty) {
+      // Clear the flag *before* opening the reader: a write that lands while the
+      // reader is being opened sets it again and is picked up by the next refresh.
+      // Clearing it afterwards could hide that write until some later write.
+      _dirty = false;
+      DirectoryReader currentReader = _indexReader.get();
+      DirectoryReader newReader = null;
+      try {
+        newReader = DirectoryReader.open(_writer.get(), true);
+      } finally {
+        if (newReader == null) {
+          // Opening failed; keep the changes flagged so a later refresh retries.
+          _dirty = true;
+        }
+      }
       LOG.debug("Refreshing index for table [{0}] shard [{1}].", _tableContext.getTable(),
_shardContext.getShard());
       _indexReader.set(wrap(newReader));
       _indexCloser.close(currentReader);
     }
   }
 
@@ -267,8 +340,11 @@ public class BlurIndexSimpleWriter extends BlurIndex {
+  /**
+   * If {@code waitToBeVisible} is true, waits until {@code _writer} is non-null and
+   * then refreshes the reader so that earlier changes become searchable. Commits are
+   * no longer issued here; the background commit thread performs them.
+   */
   private void waitToBeVisible(boolean waitToBeVisible) throws IOException {
     if (waitToBeVisible) {
       waitUntilNotNull(_writer);
-      BlurIndexWriter writer = _writer.get();
-      writer.commit();
       refresh();
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f8ba394c/blur-core/src/main/java/org/apache/blur/server/TableContext.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/server/TableContext.java b/blur-core/src/main/java/org/apache/blur/server/TableContext.java
index 769d642..a3647f5 100644
--- a/blur-core/src/main/java/org/apache/blur/server/TableContext.java
+++ b/blur-core/src/main/java/org/apache/blur/server/TableContext.java
@@ -50,6 +50,7 @@ import org.apache.blur.manager.indexserver.BlurIndexWarmup;
 import org.apache.blur.manager.writer.BlurIndex;
 import org.apache.blur.manager.writer.BlurIndexCloser;
 import org.apache.blur.manager.writer.BlurIndexRefresher;
+import org.apache.blur.manager.writer.BlurIndexSimpleWriter;
 import org.apache.blur.manager.writer.BlurNRTIndex;
 import org.apache.blur.manager.writer.SharedMergeScheduler;
 import org.apache.blur.thrift.generated.ScoreType;
@@ -330,7 +331,7 @@ public class TableContext {
       DirectoryReferenceFileGC gc, ExecutorService searchExecutor, BlurIndexCloser indexCloser,
       BlurIndexRefresher refresher, BlurIndexWarmup indexWarmup) throws IOException {
 
-    String className = _blurConfiguration.get(BLUR_SHARD_BLURINDEX_CLASS, BlurNRTIndex.class.getName());
+    String className = _blurConfiguration.get(BLUR_SHARD_BLURINDEX_CLASS, BlurIndexSimpleWriter.class.getName());
 
     Class<? extends BlurIndex> clazz;
     try {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f8ba394c/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexNRTSimpleTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexNRTSimpleTest.java
b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexNRTSimpleTest.java
deleted file mode 100644
index 96178c4..0000000
--- a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexNRTSimpleTest.java
+++ /dev/null
@@ -1,271 +0,0 @@
-package org.apache.blur.manager.writer;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-
-import org.apache.blur.BlurConfiguration;
-import org.apache.blur.concurrent.Executors;
-import org.apache.blur.lucene.store.refcounter.DirectoryReferenceFileGC;
-import org.apache.blur.manager.indexserver.DefaultBlurIndexWarmup;
-import org.apache.blur.server.IndexSearcherClosable;
-import org.apache.blur.server.ShardContext;
-import org.apache.blur.server.TableContext;
-import org.apache.blur.thrift.generated.Column;
-import org.apache.blur.thrift.generated.Record;
-import org.apache.blur.thrift.generated.Row;
-import org.apache.blur.thrift.generated.TableDescriptor;
-import org.apache.blur.trace.BaseTraceStorage;
-import org.apache.blur.trace.Trace;
-import org.apache.blur.trace.TraceCollector;
-import org.apache.blur.trace.TraceStorage;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.store.FSDirectory;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class BlurIndexNRTSimpleTest {
-
-  private static final int TEST_NUMBER_WAIT_VISIBLE = 500;
-  private static final int TEST_NUMBER = 50000;
-
-  private static final File TMPDIR = new File("./target/tmp");
-
-  private BlurIndexSimpleWriter writer;
-  private Random random = new Random();
-  private ExecutorService service;
-  private File base;
-  private Configuration configuration;
-
-  private DirectoryReferenceFileGC gc;
-  private SharedMergeScheduler mergeScheduler;
-  private String uuid;
-  private BlurIndexRefresher _refresher;
-  private BlurIndexCloser _closer;
-  private DefaultBlurIndexWarmup indexWarmup;
-
-  @Before
-  public void setup() throws IOException {
-    TableContext.clear();
-    base = new File(TMPDIR, "blur-index-writer-test");
-    rm(base);
-    base.mkdirs();
-
-    mergeScheduler = new SharedMergeScheduler(1);
-    gc = new DirectoryReferenceFileGC();
-
-    configuration = new Configuration();
-    service = Executors.newThreadPool("test", 10);
-    _refresher = new BlurIndexRefresher();
-    _closer = new BlurIndexCloser();
-    indexWarmup = new DefaultBlurIndexWarmup(1000000);
-  }
-
-  private void setupWriter(Configuration configuration, long refresh, boolean reload) throws
IOException {
-    TableDescriptor tableDescriptor = new TableDescriptor();
-    tableDescriptor.setName("test-table");
-    /*
-     * if reload is set to true...we create a new writer instance pointing to
-     * the same location as the old one..... so previous writer instances should
-     * be closed
-     */
-
-    if (!reload && uuid == null) {
-      uuid = UUID.randomUUID().toString();
-    }
-
-    tableDescriptor.setTableUri(new File(base, "table-store-" + uuid).toURI().toString());
-    tableDescriptor.putToTableProperties("blur.shard.time.between.refreshs", Long.toString(refresh));
-
-    TableContext tableContext = TableContext.create(tableDescriptor);
-    File path = new File(base, "index_" + uuid);
-    path.mkdirs();
-    FSDirectory directory = FSDirectory.open(path);
-    ShardContext shardContext = ShardContext.create(tableContext, "test-shard-" + uuid);
-    writer = new BlurIndexSimpleWriter(shardContext, directory, mergeScheduler, gc, service,
_closer, _refresher,
-        indexWarmup);
-  }
-
-  @After
-  public void tearDown() throws IOException {
-    _refresher.close();
-    writer.close();
-    mergeScheduler.close();
-    gc.close();
-    service.shutdownNow();
-    rm(base);
-  }
-
-  private void rm(File file) {
-    if (!file.exists()) {
-      return;
-    }
-    if (file.isDirectory()) {
-      for (File f : file.listFiles()) {
-        rm(f);
-      }
-    }
-    file.delete();
-  }
-
-  @Test
-  public void testBlurIndexWriter() throws IOException {
-    setupWriter(configuration, 5, false);
-    long s = System.nanoTime();
-    int total = 0;
-    TraceStorage oldStorage = Trace.getStorage();
-    Trace.setStorage(new BaseTraceStorage(new BlurConfiguration()) {
-      @Override
-      public void close() throws IOException {
-
-      }
-
-      @Override
-      public void store(TraceCollector collector) {
-        System.out.println(collector.toJson());
-      }
-    });
-    Trace.setupTrace("test");
-    for (int i = 0; i < TEST_NUMBER_WAIT_VISIBLE; i++) {
-      writer.replaceRow(true, true, genRow());
-      IndexSearcherClosable searcher = writer.getIndexSearcher();
-      IndexReader reader = searcher.getIndexReader();
-      assertEquals(i + 1, reader.numDocs());
-      searcher.close();
-      total++;
-    }
-    Trace.tearDownTrace();
-    long e = System.nanoTime();
-    double seconds = (e - s) / 1000000000.0;
-    double rate = total / seconds;
-    System.out.println("Rate " + rate);
-    IndexSearcherClosable searcher = writer.getIndexSearcher();
-    IndexReader reader = searcher.getIndexReader();
-    assertEquals(TEST_NUMBER_WAIT_VISIBLE, reader.numDocs());
-    searcher.close();
-    Trace.setStorage(oldStorage);
-  }
-
-  @Test
-  public void testBlurIndexWriterFaster() throws IOException, InterruptedException {
-    setupWriter(configuration, 100, false);
-    IndexSearcherClosable searcher1 = writer.getIndexSearcher();
-    IndexReader reader1 = searcher1.getIndexReader();
-    assertEquals(0, reader1.numDocs());
-    searcher1.close();
-    long s = System.nanoTime();
-    int total = 0;
-    for (int i = 0; i < TEST_NUMBER; i++) {
-      if (i == TEST_NUMBER - 1) {
-        writer.replaceRow(true, true, genRow());
-      } else {
-        writer.replaceRow(false, true, genRow());
-      }
-      total++;
-    }
-    long e = System.nanoTime();
-    double seconds = (e - s) / 1000000000.0;
-    double rate = total / seconds;
-    System.out.println("Rate " + rate);
-    // //wait one second for the data to become visible the test is set to
-    // refresh once every 25 ms
-    // Thread.sleep(1000);
-    writer.refresh();
-    IndexSearcherClosable searcher2 = writer.getIndexSearcher();
-    IndexReader reader2 = searcher2.getIndexReader();
-    assertEquals(TEST_NUMBER, reader2.numDocs());
-    searcher2.close();
-  }
-
-  private Row genRow() {
-    Row row = new Row();
-    row.setId(Long.toString(random.nextLong()));
-    Record record = new Record();
-    record.setFamily("testing");
-    record.setRecordId(Long.toString(random.nextLong()));
-    for (int i = 0; i < 10; i++) {
-      record.addToColumns(new Column("col" + i, Long.toString(random.nextLong())));
-    }
-    row.addToRecords(record);
-    return row;
-  }
-
-  // @Test
-  // public void testCreateSnapshot() throws IOException {
-  // setupWriter(configuration, 5, false);
-  // writer.createSnapshot("test_snapshot");
-  // assertTrue(writer.getSnapshots().contains("test_snapshot"));
-  //
-  // // check that the file is persisted
-  // Path snapshotsDirPath = writer.getSnapshotsDirectoryPath();
-  // FileSystem fileSystem = snapshotsDirPath.getFileSystem(new
-  // Configuration());
-  // Path snapshotFilePath = new Path(snapshotsDirPath, "test_snapshot");
-  // assertTrue(fileSystem.exists(snapshotFilePath));
-  //
-  // // create a new writer instance and test whether the snapshots are loaded
-  // properly
-  // writer.close();
-  // setupWriter(configuration, 5, true);
-  // assertTrue(writer.getSnapshots().contains("test_snapshot"));
-  // }
-  //
-  //
-  // @Test
-  // public void testRemoveSnapshots() throws IOException {
-  // setupWriter(configuration, 5, false);
-  // Path snapshotsDirPath = writer.getSnapshotsDirectoryPath();
-  // FileSystem fileSystem = snapshotsDirPath.getFileSystem(new
-  // Configuration());
-  // fileSystem.mkdirs(snapshotsDirPath);
-  //
-  // // create 2 files in snapshots sub-dir
-  // Path snapshotFile1 = new Path(snapshotsDirPath, "test_snapshot1");
-  // Path snapshotFile2 = new Path(snapshotsDirPath, "test_snapshot2");
-  //
-  // BufferedWriter br1 = new BufferedWriter(new
-  // OutputStreamWriter(fileSystem.create(snapshotFile1, true)));
-  // br1.write("segments_1");
-  // br1.close();
-  //
-  // BufferedWriter br2 = new BufferedWriter(new
-  // OutputStreamWriter(fileSystem.create(snapshotFile2, true)));
-  // br2.write("segments_1");
-  // br2.close();
-  //
-  // // re-load the writer to load the snpshots
-  // writer.close();
-  // setupWriter(configuration, 5, true);
-  // assertEquals(writer.getSnapshots().size(), 2);
-  //
-  //
-  // writer.removeSnapshot("test_snapshot2");
-  // assertEquals(writer.getSnapshots().size(), 1);
-  // assertTrue(!writer.getSnapshots().contains("test_snapshot2"));
-  // assertTrue(!fileSystem.exists(snapshotFile2));
-  //
-  // }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f8ba394c/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexSimpleWriterTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexSimpleWriterTest.java
b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexSimpleWriterTest.java
new file mode 100644
index 0000000..686f213
--- /dev/null
+++ b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexSimpleWriterTest.java
@@ -0,0 +1,229 @@
+package org.apache.blur.manager.writer;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.blur.BlurConfiguration;
+import org.apache.blur.concurrent.Executors;
+import org.apache.blur.lucene.store.refcounter.DirectoryReferenceFileGC;
+import org.apache.blur.manager.indexserver.DefaultBlurIndexWarmup;
+import org.apache.blur.server.IndexSearcherClosable;
+import org.apache.blur.server.ShardContext;
+import org.apache.blur.server.TableContext;
+import org.apache.blur.thrift.generated.Column;
+import org.apache.blur.thrift.generated.Record;
+import org.apache.blur.thrift.generated.Row;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.trace.BaseTraceStorage;
+import org.apache.blur.trace.Trace;
+import org.apache.blur.trace.TraceCollector;
+import org.apache.blur.trace.TraceStorage;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.store.FSDirectory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for {@link BlurIndexSimpleWriter}: rows written with
+ * {@code waitToBeVisible} set must be searchable immediately, and rows from a
+ * bulk load must all be visible after a final refresh.
+ */
+public class BlurIndexSimpleWriterTest {
+
+  private static final int TEST_NUMBER_WAIT_VISIBLE = 500;
+  private static final int TEST_NUMBER = 50000;
+
+  private static final File TMPDIR = new File("./target/tmp");
+
+  private BlurIndexSimpleWriter writer;
+  private Random random = new Random();
+  private ExecutorService service;
+  private File base;
+  private Configuration configuration;
+
+  private DirectoryReferenceFileGC gc;
+  private SharedMergeScheduler mergeScheduler;
+  private String uuid;
+  private BlurIndexRefresher _refresher;
+  private BlurIndexCloser _closer;
+  private DefaultBlurIndexWarmup indexWarmup;
+
+  @Before
+  public void setup() throws IOException {
+    TableContext.clear();
+    base = new File(TMPDIR, "blur-index-writer-test");
+    rm(base);
+    base.mkdirs();
+
+    mergeScheduler = new SharedMergeScheduler(1);
+    gc = new DirectoryReferenceFileGC();
+
+    configuration = new Configuration();
+    service = Executors.newThreadPool("test", 10);
+    _refresher = new BlurIndexRefresher();
+    _closer = new BlurIndexCloser();
+    indexWarmup = new DefaultBlurIndexWarmup(1000000);
+  }
+
+  private void setupWriter(Configuration configuration, long refresh, boolean reload) throws IOException {
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setName("test-table");
+    /*
+     * if reload is set to true...we create a new writer instance pointing to
+     * the same location as the old one..... so previous writer instances should
+     * be closed
+     */
+
+    if (!reload && uuid == null) {
+      uuid = UUID.randomUUID().toString();
+    }
+
+    tableDescriptor.setTableUri(new File(base, "table-store-" + uuid).toURI().toString());
+    tableDescriptor.putToTableProperties("blur.shard.time.between.refreshs", Long.toString(refresh));
+
+    TableContext tableContext = TableContext.create(tableDescriptor);
+    File path = new File(base, "index_" + uuid);
+    path.mkdirs();
+    FSDirectory directory = FSDirectory.open(path);
+    ShardContext shardContext = ShardContext.create(tableContext, "test-shard-" + uuid);
+    writer = new BlurIndexSimpleWriter(shardContext, directory, mergeScheduler, gc, service, _closer, _refresher,
+        indexWarmup);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    _refresher.close();
+    writer.close();
+    mergeScheduler.close();
+    gc.close();
+    service.shutdownNow();
+    rm(base);
+  }
+
+  private void rm(File file) {
+    if (!file.exists()) {
+      return;
+    }
+    if (file.isDirectory()) {
+      for (File f : file.listFiles()) {
+        rm(f);
+      }
+    }
+    file.delete();
+  }
+
+  @Test
+  public void testBlurIndexWriter() throws IOException {
+    setupWriter(configuration, 5, false);
+    long s = System.nanoTime();
+    int total = 0;
+    TraceStorage oldStorage = Trace.getStorage();
+    Trace.setStorage(new BaseTraceStorage(new BlurConfiguration()) {
+      @Override
+      public void close() throws IOException {
+
+      }
+
+      @Override
+      public void store(TraceCollector collector) {
+        System.out.println(collector.toJson());
+      }
+    });
+    try {
+      Trace.setupTrace("test");
+      try {
+        for (int i = 0; i < TEST_NUMBER_WAIT_VISIBLE; i++) {
+          writer.replaceRow(true, true, genRow());
+          IndexSearcherClosable searcher = writer.getIndexSearcher();
+          IndexReader reader = searcher.getIndexReader();
+          assertEquals(i + 1, reader.numDocs());
+          searcher.close();
+          total++;
+        }
+      } finally {
+        Trace.tearDownTrace();
+      }
+      long e = System.nanoTime();
+      double seconds = (e - s) / 1000000000.0;
+      double rate = total / seconds;
+      System.out.println("Rate " + rate);
+      IndexSearcherClosable searcher = writer.getIndexSearcher();
+      IndexReader reader = searcher.getIndexReader();
+      assertEquals(TEST_NUMBER_WAIT_VISIBLE, reader.numDocs());
+      searcher.close();
+    } finally {
+      // Trace storage is global state: restore it even when an assertion fails so
+      // later tests are not affected.
+      Trace.setStorage(oldStorage);
+    }
+  }
+
+  @Test
+  public void testBlurIndexWriterFaster() throws IOException, InterruptedException {
+    setupWriter(configuration, 100, false);
+    IndexSearcherClosable searcher1 = writer.getIndexSearcher();
+    IndexReader reader1 = searcher1.getIndexReader();
+    assertEquals(0, reader1.numDocs());
+    searcher1.close();
+    long s = System.nanoTime();
+    int total = 0;
+    for (int i = 0; i < TEST_NUMBER; i++) {
+      if (i == TEST_NUMBER - 1) {
+        writer.replaceRow(true, true, genRow());
+      } else {
+        writer.replaceRow(false, true, genRow());
+      }
+      total++;
+    }
+    long e = System.nanoTime();
+    double seconds = (e - s) / 1000000000.0;
+    double rate = total / seconds;
+    System.out.println("Rate " + rate);
+    // The table is configured above with a 100 ms refresh interval; pause so the
+    // background refresher and committer threads settle before the final forced
+    // refresh. TODO: remove this workaround once visibility no longer depends on timing.
+    Thread.sleep(1000);
+    writer.refresh();
+    IndexSearcherClosable searcher2 = writer.getIndexSearcher();
+    IndexReader reader2 = searcher2.getIndexReader();
+    assertEquals(TEST_NUMBER, reader2.numDocs());
+    searcher2.close();
+  }
+
+  private Row genRow() {
+    Row row = new Row();
+    row.setId(Long.toString(random.nextLong()));
+    Record record = new Record();
+    record.setFamily("testing");
+    record.setRecordId(Long.toString(random.nextLong()));
+    for (int i = 0; i < 10; i++) {
+      record.addToColumns(new Column("col" + i, Long.toString(random.nextLong())));
+    }
+    row.addToRecords(record);
+    return row;
+  }
+}


Mime
View raw message