incubator-blur-commits mailing list archives

From: g...@apache.org
Subject: git commit: BLUR-95 fixed.
Date: Sun, 26 May 2013 05:17:35 GMT
Updated Branches:
  refs/heads/0.1.5 a18bce613 -> 91afa673f


BLUR-95 fixed: IndexImporter now validates that imported row ids belong to the target shard before merging an index; on a mismatch the import is rolled back and the index directory is renamed with a .bad_rowids suffix.
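
The change below makes the import defensive: before applying deletes and merging an
incoming index, every row id term is hashed with BlurPartitioner and compared against
the shard being imported into; any mismatch cancels the import. Condensed into a
standalone helper (isOwnedByShard is an illustrative name, not part of the commit,
which inlines this check in applyDeletes), the guard amounts to:

    // Sketch of the shard-ownership check this commit adds, using only the
    // BlurPartitioner/BlurUtil calls that appear in the diff below.
    private boolean isOwnedByShard(BytesRef rowId, String shard, int numberOfShards) {
      BlurPartitioner blurPartitioner = new BlurPartitioner();
      Text key = new Text();
      // Honor the BytesRef offset/length; the backing array may be shared.
      key.set(rowId.bytes, rowId.offset, rowId.length);
      int partition = blurPartitioner.getPartition(key, null, numberOfShards);
      return BlurUtil.getShardIndex(shard) == partition;
    }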


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/91afa673
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/91afa673
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/91afa673

Branch: refs/heads/0.1.5
Commit: 91afa673f0e56d717590c0f4efe9afe906fa1c4d
Parents: a18bce6
Author: Gagan <gagandeepjuneja@gmail.com>
Authored: Sun May 26 10:46:57 2013 +0530
Committer: Gagan <gagandeepjuneja@gmail.com>
Committed: Sun May 26 10:46:57 2013 +0530

----------------------------------------------------------------------
 .../apache/blur/manager/writer/IndexImporter.java  |   54 +++++-
 .../blur/manager/writer/IndexImporterTest.java     |  141 +++++++++++++++
 2 files changed, 185 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
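
In brief: the first hunk set below adds the validation-and-rollback path to
IndexImporter, and the second file is a new unit test that exercises both the
matching and the mismatching row id cases.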


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/91afa673/src/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java b/src/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java
index 5c09ab3..4efd908 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java
@@ -13,13 +13,16 @@ import java.util.concurrent.locks.ReadWriteLock;
 
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
+import org.apache.blur.manager.BlurPartitioner;
 import org.apache.blur.server.ShardContext;
 import org.apache.blur.store.hdfs.HdfsDirectory;
 import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.BlurUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 import org.apache.lucene.index.AtomicReader;
 import org.apache.lucene.index.AtomicReaderContext;
 import org.apache.lucene.index.CompositeReaderContext;
@@ -92,15 +95,35 @@ public class IndexImporter extends TimerTask implements Closeable {
         for (HdfsDirectory directory : indexesToImport) {
           LOG.info("Starting import [{0}], commiting on [{1}/{2}]", directory, shard, table);
           indexWriter.commit();
-          applyDeletes(directory, indexWriter);
-          LOG.info("Add index [{0}] [{1}/{2}]", directory, shard, table);
-          indexWriter.addIndexes(directory);
-          LOG.info("Finishing import [{0}], commiting on [{1}/{2}]", directory, shard, table);
-          indexWriter.commit();
-          Path dirPath = directory.getPath();
-          LOG.info("Cleaning up old directory [{0}] for [{1}/{2}]", dirPath, shard, table);
-          fileSystem.delete(dirPath, true);
-          LOG.info("Import complete on [{0}/{1}]", shard, table);
+          boolean isSuccess = false;
+          boolean isRollbackDueToException = false;
+          try {
+            isSuccess = applyDeletes(directory, indexWriter, shard);
+          } catch (IOException e) {
+            LOG.error("Error applying deletes from the index being imported", e);
+            isSuccess = false;
+            isRollbackDueToException = true;
+          }
+          if (isSuccess) {
+            LOG.info("Add index [{0}] [{1}/{2}]", directory, shard, table);
+            indexWriter.addIndexes(directory);
+            LOG.info("Finishing import [{0}], commiting on [{1}/{2}]", directory, shard,
table);
+            indexWriter.commit();
+            Path dirPath = directory.getPath();
+            LOG.info("Cleaning up old directory [{0}] for [{1}/{2}]", dirPath, shard, table);
+            fileSystem.delete(dirPath, true);
+            LOG.info("Import complete on [{0}/{1}]", shard, table);
+          } else {
+            if (!isRollbackDueToException) {
+              LOG.error("Index is corrupted, row ids were found in the wrong shard [{0}/{1}], cancelling index import for [{2}]", shard, table, directory);
+            }
+            LOG.info("Starting rollback on [{0}/{1}]", shard, table);
+            indexWriter.rollback();
+            LOG.info("Finished rollback on [{0}/{1}]", shard, table);
+            Path oldDirPath = directory.getPath();
+            String newDirectoryName = oldDirPath.getName().split("\\.")[0] + ".bad_rowids";
+            fileSystem.rename(oldDirPath, new Path(oldDirPath.getParent(), newDirectoryName));
+          }
         }
       } finally {
         _lock.writeLock().unlock();
@@ -119,12 +142,15 @@ public class IndexImporter extends TimerTask implements Closeable {
     return result;
   }
 
-  private void applyDeletes(Directory directory, IndexWriter indexWriter) throws IOException {
+  private boolean applyDeletes(Directory directory, IndexWriter indexWriter, String shard) throws IOException {
     DirectoryReader reader = DirectoryReader.open(directory);
     try {
       LOG.info("Applying deletes in reader [{0}]", reader);
       CompositeReaderContext compositeReaderContext = reader.getContext();
       List<AtomicReaderContext> leaves = compositeReaderContext.leaves();
+      BlurPartitioner blurPartitioner = new BlurPartitioner();
+      Text key = new Text();
+      int numberOfShards = _shardContext.getTableContext().getDescriptor().getShardCount();
       for (AtomicReaderContext context : leaves) {
         AtomicReader atomicReader = context.reader();
         Fields fields = atomicReader.fields();
@@ -132,6 +158,13 @@ public class IndexImporter extends TimerTask implements Closeable {
         TermsEnum termsEnum = terms.iterator(null);
         BytesRef ref = null;
         while ((ref = termsEnum.next()) != null) {
+          // Honor the BytesRef offset and length; the backing array may be shared.
+          key.set(ref.bytes, ref.offset, ref.length);
+          int partition = blurPartitioner.getPartition(key, null, numberOfShards);
+          int shardId = BlurUtil.getShardIndex(shard);
+          if (shardId != partition) {
+            return false;
+          }
           Term term = new Term(BlurConstants.ROW_ID, BytesRef.deepCopyOf(ref));
           indexWriter.deleteDocuments(term);
         }
@@ -139,5 +172,6 @@ public class IndexImporter extends TimerTask implements Closeable {
     } finally {
       reader.close();
     }
+    return true;
   }
 }
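
A note on the rollback path above: a failed import is kept for inspection rather
than deleted. The rename preserves the directory's base name and swaps its
extension, so an incoming index named index_<uuid>.commit becomes
index_<uuid>.bad_rowids; the new test below asserts exactly this layout.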

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/91afa673/src/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java b/src/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
new file mode 100644
index 0000000..3acbe13
--- /dev/null
+++ b/src/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
@@ -0,0 +1,141 @@
+package org.apache.blur.manager.writer;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static org.apache.blur.lucene.LuceneVersionConstant.LUCENE_VERSION;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.blur.analysis.BlurAnalyzer;
+import org.apache.blur.index.IndexWriter;
+import org.apache.blur.server.ShardContext;
+import org.apache.blur.server.TableContext;
+import org.apache.blur.store.buffer.BufferStore;
+import org.apache.blur.thrift.generated.AnalyzerDefinition;
+import org.apache.blur.thrift.generated.Column;
+import org.apache.blur.thrift.generated.Record;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.search.NRTManager.TrackingIndexWriter;
+import org.apache.lucene.store.FSDirectory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class IndexImporterTest {
+
+  private static final File TMPDIR = new File("./target/tmp");
+  
+  private File base;
+  private Configuration configuration;
+  private IndexWriter writer;
+  private IndexImporter indexImporter;
+  private Random random = new Random();
+  private File path;
+  private File badRowIdsPath;
+  @Before
+  public void setup() throws IOException {
+    base = new File(TMPDIR, "blur-index-writer-test");
+    rm(base);
+    base.mkdirs();
+    configuration = new Configuration();
+  }
+
+  private void setupWriter(Configuration configuration) throws IOException {
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setName("test-table");
+    String uuid = UUID.randomUUID().toString();
+    tableDescriptor.setTableUri(new File(base, "table-store").toURI().toString());
+    tableDescriptor.setAnalyzerDefinition(new AnalyzerDefinition());
+    tableDescriptor.setShardCount(2);
+    TableContext tableContext = TableContext.create(tableDescriptor);
+    ShardContext shardContext = ShardContext.create(tableContext, "shard-00000000");
+    File tablePath = new File(base, "table-store");
+    File shardPath = new File(tablePath, "shard-00000000");
+    String indexDirName = "index_" + uuid;
+    path = new File(shardPath, indexDirName + ".commit");
+    path.mkdirs();
+    badRowIdsPath = new File(shardPath, indexDirName + ".bad_rowids");
+    FSDirectory directory = FSDirectory.open(path);
+    IndexWriterConfig conf = new IndexWriterConfig(LUCENE_VERSION, tableContext.getAnalyzer());
+    writer = new IndexWriter(directory, conf);
+    BufferStore.init(128, 128);
+    indexImporter = new IndexImporter(new TrackingIndexWriter(writer), new ReentrantReadWriteLock(), shardContext, TimeUnit.MINUTES, 10);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    writer.close();
+    indexImporter.close();
+    rm(base);
+  }
+
+  private void rm(File file) {
+    if (!file.exists()) {
+      return;
+    }
+    if (file.isDirectory()) {
+      for (File f : file.listFiles()) {
+        rm(f);
+      }
+    }
+    file.delete();
+  }
+  
+  @Test
+  public void testIndexImporterWithCorrectRowIdShardCombination() throws IOException {
+    setupWriter(configuration);
+    Document document = TransactionRecorder.convert("1", genRecord("1"), new StringBuilder(), new BlurAnalyzer());
+    writer.addDocument(document);
+    writer.commit();
+    indexImporter.run();
+    assertFalse(path.exists());
+    assertFalse(badRowIdsPath.exists());
+  }
+  
+  @Test
+  public void testIndexImporterWithWrongRowIdShardCombination() throws IOException {
+    setupWriter(configuration);
+    Document document = TransactionRecorder.convert("2", genRecord("1"), new StringBuilder(), new BlurAnalyzer());
+    writer.addDocument(document);
+    writer.commit();
+    indexImporter.run();
+    assertFalse(path.exists());
+    assertTrue(badRowIdsPath.exists());
+  }
+  
+  private Record genRecord(String recordId) {
+    Record record = new Record();
+    record.setFamily("testing");
+    record.setRecordId(recordId);
+    for (int i = 0; i < 10; i++) {
+      record.addToColumns(new Column("col" + i, Long.toString(random.nextLong())));
+    }
+    return record;
+  }
+
+}
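
Together the two tests cover both branches of the new import logic: row id "1"
hashes to shard-00000000 of the two-shard table, so the .commit directory is merged
and then deleted, while row id "2" hashes to the other shard, so the import rolls
back and the directory reappears under the .bad_rowids name. If the hard-coded ids
ever feel brittle, a helper along these lines could derive them (hypothetical code,
not part of the commit; it assumes row ids are plain strings):

    // Find a row id that BlurPartitioner maps to the desired shard, instead of
    // hard-coding "1" and "2" as the tests above do.
    private static String rowIdForShard(int shard, int shardCount) {
      BlurPartitioner partitioner = new BlurPartitioner();
      Text key = new Text();
      for (int i = 0; ; i++) {
        String candidate = Integer.toString(i);
        key.set(candidate);
        if (partitioner.getPartition(key, null, shardCount) == shard) {
          return candidate;
        }
      }
    }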

