incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [3/9] git commit: Fixing issue with import where if the import fails while applying updates that the import could be deleted.
Date Tue, 20 Jan 2015 01:44:10 GMT
Fixing issue with import where if the import fails while applying updates that the import could
be deleted.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/eb023146
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/eb023146
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/eb023146

Branch: refs/heads/master
Commit: eb023146f43af22a576e03df33fd5e0fc24aa4f9
Parents: 8f7464b
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Mon Jan 19 16:01:44 2015 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Mon Jan 19 16:01:44 2015 -0500

----------------------------------------------------------------------
 .../blur/manager/writer/IndexImporter.java      | 34 ++++++++++++++++++++
 .../blur/manager/writer/IndexImporterTest.java  | 29 +++++++++++++++--
 2 files changed, 61 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/eb023146/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java b/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java
index cec713c..a5390f8 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java
@@ -19,6 +19,7 @@ package org.apache.blur.manager.writer;
 import java.io.Closeable;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -60,6 +61,7 @@ import org.apache.lucene.util.BytesRef;
 
 public class IndexImporter extends TimerTask implements Closeable {
 
+  private static final String INPROGRESS = ".inprogress";
   private static final String BADROWIDS = ".badrowids";
   private static final String COMMIT = ".commit";
   private static final String INUSE = ".inuse";
@@ -75,6 +77,7 @@ public class IndexImporter extends TimerTask implements Closeable {
   private final long _cleanupDelay;
 
   private long _lastCleanup;
+  private Runnable _testError;
 
   public IndexImporter(Timer indexImporterTimer, BlurIndex blurIndex, ShardContext shardContext,
TimeUnit refreshUnit,
       long refreshAmount) {
@@ -137,7 +140,11 @@ public class IndexImporter extends TimerTask implements Closeable {
           if (fileStatus.isDir() && file.getName().endsWith(COMMIT)) {
             // rename to inuse, if good continue else rename to badindex
             Path inuse = new Path(file.getParent(), rename(file.getName(), INUSE));
+            touch(fileSystem, new Path(file, INPROGRESS));
             if (fileSystem.rename(file, inuse)) {
+              if (_testError != null) {
+                _testError.run();
+              }
               HdfsDirectory hdfsDirectory = new HdfsDirectory(configuration, inuse);
               try {
                 if (DirectoryReader.indexExists(hdfsDirectory)) {
@@ -169,6 +176,10 @@ public class IndexImporter extends TimerTask implements Closeable {
     }
   }
 
+  private void touch(FileSystem fileSystem, Path path) throws IOException {
+    fileSystem.create(path, true).close();
+  }
+
   private String rename(String name, String newSuffix) {
     int lastIndexOf = name.lastIndexOf('.');
     return name.substring(0, lastIndexOf) + newSuffix;
@@ -196,6 +207,8 @@ public class IndexImporter extends TimerTask implements Closeable {
 
       @Override
       public void doPostCommit(IndexWriter writer) throws IOException {
+        Path path = directory.getPath();
+        fileSystem.delete(new Path(path, INPROGRESS), false);
         LOG.info("Import complete on [{0}/{1}]", _shard, _table);
         writer.maybeMerge();
       }
@@ -285,6 +298,18 @@ public class IndexImporter extends TimerTask implements Closeable {
       inuseDirs.remove(inuseDir);
     }
 
+    // Check if any inuse dirs have inprogress files.
+    // If they do, rename inuse to commit to retry import.
+    for (Path inuse : new HashSet<Path>(inuseDirs)) {
+      Path path = new Path(inuse, INPROGRESS);
+      if (fileSystem.exists(path)) {
+        LOG.info("Path [{0}] is not imported but has inprogress file, retrying import.",
path);
+        inuseDirs.remove(inuse);
+        Path commit = new Path(inuse.getParent(), rename(inuse.getName(), COMMIT));
+        fileSystem.rename(inuse, commit);
+      }
+    }
+
     for (Path p : inuseDirs) {
       LOG.info("Deleteing path [{0}] no longer in use.", p);
       fileSystem.delete(p, true);
@@ -311,4 +336,13 @@ public class IndexImporter extends TimerTask implements Closeable {
     }
     return result;
   }
+
+  public Runnable getTestError() {
+    return _testError;
+  }
+
+  public void setTestError(Runnable testError) {
+    _testError = testError;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/eb023146/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
b/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
index 6f2eee7..40e7720 100644
--- a/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
+++ b/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
@@ -18,8 +18,7 @@ package org.apache.blur.manager.writer;
  */
 
 import static org.apache.blur.lucene.LuceneVersionConstant.LUCENE_VERSION;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 import java.io.IOException;
 import java.util.List;
@@ -288,6 +287,32 @@ public class IndexImporterTest {
     validateIndex();
   }
 
+  @Test
+  public void testIndexImporterWhenThereIsAFailureOnDuringImport() throws IOException {
+    List<Field> document = _fieldManager.getFields("1", genRecord("1"));
+    _commitWriter.addDocument(document);
+    _commitWriter.commit();
+    _commitWriter.close();
+    _indexImporter.setTestError(new Runnable() {
+      @Override
+      public void run() {
+        throw new RuntimeException("test");
+      }
+    });
+    try {
+      _indexImporter.run();
+    } catch (RuntimeException e) {
+      assertEquals("test", e.getMessage());
+    }
+    _indexImporter.cleanupOldDirs();
+    _indexImporter.setTestError(null);
+    _indexImporter.run();
+    assertFalse(_fileSystem.exists(_path));
+    assertFalse(_fileSystem.exists(_badRowIdsPath));
+    assertTrue(_fileSystem.exists(_inUsePath));
+    validateIndex();
+  }
+
   private Record genRecord(String recordId) {
     Record record = new Record();
     record.setFamily("testing");


Mime
View raw message