hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r1166989 - in /hbase/trunk: ./ src/main/java/org/apache/hadoop/hbase/master/ src/main/java/org/apache/hadoop/hbase/zookeeper/ src/test/java/org/apache/hadoop/hbase/master/
Date Fri, 09 Sep 2011 04:05:26 GMT
Author: stack
Date: Fri Sep  9 04:05:26 2011
New Revision: 1166989

URL: http://svn.apache.org/viewvc?rev=1166989&view=rev
Log:
HBASE-4007 distributed log splitting can get indefinitely stuck

Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1166989&r1=1166988&r2=1166989&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Fri Sep  9 04:05:26 2011
@@ -253,6 +253,8 @@ Release 0.91.0 - Unreleased
                (ramkrishna.s.vasudevan)
    HBASE-4350  Fix a Bloom filter bug introduced by HFile v2 and
                TestMultiColumnScanner that caught it (Mikhail Bautin)
+   HBASE-4007  distributed log splitting can get indefinitely stuck
+               (Prakash Khemani)
            
                
 

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java?rev=1166989&r1=1166988&r2=1166989&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java Fri Sep  9 04:05:26 2011
@@ -233,6 +233,9 @@ public class MasterFileSystem {
     }
       
     if (distributedLogSplitting) {
+      for (ServerName serverName : serverNames) {
+        splitLogManager.handleDeadWorker(serverName.toString());
+      }
       splitTime = EnvironmentEdgeManager.currentTimeMillis();
       try {
         try {

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java?rev=1166989&r1=1166988&r2=1166989&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java Fri Sep  9 04:05:26 2011
@@ -23,6 +23,8 @@ import static org.apache.hadoop.hbase.zo
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -57,6 +59,9 @@ import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.data.Stat;
 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog.TaskState;
 
+import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.*;
+import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.*;
+
 /**
  * Distributes the task of log splitting to the available region servers.
  * Coordination happens via zookeeper. For every log file that has to be split a
@@ -105,6 +110,9 @@ public class SplitLogManager extends Zoo
     new ConcurrentHashMap<String, Task>();
   private TimeoutMonitor timeoutMonitor;
 
+  private Set<String> deadWorkers = null;
+  private Object deadWorkersLock = new Object();
+
   /**
    * Its OK to construct this object even when region-servers are not online. It
    * does lookup the orphan tasks in zk but it doesn't block for them to be
@@ -307,9 +315,9 @@ public class SplitLogManager extends Zoo
     }
   }
 
-  private void setDone(String path, boolean err) {
+  private void setDone(String path, TerminationStatus status) {
     if (!ZKSplitLog.isRescanNode(watcher, path)) {
-      if (!err) {
+      if (status == SUCCESS) {
         tot_mgr_log_split_success.incrementAndGet();
         LOG.info("Done splitting " + path);
       } else {
@@ -329,7 +337,7 @@ public class SplitLogManager extends Zoo
       // accessing task.batch here.
       if (!task.isOrphan()) {
         synchronized (task.batch) {
-          if (!err) {
+          if (status == SUCCESS) {
             task.batch.done++;
           } else {
             task.batch.error++;
@@ -366,7 +374,7 @@ public class SplitLogManager extends Zoo
   private void createNodeFailure(String path) {
     // TODO the Manger should split the log locally instead of giving up
     LOG.warn("failed to create task node" + path);
-    setDone(path, true);
+    setDone(path, FAILURE);
   }
 
 
@@ -381,7 +389,7 @@ public class SplitLogManager extends Zoo
     if (data == null) {
       tot_mgr_null_data.incrementAndGet();
       LOG.fatal("logic error - got null data " + path);
-      setDone(path, true);
+      setDone(path, FAILURE);
       return;
     }
     data = this.watcher.getRecoverableZooKeeper().removeMetaData(data);
@@ -390,36 +398,36 @@ public class SplitLogManager extends Zoo
       LOG.debug("task not yet acquired " + path + " ver = " + version);
       handleUnassignedTask(path);
     } else if (TaskState.TASK_OWNED.equals(data)) {
-      registerHeartbeat(path, version,
+      heartbeat(path, version,
           TaskState.TASK_OWNED.getWriterName(data));
     } else if (TaskState.TASK_RESIGNED.equals(data)) {
       LOG.info("task " + path + " entered state " + new String(data));
-      resubmit(path, true);
+      resubmitOrFail(path, FORCE);
     } else if (TaskState.TASK_DONE.equals(data)) {
       LOG.info("task " + path + " entered state " + new String(data));
       if (taskFinisher != null && !ZKSplitLog.isRescanNode(watcher, path)) {
         if (taskFinisher.finish(TaskState.TASK_DONE.getWriterName(data),
             ZKSplitLog.getFileName(path)) == Status.DONE) {
-          setDone(path, false); // success
+          setDone(path, SUCCESS);
         } else {
-          resubmit(path, false); // err
+          resubmitOrFail(path, CHECK);
         }
       } else {
-        setDone(path, false); // success
+        setDone(path, SUCCESS);
       }
     } else if (TaskState.TASK_ERR.equals(data)) {
       LOG.info("task " + path + " entered state " + new String(data));
-      resubmit(path, false);
+      resubmitOrFail(path, CHECK);
     } else {
       LOG.fatal("logic error - unexpected zk state for path = " + path
           + " data = " + new String(data));
-      setDone(path, true);
+      setDone(path, FAILURE);
     }
   }
 
   private void getDataSetWatchFailure(String path) {
     LOG.warn("failed to set data watch " + path);
-    setDone(path, true);
+    setDone(path, FAILURE);
   }
 
   /**
@@ -440,23 +448,19 @@ public class SplitLogManager extends Zoo
       LOG.info("resubmitting unassigned orphan task " + path);
       // ignore failure to resubmit. The timeout-monitor will handle it later
       // albeit in a more crude fashion
-      resubmit(path, task, true);
+      resubmit(path, task, FORCE);
     }
   }
 
-  private void registerHeartbeat(String path, int new_version,
+  private void heartbeat(String path, int new_version,
       String workerName) {
     Task task = findOrCreateOrphanTask(path);
     if (new_version != task.last_version) {
       if (task.isUnassigned()) {
         LOG.info("task " + path + " acquired by " + workerName);
       }
-      // very noisy
-      //LOG.debug("heartbeat for " + path + " last_version=" + task.last_version +
-      //    " last_update=" + task.last_update + " new_version=" +
-      //    new_version);
-      task.last_update = EnvironmentEdgeManager.currentTimeMillis();
-      task.last_version = new_version;
+      task.heartbeat(EnvironmentEdgeManager.currentTimeMillis(),
+          new_version, workerName);
       tot_mgr_heartbeat.incrementAndGet();
     } else {
       assert false;
@@ -465,14 +469,15 @@ public class SplitLogManager extends Zoo
     return;
   }
 
-  private boolean resubmit(String path, Task task, boolean force) {
+  private boolean resubmit(String path, Task task,
+      ResubmitDirective directive) {
     // its ok if this thread misses the update to task.deleted. It will
     // fail later
     if (task.deleted) {
       return false;
     }
     int version;
-    if (!force) {
+    if (directive != FORCE) {
       if ((EnvironmentEdgeManager.currentTimeMillis() - task.last_update) <
           timeout) {
         return false;
@@ -485,7 +490,7 @@ public class SplitLogManager extends Zoo
         }
         return false;
       }
-      // race with registerHeartBeat that might be changing last_version
+      // race with heartbeat() that might be changing last_version
       version = task.last_version;
     } else {
       version = -1;
@@ -510,7 +515,7 @@ public class SplitLogManager extends Zoo
       return false;
     }
     // don't count forced resubmits
-    if (!force) {
+    if (directive != FORCE) {
       task.unforcedResubmits++;
     }
     task.setUnassigned();
@@ -519,9 +524,9 @@ public class SplitLogManager extends Zoo
     return true;
   }
 
-  private void resubmit(String path, boolean force) {
-    if (resubmit(path, findOrCreateOrphanTask(path), force) == false) {
-      setDone(path, true); // error
+  private void resubmitOrFail(String path, ResubmitDirective directive) {
+    if (resubmit(path, findOrCreateOrphanTask(path), directive) == false) {
+      setDone(path, FAILURE);
     }
   }
 
@@ -558,14 +563,22 @@ public class SplitLogManager extends Zoo
    * @throws KeeperException 
    */
   private void createRescanNode(long retries) {
+    // The RESCAN node will be deleted almost immediately by the
+    // SplitLogManager as soon as it is created because it is being
+    // created in the DONE state. This behavior prevents a buildup
+    // of RESCAN nodes. But there is also a chance that a SplitLogWorker
+    // might miss the watch-trigger that creation of RESCAN node provides.
+    // Since the TimeoutMonitor will keep resubmitting UNASSIGNED tasks
+    // therefore this behavior is safe.
     this.watcher.getRecoverableZooKeeper().getZooKeeper().
       create(ZKSplitLog.getRescanNode(watcher),
-        TaskState.TASK_UNASSIGNED.get(serverName), Ids.OPEN_ACL_UNSAFE,
-        CreateMode.PERSISTENT_SEQUENTIAL,
+        TaskState.TASK_DONE.get(serverName), Ids.OPEN_ACL_UNSAFE,
+        CreateMode.EPHEMERAL_SEQUENTIAL,
         new CreateRescanAsyncCallback(), new Long(retries));
   }
 
   private void createRescanSuccess(String path) {
+    lastNodeCreateTime = EnvironmentEdgeManager.currentTimeMillis();
     tot_mgr_rescan.incrementAndGet();
     getDataSetWatch(path, zkretries);
   }
@@ -698,6 +711,7 @@ public class SplitLogManager extends Zoo
   static class Task {
     long last_update;
     int last_version;
+    String cur_worker_name;
     TaskBatch batch;
     boolean deleted;
     int incarnation;
@@ -707,6 +721,7 @@ public class SplitLogManager extends Zoo
     public String toString() {
       return ("last_update = " + last_update +
           " last_version = " + last_version +
+          " cur_worker_name = " + cur_worker_name +
           " deleted = " + deleted +
           " incarnation = " + incarnation +
           " resubmits = " + unforcedResubmits +
@@ -739,11 +754,30 @@ public class SplitLogManager extends Zoo
       return (last_update == -1);
     }
 
+    public void heartbeat(long time, int version, String worker) {
+      last_version = version;
+      last_update = time;
+      cur_worker_name = worker;
+    }
+
     public void setUnassigned() {
+      cur_worker_name = null;
       last_update = -1;
     }
   }
 
+  void handleDeadWorker(String worker_name) {
+    // resubmit the tasks on the TimeoutMonitor thread. Makes it easier
+    // to reason about concurrency. Makes it easier to retry.
+    synchronized (deadWorkersLock) {
+      if (deadWorkers == null) {
+        deadWorkers = new HashSet<String>(100);
+      }
+      deadWorkers.add(worker_name);
+    }
+    LOG.info("dead splitlog worker " + worker_name);
+  }
+
   /**
    * Periodically checks all active tasks and resubmits the ones that have timed
    * out
@@ -759,10 +793,17 @@ public class SplitLogManager extends Zoo
       int unassigned = 0;
       int tot = 0;
       boolean found_assigned_task = false;
+      Set<String> localDeadWorkers;
+
+      synchronized (deadWorkersLock) {
+        localDeadWorkers = deadWorkers;
+        deadWorkers = null;
+      }
 
       for (Map.Entry<String, Task> e : tasks.entrySet()) {
         String path = e.getKey();
         Task task = e.getValue();
+        String cur_worker = task.cur_worker_name;
         tot++;
         // don't easily resubmit a task which hasn't been picked up yet. It
         // might be a long while before a SplitLogWorker is free to pick up a
@@ -774,7 +815,16 @@ public class SplitLogManager extends Zoo
           continue;
         }
         found_assigned_task = true;
-        if (resubmit(path, task, false)) {
+        if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) {
+          tot_mgr_resubmit_dead_server_task.incrementAndGet();
+          if (resubmit(path, task, FORCE)) {
+            resubmitted++;
+          } else {
+            handleDeadWorker(cur_worker);
+            LOG.warn("Failed to resubmit task " + path + " owned by dead " +
+                cur_worker + ", will retry.");
+          }
+        } else if (resubmit(path, task, CHECK)) {
           resubmitted++;
         }
       }
@@ -994,4 +1044,12 @@ public class SplitLogManager extends Zoo
      */
     public Status finish(String workerName, String taskname);
   }
-}
+  enum ResubmitDirective {
+    CHECK(),
+    FORCE();
+  }
+  enum TerminationStatus {
+    SUCCESS(),
+    FAILURE();
+  }
+}
\ No newline at end of file

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java?rev=1166989&r1=1166988&r2=1166989&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java Fri Sep  9 04:05:26 2011
@@ -235,6 +235,8 @@ public class ZKSplitLog {
     public static AtomicLong tot_mgr_task_deleted = new AtomicLong(0);
     public static AtomicLong tot_mgr_resubmit_unassigned = new AtomicLong(0);
     public static AtomicLong tot_mgr_relist_logdir = new AtomicLong(0);
+    public static AtomicLong tot_mgr_resubmit_dead_server_task =
+      new AtomicLong(0);
 
 
 

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java?rev=1166989&r1=1166988&r2=1166989&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java Fri Sep  9 04:05:26 2011
@@ -131,31 +131,6 @@ public class TestSplitLogManager {
     assertTrue(false);
   }
 
-  private int numRescanPresent() throws KeeperException {
-    int num = 0;
-    List<String> nodes = ZKUtil.listChildrenNoWatch(zkw, zkw.splitLogZNode);
-    for (String node : nodes) {
-      if (ZKSplitLog.isRescanNode(zkw,
-          ZKUtil.joinZNode(zkw.splitLogZNode, node))) {
-        num++;
-      }
-    }
-    return num;
-  }
-
-  private void setRescanNodeDone(int count) throws KeeperException {
-    List<String> nodes = ZKUtil.listChildrenNoWatch(zkw, zkw.splitLogZNode);
-    for (String node : nodes) {
-      String nodepath = ZKUtil.joinZNode(zkw.splitLogZNode, node);
-      if (ZKSplitLog.isRescanNode(zkw, nodepath)) {
-        ZKUtil.setData(zkw, nodepath,
-            TaskState.TASK_DONE.get("some-worker"));
-        count--;
-      }
-    }
-    assertEquals(0, count);
-  }
-
   private String submitTaskAndWait(TaskBatch batch, String name)
   throws KeeperException, InterruptedException {
     String tasknode = ZKSplitLog.getEncodedNodeName(zkw, name);
@@ -222,7 +197,6 @@ public class TestSplitLogManager {
     waitForCounter(tot_mgr_resubmit, 0, 1, to + 100);
     assertTrue(task.isUnassigned());
     waitForCounter(tot_mgr_rescan, 0, 1, to + 100);
-    assertEquals(1, numRescanPresent());
   }
 
   @Test
@@ -253,7 +227,6 @@ public class TestSplitLogManager {
     assertTrue(task.isOrphan());
     assertTrue(task.isUnassigned());
     assertTrue(ZKUtil.checkExists(zkw, tasknode) > version);
-    assertEquals(1, numRescanPresent());
   }
 
   @Test
@@ -286,7 +259,6 @@ public class TestSplitLogManager {
     ZKUtil.setData(zkw, tasknode, TaskState.TASK_OWNED.get("worker3"));
     waitForCounter(tot_mgr_heartbeat, 1, 2, 1000);
     waitForCounter(tot_mgr_resubmit_threshold_reached, 0, 1, to + 100);
-    assertEquals(2, numRescanPresent());
     Thread.sleep(to + 100);
     assertEquals(2L, tot_mgr_resubmit.get());
   }
@@ -311,16 +283,12 @@ public class TestSplitLogManager {
     waitForCounter(tot_mgr_resubmit, 0, 1, to + 100);
     int version1 = ZKUtil.checkExists(zkw, tasknode);
     assertTrue(version1 > version);
-    assertEquals(1, numRescanPresent());
     byte[] taskstate = ZKUtil.getData(zkw, tasknode);
     assertTrue(Arrays.equals(TaskState.TASK_UNASSIGNED.get("dummy-master"),
         taskstate));
 
-    setRescanNodeDone(1);
-
     waitForCounter(tot_mgr_rescan_deleted, 0, 1, 1000);
 
-    assertEquals(0, numRescanPresent());
     return;
   }
 
@@ -377,7 +345,6 @@ public class TestSplitLogManager {
     waitForCounter(tot_mgr_resubmit, 0, 1, 1000);
     int version1 = ZKUtil.checkExists(zkw, tasknode);
     assertTrue(version1 > version);
-    assertEquals(1, numRescanPresent());
 
     byte[] taskstate = ZKUtil.getData(zkw, tasknode);
     assertTrue(Arrays.equals(taskstate,
@@ -417,18 +384,38 @@ public class TestSplitLogManager {
           TaskState.TASK_OWNED.get("dummy-worker"));
     }
 
-    // since all the nodes in the system are not unassigned the
-    // unassigned_timeout must not have kicked in
-    assertEquals(0, numRescanPresent());
-
     // since we have stopped heartbeating the owned node therefore it should
     // get resubmitted
     LOG.info("waiting for manager to resubmit the orphan task");
     waitForCounter(tot_mgr_resubmit, 0, 1, to + 500);
-    assertEquals(1, numRescanPresent());
 
     // now all the nodes are unassigned. manager should post another rescan
     waitForCounter(tot_mgr_resubmit_unassigned, 0, 1, 2 * to + 500);
-    assertEquals(2, numRescanPresent());
+  }
+
+  @Test
+  public void testDeadWorker() throws Exception {
+    LOG.info("testDeadWorker");
+
+    conf.setLong("hbase.splitlog.max.resubmit", 0);
+    slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+    slm.finishInitialization();
+    TaskBatch batch = new TaskBatch();
+
+    String tasknode = submitTaskAndWait(batch, "foo/1");
+    int version = ZKUtil.checkExists(zkw, tasknode);
+
+    ZKUtil.setData(zkw, tasknode, TaskState.TASK_OWNED.get("worker1"));
+    waitForCounter(tot_mgr_heartbeat, 0, 1, 1000);
+    slm.handleDeadWorker("worker1");
+    waitForCounter(tot_mgr_resubmit, 0, 1, 1000);
+    waitForCounter(tot_mgr_resubmit_dead_server_task, 0, 1, 1000);
+
+    int version1 = ZKUtil.checkExists(zkw, tasknode);
+    assertTrue(version1 > version);
+    byte[] taskstate = ZKUtil.getData(zkw, tasknode);
+    assertTrue(Arrays.equals(TaskState.TASK_UNASSIGNED.get("dummy-master"),
+        taskstate));
+    return;
   }
 }



Mime
View raw message