incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject git commit: Adding a global lock to only allow a single import to occur at a time per process.
Date Mon, 03 Mar 2014 21:29:33 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/apache-blur-0.2 bfb49c739 -> 2b8f22c1a


Adding a global lock to only allow a single import to occur at a time per process.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/2b8f22c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/2b8f22c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/2b8f22c1

Branch: refs/heads/apache-blur-0.2
Commit: 2b8f22c1ad2bff0e2afdcc28c1c2584408c4bac0
Parents: bfb49c7
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Mon Mar 3 16:29:26 2014 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Mon Mar 3 16:29:26 2014 -0500

----------------------------------------------------------------------
 .../blur/manager/writer/IndexImporter.java      | 118 ++++++++++---------
 1 file changed, 64 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2b8f22c1/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java b/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java
index 2600725..838be4d 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java
@@ -29,6 +29,8 @@ import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
@@ -63,6 +65,7 @@ public class IndexImporter extends TimerTask implements Closeable {
   private static final String COMMIT = ".commit";
   private static final String INUSE = ".inuse";
   private static final String BADINDEX = ".badindex";
+  private static final Lock _globalLock = new ReentrantReadWriteLock().writeLock();
 
   private final static Log LOG = LogFactory.getLog(IndexImporter.class);
 
@@ -72,9 +75,9 @@ public class IndexImporter extends TimerTask implements Closeable {
   private final String _table;
   private final String _shard;
   private final AtomicBoolean _running = new AtomicBoolean();
+  private final long _cleanupDelay;
 
   private long _lastCleanup;
-  private final long _cleanupDelay;
 
   public IndexImporter(BlurIndex blurIndex, ShardContext shardContext, TimeUnit refreshUnit,
long refreshAmount) {
     _running.set(true);
@@ -100,69 +103,76 @@ public class IndexImporter extends TimerTask implements Closeable {
 
   @Override
   public void run() {
-    if (_lastCleanup + _cleanupDelay < System.currentTimeMillis()) {
-      try {
-        cleanupOldDirs();
-      } catch (IOException e) {
-        LOG.error("Unknown error while trying to clean old directories on [{1}/{2}].", e,
_shard, _table);
-      }
-      _lastCleanup = System.currentTimeMillis();
-    }
-    Path path = _shardContext.getHdfsDirPath();
-    Configuration configuration = _shardContext.getTableContext().getConfiguration();
+    // Only allow one import to occur in the process at a time.
+    _globalLock.lock();
     try {
-      FileSystem fileSystem = path.getFileSystem(configuration);
-      SortedSet<FileStatus> listStatus;
-      while (true) {
-        if (!_running.get()) {
-          return;
-        }
-        try {
-          listStatus = sort(fileSystem.listStatus(path, new PathFilter() {
-            @Override
-            public boolean accept(Path path) {
-              if (path != null && path.getName().endsWith(COMMIT)) {
-                return true;
-              }
-              return false;
-            }
-          }));
-          break;
-        } catch (FileNotFoundException e) {
-          LOG.warn("File not found error, retrying.");
-        }
+      if (_lastCleanup + _cleanupDelay < System.currentTimeMillis()) {
         try {
-          Thread.sleep(100);
-        } catch (InterruptedException e) {
-          return;
+          cleanupOldDirs();
+        } catch (IOException e) {
+          LOG.error("Unknown error while trying to clean old directories on [{1}/{2}].",
e, _shard, _table);
         }
+        _lastCleanup = System.currentTimeMillis();
       }
-      for (FileStatus fileStatus : listStatus) {
-        Path file = fileStatus.getPath();
-        if (fileStatus.isDir() && file.getName().endsWith(COMMIT)) {
-          // rename to inuse, if good continue else rename to badindex
-          Path inuse = new Path(file.getParent(), rename(file.getName(), INUSE));
-          if (fileSystem.rename(file, inuse)) {
-            HdfsDirectory hdfsDirectory = new HdfsDirectory(configuration, inuse);
-            if (DirectoryReader.indexExists(hdfsDirectory)) {
-              IndexAction indexAction = getIndexAction(hdfsDirectory, fileSystem);
-              _blurIndex.process(indexAction);
-              return;
-            } else {
-              Path badindex = new Path(file.getParent(), rename(file.getName(), BADINDEX));
-              if (fileSystem.rename(inuse, badindex)) {
-                LOG.error("Directory found at [{0}] is not a vaild index, renaming to [{1}].",
inuse, badindex);
+      Path path = _shardContext.getHdfsDirPath();
+      Configuration configuration = _shardContext.getTableContext().getConfiguration();
+      try {
+        FileSystem fileSystem = path.getFileSystem(configuration);
+        SortedSet<FileStatus> listStatus;
+        while (true) {
+          if (!_running.get()) {
+            return;
+          }
+          try {
+            listStatus = sort(fileSystem.listStatus(path, new PathFilter() {
+              @Override
+              public boolean accept(Path path) {
+                if (path != null && path.getName().endsWith(COMMIT)) {
+                  return true;
+                }
+                return false;
+              }
+            }));
+            break;
+          } catch (FileNotFoundException e) {
+            LOG.warn("File not found error, retrying.");
+          }
+          try {
+            Thread.sleep(100);
+          } catch (InterruptedException e) {
+            return;
+          }
+        }
+        for (FileStatus fileStatus : listStatus) {
+          Path file = fileStatus.getPath();
+          if (fileStatus.isDir() && file.getName().endsWith(COMMIT)) {
+            // rename to inuse, if good continue else rename to badindex
+            Path inuse = new Path(file.getParent(), rename(file.getName(), INUSE));
+            if (fileSystem.rename(file, inuse)) {
+              HdfsDirectory hdfsDirectory = new HdfsDirectory(configuration, inuse);
+              if (DirectoryReader.indexExists(hdfsDirectory)) {
+                IndexAction indexAction = getIndexAction(hdfsDirectory, fileSystem);
+                _blurIndex.process(indexAction);
+                return;
               } else {
-                LOG.fatal("Directory found at [{0}] is not a vaild index, could not rename
to [{1}].", inuse, badindex);
+                Path badindex = new Path(file.getParent(), rename(file.getName(), BADINDEX));
+                if (fileSystem.rename(inuse, badindex)) {
+                  LOG.error("Directory found at [{0}] is not a vaild index, renaming to [{1}].",
inuse, badindex);
+                } else {
+                  LOG.fatal("Directory found at [{0}] is not a vaild index, could not rename
to [{1}].", inuse,
+                      badindex);
+                }
               }
+            } else {
+              LOG.fatal("Could not rename [{0}] to inuse dir.", file);
             }
-          } else {
-            LOG.fatal("Could not rename [{0}] to inuse dir.", file);
           }
         }
+      } catch (IOException e) {
+        LOG.error("Unknown error while trying to refresh imports on [{1}/{2}].", e, _shard,
_table);
       }
-    } catch (IOException e) {
-      LOG.error("Unknown error while trying to refresh imports on [{1}/{2}].", e, _shard,
_table);
+    } finally {
+      _globalLock.unlock();
     }
   }
 


Mime
View raw message