accumulo-commits mailing list archives

From: els...@apache.org
Subject: [31/50] [abbrv] git commit: ACCUMULO-378 More logging and more robustness when ZK is "slow"
Date: Wed, 21 May 2014 01:59:50 GMT
ACCUMULO-378 More logging and more robustness when ZK is "slow"


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/410a5ec9
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/410a5ec9
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/410a5ec9

Branch: refs/heads/ACCUMULO-378
Commit: 410a5ec9ffcd42c04ea88e54ac22377ac41cbc95
Parents: e1f697d
Author: Josh Elser <elserj@apache.org>
Authored: Mon May 19 13:54:10 2014 -0400
Committer: Josh Elser <elserj@apache.org>
Committed: Mon May 19 13:54:10 2014 -0400

----------------------------------------------------------------------
 .../DistributedWorkQueueWorkAssigner.java       | 32 ++++++++++++++++----
 1 file changed, 26 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
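For context, the core of this change is that initializeQueuedWork() now retries when the
work queue's root node does not yet exist in ZooKeeper (NONODE) instead of failing on the
first read. Below is a minimal standalone sketch of that retry pattern against a raw
ZooKeeper client; the class name and path argument are illustrative assumptions rather
than Accumulo's actual wiring, while the 500ms pause mirrors the commit.

  // Illustrative sketch only: keep retrying a ZooKeeper read until the
  // work-queue root exists, mirroring the pattern added in this commit.
  import java.util.List;
  import org.apache.zookeeper.KeeperException;
  import org.apache.zookeeper.ZooKeeper;

  public class RetryUntilNodeExists {
    public static List<String> listQueuedWork(ZooKeeper zk, String queueRoot)
        throws KeeperException, InterruptedException {
      while (true) {
        try {
          return zk.getChildren(queueRoot, false);
        } catch (KeeperException.NoNodeException e) {
          // The queue root has not been created yet; wait briefly and try
          // again rather than failing the caller outright.
          Thread.sleep(500);
        }
      }
    }
  }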


http://git-wip-us.apache.org/repos/asf/accumulo/blob/410a5ec9/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java b/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java
index e97f3ca..84f9af5 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java
@@ -98,6 +98,7 @@ public class DistributedWorkQueueWorkAssigner extends AbstractWorkAssigner {
     }
 
     if (null == queuedWork) {
+      log.info("Reinitializing state from DistributedWorkQueue in ZooKeeper");
       initializeQueuedWork();
     }
 
@@ -108,9 +109,11 @@ public class DistributedWorkQueueWorkAssigner extends AbstractWorkAssigner {
     // Get the maximum number of entries we want to queue work for (or the default)
     this.maxQueueSize = conf.getCount(Property.REPLICATION_MAX_WORK_QUEUE);
 
+    log.info("Creating work entries from replication table");
     // Scan over the work records, adding the work to the queue
     createWork();
 
+    log.info("Cleaning up finished work entries from replication table");
     // Keep the state of the work we queued correct
     cleanupFinishedWork();
   }
@@ -182,10 +185,23 @@ public class DistributedWorkQueueWorkAssigner extends AbstractWorkAssigner {
   protected void initializeQueuedWork() {
     Preconditions.checkArgument(null == queuedWork, "Expected queuedWork to be null");
     queuedWork = new HashSet<>();
-    try {
-      queuedWork.addAll(workQueue.getWorkQueued());
-    } catch (KeeperException | InterruptedException e) {
-      throw new RuntimeException("Error reading existing queued replication work", e);
+    while (true) {
+      try {
+        queuedWork.addAll(workQueue.getWorkQueued());
+        return;
+      } catch (KeeperException e) {
+        if (KeeperException.Code.NONODE.equals(e.code())) {
+          log.warn("Could not find ZK root for replication work queue, will retry", e);
+          UtilWaitThread.sleep(500);
+          continue;
+        }
+
+        log.error("Error reading existing queued replication work from ZooKeeper", e);
+        throw new RuntimeException("Error reading existing queued replication work from ZooKeeper", e);
+      } catch (InterruptedException e) {
+        log.error("Error reading existing queued replication work from ZooKeeper", e);
+        throw new RuntimeException("Error reading existing queued replication work from ZooKeeper", e);
+      }
     }
   }
 
@@ -198,20 +214,21 @@ public class DistributedWorkQueueWorkAssigner extends AbstractWorkAssigner {
     try {
       bs = ReplicationTable.getBatchScanner(conn, 4);
     } catch (TableNotFoundException e) {
-      UtilWaitThread.sleep(1000);
       return;
     }
 
+    log.info("Creating batchscanner to read Work records from the replication table");
+
     WorkSection.limit(bs);
     bs.setRanges(Collections.singleton(new Range()));
     Text buffer = new Text();
+    long filesWorkWasCreatedFrom = 0l;
     try {
       for (Entry<Key,Value> entry : bs) {
        // If we're not working off the entries, we need to not shoot ourselves in the foot by continuing
         // to add more work entries
         if (queuedWork.size() > maxQueueSize) {
          log.warn("Queued replication work exceeds configured maximum ({}), sleeping to allow work to occur", maxQueueSize);
-          UtilWaitThread.sleep(5000);
           return;
         }
 
@@ -235,6 +252,7 @@ public class DistributedWorkQueueWorkAssigner extends AbstractWorkAssigner {
           // And, we haven't already queued this file up for work already
           if (!queuedWork.contains(key)) {
             queueWork(key, file);
+            filesWorkWasCreatedFrom++;
           } else {
             log.trace("Not re-queueing work for {}", key);
           }
@@ -247,6 +265,8 @@ public class DistributedWorkQueueWorkAssigner extends AbstractWorkAssigner {
         bs.close();
       }
     }
+
+    log.info("Created work entries for {} files", filesWorkWasCreatedFrom);
   }
 
   /**
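
The createWork() hunks above also drop the in-loop UtilWaitThread.sleep calls in favor of
returning early, which presumes the assigner is invoked again on a periodic schedule so
the next pass simply picks up any remaining work. The following hypothetical driver loop
illustrates that assumption; the class, task body, and 30-second delay are made up for
illustration and are not Accumulo's actual scheduling code.

  // Hypothetical periodic driver (not Accumulo's actual wiring): shows why a
  // pass can return early and rely on being run again shortly afterwards.
  import java.util.concurrent.Executors;
  import java.util.concurrent.ScheduledExecutorService;
  import java.util.concurrent.TimeUnit;

  public class AssignerDriver {
    public static void main(String[] args) {
      ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
      Runnable assignWork = () -> {
        // In the real assigner a pass would scan the replication table,
        // queue new work, and clean up finished entries.
        System.out.println("assigning replication work");
      };
      // Re-run the whole pass on a fixed delay; an early return in one pass
      // just means the next pass continues where it left off.
      scheduler.scheduleWithFixedDelay(assignWork, 0, 30, TimeUnit.SECONDS);
    }
  }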

