incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject git commit: Lowered some logging levels and fixed some unit tests.
Date Tue, 04 Jun 2013 15:16:49 GMT
Updated Branches:
  refs/heads/0.1.5 a3e4f3c13 -> bf1258f8c


Lowered some logging levels and fixed some unit tests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/bf1258f8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/bf1258f8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/bf1258f8

Branch: refs/heads/0.1.5
Commit: bf1258f8cab92ffdb2486ac421d8a7ec09a8e624
Parents: a3e4f3c
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Tue Jun 4 11:16:11 2013 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Tue Jun 4 11:16:11 2013 -0400

----------------------------------------------------------------------
 .../apache/blur/manager/writer/IndexImporter.java  |   32 +++--
 .../blur/manager/writer/SharedMergeScheduler.java  |    4 +-
 .../blur/manager/writer/TransactionRecorder.java   |    6 +-
 .../blur/manager/writer/BlurIndexReaderTest.java   |    1 +
 .../blur/manager/writer/BlurNRTIndexTest.java      |    1 +
 .../blur/manager/writer/IndexImporterTest.java     |  112 +++++++++------
 .../manager/writer/TransactionRecorderTest.java    |    7 +-
 .../lucene/store/refcounter/IndexInputCloser.java  |    2 +-
 8 files changed, 97 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/bf1258f8/src/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java
b/src/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java
index 74cfabc..9470d40 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java
@@ -97,14 +97,13 @@ public class IndexImporter extends TimerTask implements Closeable {
           indexWriter.commit();
           boolean isSuccess = true;
           boolean isRollbackDueToException = false;
-          if (indexWriter.numDocs() != 0) {
-            try {
-              isSuccess = applyDeletes(directory, indexWriter, shard);
-            } catch (IOException e) {
-              LOG.error("Some issue with deleting the old index on [{0}/{1}]", e, shard,
table);
-              isSuccess = false;
-              isRollbackDueToException = true;
-            }
+          boolean emitDeletes = indexWriter.numDocs() != 0;
+          try {
+            isSuccess = applyDeletes(directory, indexWriter, shard, emitDeletes);
+          } catch (IOException e) {
+            LOG.error("Some issue with deleting the old index on [{0}/{1}]", e, shard, table);
+            isSuccess = false;
+            isRollbackDueToException = true;
           }
           Path dirPath = directory.getPath();
           if (isSuccess) {
@@ -116,8 +115,10 @@ public class IndexImporter extends TimerTask implements Closeable {
             fileSystem.delete(dirPath, true);
             LOG.info("Import complete on [{0}/{1}]", shard, table);
           } else {
-            if(!isRollbackDueToException){
-              LOG.error("Index is corrupted, RowIds are found in wrong shard [{0}/{1}], cancelling
index import for [{2}]", shard, table, directory);
+            if (!isRollbackDueToException) {
+              LOG.error(
+                  "Index is corrupted, RowIds are found in wrong shard [{0}/{1}], cancelling
index import for [{2}]",
+                  shard, table, directory);
             }
             LOG.info("Starting rollback on [{0}/{1}]", shard, table);
             indexWriter.rollback();
@@ -145,7 +146,8 @@ public class IndexImporter extends TimerTask implements Closeable {
     return result;
   }
 
-  private boolean applyDeletes(Directory directory, IndexWriter indexWriter, String shard)
throws IOException {
+  private boolean applyDeletes(Directory directory, IndexWriter indexWriter, String shard,
boolean emitDeletes)
+      throws IOException {
     DirectoryReader reader = DirectoryReader.open(directory);
     try {
       LOG.info("Applying deletes in reader [{0}]", reader);
@@ -165,11 +167,13 @@ public class IndexImporter extends TimerTask implements Closeable {
           key.set(rowIdInBytes, 0, rowIdInBytes.length);
           int partition = blurPartitioner.getPartition(key, null, numberOfShards);
           int shardId = BlurUtil.getShardIndex(shard);
-          if( shardId != partition){
+          if (shardId != partition) {
             return false;
           }
-          Term term = new Term(BlurConstants.ROW_ID, BytesRef.deepCopyOf(ref));
-          indexWriter.deleteDocuments(term);
+          if (emitDeletes) {
+            Term term = new Term(BlurConstants.ROW_ID, BytesRef.deepCopyOf(ref));
+            indexWriter.deleteDocuments(term);
+          }
         }
       }
     } finally {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/bf1258f8/src/blur-core/src/main/java/org/apache/blur/manager/writer/SharedMergeScheduler.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/SharedMergeScheduler.java
b/src/blur-core/src/main/java/org/apache/blur/manager/writer/SharedMergeScheduler.java
index b0e2b38..f5668e8 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/writer/SharedMergeScheduler.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/writer/SharedMergeScheduler.java
@@ -64,7 +64,7 @@ public class SharedMergeScheduler extends MergeScheduler implements Runnable
{
           merge(writer);
         }
       } catch (InterruptedException e) {
-        LOG.info("Merging interrupted, exiting.");
+        LOG.debug("Merging interrupted, exiting.");
         return;
       } catch (IOException e) {
         LOG.error("Unknown IOException", e);
@@ -83,7 +83,7 @@ public class SharedMergeScheduler extends MergeScheduler implements Runnable
{
     long e = System.currentTimeMillis();
     double time = (e - s) / 1000.0;
     double rate = (merge.totalBytesSize() / 1024 / 1024) / time;
-    LOG.info("Merge took [{0} s] to complete at rate of [{1} MB/s]", time, rate);
+    LOG.debug("Merge took [{0} s] to complete at rate of [{1} MB/s]", time, rate);
     return true;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/bf1258f8/src/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java
b/src/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java
index cd05ccf..febc7ef 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java
@@ -189,7 +189,7 @@ public class TransactionRecorder extends TimerTask implements Closeable
{
   }
 
   private void rollLog() throws IOException {
-    LOG.info("Rolling WAL path [" + _walPath + "]");
+    LOG.debug("Rolling WAL path [" + _walPath + "]");
     FSDataOutputStream os = _outputStream.get();
     if (os != null) {
       os.close();
@@ -343,10 +343,10 @@ public class TransactionRecorder extends TimerTask implements Closeable
{
       long s = System.nanoTime();
       writer.commit();
       long m = System.nanoTime();
-      LOG.info("Commit took [{0} ms] for [{1}/{2}]", (m - s) / 1000000.0, _table, _shard);
+      LOG.debug("Commit took [{0} ms] for [{1}/{2}]", (m - s) / 1000000.0, _table, _shard);
       rollLog();
       long e = System.nanoTime();
-      LOG.info("Log roller took [{0} ms] for [{1}/{2}]", (e - m) / 1000000.0, _table, _shard);
+      LOG.debug("Log roller took [{0} ms] for [{1}/{2}]", (e - m) / 1000000.0, _table, _shard);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/bf1258f8/src/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexReaderTest.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexReaderTest.java
b/src/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexReaderTest.java
index f74bb1b..f225cfc 100644
--- a/src/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexReaderTest.java
+++ b/src/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexReaderTest.java
@@ -58,6 +58,7 @@ public class BlurIndexReaderTest {
 
   @Before
   public void setup() throws IOException {
+    TableContext.clear();
     base = new File(TMPDIR, "blur-index-reader-test");
     rm(base);
     base.mkdirs();

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/bf1258f8/src/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java
b/src/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java
index ef07fa8..c207937 100644
--- a/src/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java
+++ b/src/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java
@@ -62,6 +62,7 @@ public class BlurNRTIndexTest {
 
   @Before
   public void setup() throws IOException {
+    TableContext.clear();
     base = new File(TMPDIR, "blur-index-writer-test");
     rm(base);
     base.mkdirs();

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/bf1258f8/src/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
b/src/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
index e9655ad..5d3466e 100644
--- a/src/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
+++ b/src/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
@@ -21,7 +21,6 @@ import static org.apache.blur.lucene.LuceneVersionConstant.LUCENE_VERSION;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.Random;
 import java.util.UUID;
@@ -29,106 +28,125 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.blur.analysis.BlurAnalyzer;
-import org.apache.blur.index.IndexWriter;
 import org.apache.blur.server.ShardContext;
 import org.apache.blur.server.TableContext;
 import org.apache.blur.store.buffer.BufferStore;
+import org.apache.blur.store.hdfs.HdfsDirectory;
 import org.apache.blur.thrift.generated.AnalyzerDefinition;
 import org.apache.blur.thrift.generated.Column;
 import org.apache.blur.thrift.generated.Record;
 import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.lucene.document.Document;
+import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.search.NRTManager.TrackingIndexWriter;
-import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.store.Directory;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 public class IndexImporterTest {
 
-  private static final File TMPDIR = new File("./target/tmp");
-  
-  private File base;
+  private static final Path TMPDIR = new Path("target/tmp");
+
+  private Path base;
   private Configuration configuration;
-  private IndexWriter writer;
+  private IndexWriter commitWriter;
   private IndexImporter indexImporter;
   private Random random = new Random();
-  private File path;
-  private File badRowIdsPath;
-  
+  private Path path;
+  private Path badRowIdsPath;
+  private IndexWriter mainWriter;
+  private FileSystem fileSystem;
+
   @Before
   public void setup() throws IOException {
-    base = new File(TMPDIR, "blur-index-importer-test");
-    rm(base);
-    base.mkdirs();
+    TableContext.clear();
     configuration = new Configuration();
+    base = new Path(TMPDIR, "blur-index-importer-test");
+    fileSystem = base.getFileSystem(configuration);
+    fileSystem.delete(base, true);
+    fileSystem.mkdirs(base);
+    setupWriter(configuration);
   }
 
   private void setupWriter(Configuration configuration) throws IOException {
     TableDescriptor tableDescriptor = new TableDescriptor();
     tableDescriptor.setName("test-table");
     String uuid = UUID.randomUUID().toString();
-    tableDescriptor.setTableUri(new File(base, "table-store").toURI().toString());
+    
+    tableDescriptor.setTableUri(new Path(base, "table-table").toUri().toString());
     tableDescriptor.setAnalyzerDefinition(new AnalyzerDefinition());
     tableDescriptor.setShardCount(2);
+    
     TableContext tableContext = TableContext.create(tableDescriptor);
     ShardContext shardContext = ShardContext.create(tableContext, "shard-00000000");
-    File tablePath = new File(base, "table-store");
-    File shardPath = new File(tablePath, "shard-00000000");
+    Path tablePath = new Path(base, "table-table");
+    Path shardPath = new Path(tablePath, "shard-00000000");
     String indexDirName = "index_" + uuid;
-    path = new File(shardPath, indexDirName +".commit");
-    path.mkdirs();
-    badRowIdsPath = new File(shardPath, indexDirName +".bad_rowids");
-    FSDirectory directory = FSDirectory.open(path);
+    path = new Path(shardPath, indexDirName + ".commit");
+    fileSystem.mkdirs(path);
+    badRowIdsPath = new Path(shardPath, indexDirName + ".bad_rowids");
+    Directory commitDirectory = new HdfsDirectory(configuration, path);
+    Directory mainDirectory = new HdfsDirectory(configuration, shardPath);
     IndexWriterConfig conf = new IndexWriterConfig(LUCENE_VERSION, tableContext.getAnalyzer());
-    writer = new IndexWriter(directory, conf);
+    commitWriter = new IndexWriter(commitDirectory, conf);
+    
+    mainWriter = new IndexWriter(mainDirectory, conf);
     BufferStore.init(128, 128);
-    indexImporter = new IndexImporter(new TrackingIndexWriter(writer), new ReentrantReadWriteLock(),
shardContext, TimeUnit.MINUTES, 10);
+    
+    indexImporter = new IndexImporter(new TrackingIndexWriter(mainWriter), new ReentrantReadWriteLock(),
shardContext,
+        TimeUnit.MINUTES, 10);
   }
 
   @After
   public void tearDown() throws IOException {
-    writer.close();
+    mainWriter.close();
     indexImporter.close();
-    rm(base);
+    base.getFileSystem(configuration).delete(base, true);
   }
 
-  private void rm(File file) {
-    if (!file.exists()) {
-      return;
-    }
-    if (file.isDirectory()) {
-      for (File f : file.listFiles()) {
-        rm(f);
-      }
-    }
-    file.delete();
-  }
-  
+
   @Test
   public void testIndexImporterWithCorrectRowIdShardCombination() throws IOException {
-    setupWriter(configuration);
+    
     Document document = TransactionRecorder.convert("1", genRecord("1"), new StringBuilder(),
new BlurAnalyzer());
-    writer.addDocument(document);
-    writer.commit();
+    commitWriter.addDocument(document);
+    commitWriter.commit();
+    commitWriter.close();
     indexImporter.run();
-    assertFalse(path.exists());
-    assertFalse(badRowIdsPath.exists());
+    assertFalse(fileSystem.exists(path));
+    assertFalse(fileSystem.exists(badRowIdsPath));
   }
-  
+
+//  private void debug(Path file) throws IOException {
+//    if (!fileSystem.exists(file)) {
+//      return;
+//    }
+//    System.out.println(file);
+//    if (!fileSystem.isFile(file)) {
+//      FileStatus[] listStatus = fileSystem.listStatus(file);
+//      for (FileStatus f : listStatus) {
+//        debug(f.getPath());
+//      }
+//    }
+//  }
+
   @Test
   public void testIndexImporterWithWrongRowIdShardCombination() throws IOException {
     setupWriter(configuration);
     Document document = TransactionRecorder.convert("2", genRecord("1"), new StringBuilder(),
new BlurAnalyzer());
-    writer.addDocument(document);
-    writer.commit();
+    commitWriter.addDocument(document);
+    commitWriter.commit();
+    commitWriter.close();
     indexImporter.run();
-    assertFalse(path.exists());
-    assertTrue(badRowIdsPath.exists());
+    assertFalse(fileSystem.exists(path));
+    assertTrue(fileSystem.exists(badRowIdsPath));
   }
-  
+
   private Record genRecord(String recordId) {
     Record record = new Record();
     record.setFamily("testing");

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/bf1258f8/src/blur-core/src/test/java/org/apache/blur/manager/writer/TransactionRecorderTest.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/test/java/org/apache/blur/manager/writer/TransactionRecorderTest.java
b/src/blur-core/src/test/java/org/apache/blur/manager/writer/TransactionRecorderTest.java
index 8ab85a4..b3eb93e 100644
--- a/src/blur-core/src/test/java/org/apache/blur/manager/writer/TransactionRecorderTest.java
+++ b/src/blur-core/src/test/java/org/apache/blur/manager/writer/TransactionRecorderTest.java
@@ -80,6 +80,7 @@ public class TransactionRecorderTest {
 
   @Test
   public void testReplaySimpleTest() throws IOException, InterruptedException {
+    TableContext.clear();
     Configuration configuration = new Configuration(false);
     URI fileSystemUri = MiniCluster.getFileSystemUri();
     Path path = new Path(fileSystemUri.toString() + "/transaction-recorder-test");
@@ -101,13 +102,14 @@ public class TransactionRecorderTest {
     TransactionRecorder transactionRecorder = new TransactionRecorder(shardContext);
     closeThis.add(transactionRecorder);
     transactionRecorder.open();
+
     try {
       transactionRecorder.replaceRow(true, genRow(), null);
       fail("Should NPE");
     } catch (NullPointerException e) {
     }
 
-    Thread.sleep(TimeUnit.NANOSECONDS.toMillis(tableContext.getTimeBetweenWALSyncsNanos())
* 2);
+    Thread.sleep(TimeUnit.SECONDS.toMillis(2));
 
     RAMDirectory directory = new RAMDirectory();
     IndexWriterConfig conf = new IndexWriterConfig(LUCENE_VERSION, analyzer);
@@ -115,8 +117,11 @@ public class TransactionRecorderTest {
 
     TransactionRecorder replayTransactionRecorder = new TransactionRecorder(shardContext);
     closeThis.add(replayTransactionRecorder);
+    System.out.println("REPLAY");
     replayTransactionRecorder.replay(writer);
+    System.out.println("REPLAY COMPLETE");
     IndexReader reader = DirectoryReader.open(directory);
+    System.out.println("assert");
     assertEquals(1, reader.numDocs());
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/bf1258f8/src/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/IndexInputCloser.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/IndexInputCloser.java
b/src/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/IndexInputCloser.java
index e62fcf5..8a9ee86 100644
--- a/src/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/IndexInputCloser.java
+++ b/src/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/IndexInputCloser.java
@@ -70,7 +70,7 @@ public class IndexInputCloser implements Runnable, Closeable {
           refs.remove(ref);
         }
       } catch (InterruptedException e) {
-        LOG.info("Interrupted");
+        LOG.debug("Interrupted");
         running.set(false);
         return;
       }


Mime
View raw message