hbase-commits mailing list archives

From st...@apache.org
Subject svn commit: r1333099 [2/5] - in /hbase/trunk/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/executor/ main/java/org/apache/hadoop/hbase/master/ main/java/org/apache/hadoop/hbase/protobuf/ main/java/org/apache/hadoop/hbase/pro...
Date Wed, 02 May 2012 16:26:38 GMT
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java?rev=1333099&r1=1333098&r2=1333099&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java Wed May  2 16:26:36 2012
@@ -1,7 +1,5 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
+  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
  * regarding copyright ownership.  The ASF licenses this file
@@ -19,14 +17,19 @@
  */
 package org.apache.hadoop.hbase.master;
 
-import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.*;
+import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK;
+import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE;
+import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED;
+import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.FAILURE;
+import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.IN_PROGRESS;
+import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.Set;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -38,7 +41,10 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Chore;
+import org.apache.hadoop.hbase.SplitLogCounters;
+import org.apache.hadoop.hbase.DeserializationException;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.SplitLogTask;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.master.SplitLogManager.TaskFinisher.Status;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
@@ -59,36 +65,31 @@ import org.apache.zookeeper.KeeperExcept
 import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.data.Stat;
-import org.apache.hadoop.hbase.zookeeper.ZKSplitLog.TaskState;
-
-import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.*;
-import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.*;
 
 /**
  * Distributes the task of log splitting to the available region servers.
  * Coordination happens via zookeeper. For every log file that has to be split a
- * znode is created under /hbase/splitlog. SplitLogWorkers race to grab a task.
+ * znode is created under <code>/hbase/splitlog</code>. SplitLogWorkers race to grab a task.
  *
- * SplitLogManager monitors the task znodes that it creates using the
+ * <p>SplitLogManager monitors the task znodes that it creates using the
  * timeoutMonitor thread. If a task's progress is slow then
- * resubmit(String, boolean) will take away the task from the owner
- * {@link SplitLogWorker} and the task will be
- * upforgrabs again. When the task is done then the task's znode is deleted by
- * SplitLogManager.
+ * {@link #resubmit(String, Task, ResubmitDirective)} will take away the task from the owner
+ * {@link SplitLogWorker} and the task will be up for grabs again. When the task is done then the
+ * task's znode is deleted by SplitLogManager.
  *
- * Clients call {@link #splitLogDistributed(Path)} to split a region server's
+ * <p>Clients call {@link #splitLogDistributed(Path)} to split a region server's
  * log files. The caller thread waits in this method until all the log files
  * have been split.
  *
- * All the zookeeper calls made by this class are asynchronous. This is mainly
+ * <p>All the zookeeper calls made by this class are asynchronous. This is mainly
  * to help reduce response time seen by the callers.
  *
- * There is race in this design between the SplitLogManager and the
+ * <p>There is a race in this design between the SplitLogManager and the
  * SplitLogWorker. SplitLogManager might re-queue a task that has in reality
  * already been completed by a SplitLogWorker. We rely on the idempotency of
  * the log splitting task for correctness.
  *
- * It is also assumed that every log splitting task is unique and once
+ * <p>It is also assumed that every log splitting task is unique and once
  * completed (either with success or with error) it will be not be submitted
  * again. If a task is resubmitted then there is a risk that old "delete task"
  * can delete the re-submission.
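
Condensing the design the javadoc above describes into a minimal, runnable sketch. Everything below is illustrative shorthand, not HBase API; it is just the task state machine the comment walks through:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustrative state machine for one split task: the manager enqueues a task
// as UNASSIGNED, workers race to flip it to OWNED, the timeout monitor can
// force a slow task back to UNASSIGNED, and a finished task is deleted by
// the manager. Correctness relies on finish() being idempotent, as noted above.
public class SplitTaskLifecycleSketch {
  enum State { UNASSIGNED, OWNED, DONE, ERR }

  private final ConcurrentMap<String, State> tasks = new ConcurrentHashMap<String, State>();

  void enqueue(String log) {                       // manager creates the task znode
    tasks.putIfAbsent(log, State.UNASSIGNED);
  }

  boolean tryAcquire(String log) {                 // SplitLogWorkers race for this
    return tasks.replace(log, State.UNASSIGNED, State.OWNED);
  }

  void resubmit(String log) {                      // slow task taken away from its owner
    tasks.replace(log, State.OWNED, State.UNASSIGNED);
  }

  void finish(String log, boolean ok) {            // re-finishing a done task is a no-op
    if (tasks.replace(log, State.OWNED, ok ? State.DONE : State.ERR) && ok) {
      tasks.remove(log, State.DONE);               // manager deletes the done task's znode
    }
  }
}
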
@@ -97,8 +98,13 @@ import static org.apache.hadoop.hbase.ma
 public class SplitLogManager extends ZooKeeperListener {
   private static final Log LOG = LogFactory.getLog(SplitLogManager.class);
 
+  public static final int DEFAULT_TIMEOUT = 25000; // 25 sec
+  public static final int DEFAULT_ZK_RETRIES = 3;
+  public static final int DEFAULT_MAX_RESUBMIT = 3;
+  public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); //3 min
+
   private final Stoppable stopper;
-  private final String serverName;
+  private final ServerName serverName;
   private final TaskFinisher taskFinisher;
   private FileSystem fs;
   private Configuration conf;
@@ -110,11 +116,10 @@ public class SplitLogManager extends Zoo
   private long lastNodeCreateTime = Long.MAX_VALUE;
   public boolean ignoreZKDeleteForTesting = false;
 
-  private ConcurrentMap<String, Task> tasks =
-    new ConcurrentHashMap<String, Task>();
+  private ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<String, Task>();
   private TimeoutMonitor timeoutMonitor;
 
-  private Set<String> deadWorkers = null;
+  private volatile Set<ServerName> deadWorkers = null;
   private Object deadWorkersLock = new Object();
 
   /**
@@ -130,12 +135,12 @@ public class SplitLogManager extends Zoo
    * @param serverName
    */
   public SplitLogManager(ZooKeeperWatcher zkw, final Configuration conf,
-      Stoppable stopper, String serverName) {
+      Stoppable stopper, ServerName serverName) {
     this(zkw, conf, stopper, serverName, new TaskFinisher() {
       @Override
-      public Status finish(String workerName, String logfile) {
+      public Status finish(ServerName workerName, String logfile) {
         String tmpname =
-          ZKSplitLog.getSplitLogDirTmpComponent(workerName, logfile);
+          ZKSplitLog.getSplitLogDirTmpComponent(workerName.toString(), logfile);
         try {
           HLogSplitter.moveRecoveredEditsFromTemp(tmpname, logfile, conf);
         } catch (IOException e) {
@@ -159,28 +164,22 @@ public class SplitLogManager extends Zoo
    * @param tf task finisher 
    */
   public SplitLogManager(ZooKeeperWatcher zkw, Configuration conf,
-      Stoppable stopper, String serverName, TaskFinisher tf) {
+      Stoppable stopper, ServerName serverName, TaskFinisher tf) {
     super(zkw);
     this.taskFinisher = tf;
     this.conf = conf;
     this.stopper = stopper;
-    this.zkretries = conf.getLong("hbase.splitlog.zk.retries",
-        ZKSplitLog.DEFAULT_ZK_RETRIES);
-    this.resubmit_threshold = conf.getLong("hbase.splitlog.max.resubmit",
-        ZKSplitLog.DEFAULT_MAX_RESUBMIT);
-    this.timeout = conf.getInt("hbase.splitlog.manager.timeout",
-        ZKSplitLog.DEFAULT_TIMEOUT);
+    this.zkretries = conf.getLong("hbase.splitlog.zk.retries", DEFAULT_ZK_RETRIES);
+    this.resubmit_threshold = conf.getLong("hbase.splitlog.max.resubmit", DEFAULT_MAX_RESUBMIT);
+    this.timeout = conf.getInt("hbase.splitlog.manager.timeout", DEFAULT_TIMEOUT);
     this.unassignedTimeout =
-      conf.getInt("hbase.splitlog.manager.unassigned.timeout",
-        ZKSplitLog.DEFAULT_UNASSIGNED_TIMEOUT);
+      conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT);
     LOG.debug("timeout = " + timeout);
     LOG.debug("unassigned timeout = " + unassignedTimeout);
 
     this.serverName = serverName;
-    this.timeoutMonitor = new TimeoutMonitor(
-        conf.getInt("hbase.splitlog.manager.timeoutmonitor.period",
-            1000),
-        stopper);
+    this.timeoutMonitor =
+      new TimeoutMonitor(conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000), stopper);
   }
 
   public void finishInitialization() {
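
The four defaults that moved from ZKSplitLog into SplitLogManager stay overridable through the same hbase.splitlog.* keys the constructor above reads. A small sketch; the keys and defaults come straight from this diff, the override values are arbitrary examples:

import org.apache.hadoop.conf.Configuration;

// Override the split-log tunables the SplitLogManager constructor reads.
public class SplitLogConfExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInt("hbase.splitlog.manager.timeout", 60000);              // default 25000 (25 sec)
    conf.setLong("hbase.splitlog.max.resubmit", 5L);                   // default 3
    conf.setInt("hbase.splitlog.manager.unassigned.timeout", 120000);  // default 3 min

    // Read back the same way the constructor does:
    System.out.println(conf.getInt("hbase.splitlog.manager.timeout", 25000));
    System.out.println(conf.getLong("hbase.splitlog.max.resubmit", 3));
  }
}
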
@@ -233,9 +232,8 @@ public class SplitLogManager extends Zoo
    * available worker region server. This method must only be called after the
    * region servers have been brought online.
    *
-   * @param logDirs
-   * @throws IOException
-   *          if there was an error while splitting any log file
+   * @param logDirs List of log dirs to split
+   * @throws IOException If there was an error while splitting any log file
    * @return cumulative size of the logfiles split
    */
   public long splitLogDistributed(final List<Path> logDirs) throws IOException {
@@ -244,7 +242,7 @@ public class SplitLogManager extends Zoo
     FileStatus[] logfiles = getFileList(logDirs);
     status.setStatus("Checking directory contents...");
     LOG.debug("Scheduling batch of logs to split");
-    tot_mgr_log_split_batch_start.incrementAndGet();
+    SplitLogCounters.tot_mgr_log_split_batch_start.incrementAndGet();
     LOG.info("started splitting logs in " + logDirs);
     long t = EnvironmentEdgeManager.currentTimeMillis();
     long totalSize = 0;
@@ -257,14 +255,13 @@ public class SplitLogManager extends Zoo
       // metrics that it drives will also be under-reported.
       totalSize += lf.getLen();
       if (enqueueSplitTask(lf.getPath().toString(), batch) == false) {
-        throw new IOException("duplicate log split scheduled for "
-            + lf.getPath());
+        throw new IOException("duplicate log split scheduled for " + lf.getPath());
       }
     }
     waitForSplittingCompletion(batch, status);
     if (batch.done != batch.installed) {
       batch.isDead = true;
-      tot_mgr_log_split_batch_err.incrementAndGet();
+      SplitLogCounters.tot_mgr_log_split_batch_err.incrementAndGet();
       LOG.warn("error while splitting logs in " + logDirs +
       " installed = " + batch.installed + " but only " + batch.done + " done");
       throw new IOException("error or interrupt while splitting logs in "
@@ -285,7 +282,7 @@ public class SplitLogManager extends Zoo
           LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe);
         }
       }
-      tot_mgr_log_split_batch_success.incrementAndGet();
+      SplitLogCounters.tot_mgr_log_split_batch_success.incrementAndGet();
     }
     String msg = "finished splitting (more than or equal to) " + totalSize +
         " bytes in " + batch.installed + " log files in " + logDirs + " in " +
@@ -303,7 +300,9 @@ public class SplitLogManager extends Zoo
    * @return true if a new entry is created, false if it is already there.
    */
   boolean enqueueSplitTask(String taskname, TaskBatch batch) {
-    tot_mgr_log_split_start.incrementAndGet();
+    SplitLogCounters.tot_mgr_log_split_start.incrementAndGet();
+    // This is a znode path under the splitlog dir with the rest of the path made up of a
+    // URL encoding of the passed-in log to split.
     String path = ZKSplitLog.getEncodedNodeName(watcher, taskname);
     Task oldtask = createTaskIfAbsent(path, batch);
     if (oldtask == null) {
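
The new comment above says the task znode name is a URL encoding of the log path. A self-contained illustration of why that matters; the parent znode path and helper name are from this diff, but the exact escaping ZKSplitLog.getEncodedNodeName() applies is not shown here, so treat this as approximate:

import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// A log path contains '/' characters; URL-encoding it lets the whole path live
// as ONE child znode under the splitlog directory instead of a nested tree.
public class EncodedTaskNameSketch {
  public static void main(String[] args) throws UnsupportedEncodingException {
    String splitlogDir = "/hbase/splitlog";
    String logfile = "hdfs://namenode:8020/hbase/.logs/rs1,60020,1335976000000/rs1.123";
    String taskZnode = splitlogDir + "/" + URLEncoder.encode(logfile, "UTF-8");
    System.out.println(taskZnode);  // a single flat child of /hbase/splitlog
  }
}
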
@@ -340,17 +339,17 @@ public class SplitLogManager extends Zoo
     Task task = tasks.get(path);
     if (task == null) {
       if (!ZKSplitLog.isRescanNode(watcher, path)) {
-        tot_mgr_unacquired_orphan_done.incrementAndGet();
+        SplitLogCounters.tot_mgr_unacquired_orphan_done.incrementAndGet();
         LOG.debug("unacquired orphan task is done " + path);
       }
     } else {
       synchronized (task) {
         if (task.status == IN_PROGRESS) {
           if (status == SUCCESS) {
-            tot_mgr_log_split_success.incrementAndGet();
+            SplitLogCounters.tot_mgr_log_split_success.incrementAndGet();
             LOG.info("Done splitting " + path);
           } else {
-            tot_mgr_log_split_err.incrementAndGet();
+            SplitLogCounters.tot_mgr_log_split_err.incrementAndGet();
             LOG.warn("Error splitting " + path);
           }
           task.status = status;
@@ -376,10 +375,9 @@ public class SplitLogManager extends Zoo
   }
 
   private void createNode(String path, Long retry_count) {
-    ZKUtil.asyncCreate(this.watcher, path,
-        TaskState.TASK_UNASSIGNED.get(serverName), new CreateAsyncCallback(),
-        retry_count);
-    tot_mgr_node_create_queued.incrementAndGet();
+    SplitLogTask slt = new SplitLogTask.Unassigned(serverName);
+    ZKUtil.asyncCreate(this.watcher, path, slt.toByteArray(), new CreateAsyncCallback(), retry_count);
+    SplitLogCounters.tot_mgr_node_create_queued.incrementAndGet();
     return;
   }
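
createNode() now writes a typed SplitLogTask instead of raw TaskState bytes. A round-trip sketch using only calls visible in this diff; it needs the trunk jar from this commit on the classpath, and the three-argument ServerName constructor is an assumption:

import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogTask;

// Serialize an UNASSIGNED task the way createNode() does, then parse it back
// the way getDataSetWatchSuccess() does.
public class SplitLogTaskRoundTrip {
  public static void main(String[] args) throws Exception {
    ServerName master = new ServerName("master.example.org", 60000, 1335976000000L);
    byte[] data = new SplitLogTask.Unassigned(master).toByteArray();
    SplitLogTask slt = SplitLogTask.parseFrom(data);  // may throw DeserializationException
    System.out.println(slt.isUnassigned() + " " + slt.getServerName());
  }
}
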
 
@@ -400,7 +398,7 @@ public class SplitLogManager extends Zoo
     this.watcher.getRecoverableZooKeeper().getZooKeeper().
         getData(path, this.watcher,
         new GetDataAsyncCallback(), retry_count);
-    tot_mgr_get_data_queued.incrementAndGet();
+    SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet();
   }
 
   private void tryGetDataSetWatch(String path) {
@@ -408,37 +406,36 @@ public class SplitLogManager extends Zoo
     this.watcher.getRecoverableZooKeeper().getZooKeeper().
         getData(path, this.watcher,
         new GetDataAsyncCallback(), Long.valueOf(-1) /* retry count */);
-    tot_mgr_get_data_queued.incrementAndGet();
+    SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet();
   }
 
-  private void getDataSetWatchSuccess(String path, byte[] data, int version) {
+  private void getDataSetWatchSuccess(String path, byte[] data, int version)
+  throws DeserializationException {
     if (data == null) {
       if (version == Integer.MIN_VALUE) {
         // assume all done. The task znode suddenly disappeared.
         setDone(path, SUCCESS);
         return;
       }
-      tot_mgr_null_data.incrementAndGet();
+      SplitLogCounters.tot_mgr_null_data.incrementAndGet();
       LOG.fatal("logic error - got null data " + path);
       setDone(path, FAILURE);
       return;
     }
     data = this.watcher.getRecoverableZooKeeper().removeMetaData(data);
-    // LOG.debug("set watch on " + path + " got data " + new String(data));
-    if (TaskState.TASK_UNASSIGNED.equals(data)) {
+    SplitLogTask slt = SplitLogTask.parseFrom(data);
+    if (slt.isUnassigned()) {
       LOG.debug("task not yet acquired " + path + " ver = " + version);
       handleUnassignedTask(path);
-    } else if (TaskState.TASK_OWNED.equals(data)) {
-      heartbeat(path, version,
-          TaskState.TASK_OWNED.getWriterName(data));
-    } else if (TaskState.TASK_RESIGNED.equals(data)) {
+    } else if (slt.isOwned()) {
+      heartbeat(path, version, slt.getServerName());
+    } else if (slt.isResigned()) {
       LOG.info("task " + path + " entered state " + new String(data));
       resubmitOrFail(path, FORCE);
-    } else if (TaskState.TASK_DONE.equals(data)) {
+    } else if (slt.isDone()) {
       LOG.info("task " + path + " entered state " + new String(data));
       if (taskFinisher != null && !ZKSplitLog.isRescanNode(watcher, path)) {
-        if (taskFinisher.finish(TaskState.TASK_DONE.getWriterName(data),
-            ZKSplitLog.getFileName(path)) == Status.DONE) {
+        if (taskFinisher.finish(slt.getServerName(), ZKSplitLog.getFileName(path)) == Status.DONE) {
           setDone(path, SUCCESS);
         } else {
           resubmitOrFail(path, CHECK);
@@ -446,12 +443,11 @@ public class SplitLogManager extends Zoo
       } else {
         setDone(path, SUCCESS);
       }
-    } else if (TaskState.TASK_ERR.equals(data)) {
+    } else if (slt.isErr()) {
       LOG.info("task " + path + " entered state " + new String(data));
       resubmitOrFail(path, CHECK);
     } else {
-      LOG.fatal("logic error - unexpected zk state for path = " + path
-          + " data = " + new String(data));
+      LOG.fatal("logic error - unexpected zk state for path = " + path + " data = " + new String(data));
       setDone(path, FAILURE);
     }
   }
@@ -483,16 +479,14 @@ public class SplitLogManager extends Zoo
     }
   }
 
-  private void heartbeat(String path, int new_version,
-      String workerName) {
+  private void heartbeat(String path, int new_version, ServerName workerName) {
     Task task = findOrCreateOrphanTask(path);
     if (new_version != task.last_version) {
       if (task.isUnassigned()) {
         LOG.info("task " + path + " acquired by " + workerName);
       }
-      task.heartbeat(EnvironmentEdgeManager.currentTimeMillis(),
-          new_version, workerName);
-      tot_mgr_heartbeat.incrementAndGet();
+      task.heartbeat(EnvironmentEdgeManager.currentTimeMillis(), new_version, workerName);
+      SplitLogCounters.tot_mgr_heartbeat.incrementAndGet();
     } else {
       // duplicate heartbeats - heartbeats w/o zk node version
       // changing - are possible. The timeout thread does
@@ -502,10 +496,8 @@ public class SplitLogManager extends Zoo
     return;
   }
 
-  private boolean resubmit(String path, Task task,
-      ResubmitDirective directive) {
-    // its ok if this thread misses the update to task.deleted. It will
-    // fail later
+  private boolean resubmit(String path, Task task, ResubmitDirective directive) {
+    // it's ok if this thread misses the update to task.deleted. It will fail later
     if (task.status != IN_PROGRESS) {
       return false;
     }
@@ -518,7 +510,7 @@ public class SplitLogManager extends Zoo
       if (task.unforcedResubmits >= resubmit_threshold) {
         if (!task.resubmitThresholdReached) {
           task.resubmitThresholdReached = true;
-          tot_mgr_resubmit_threshold_reached.incrementAndGet();
+          SplitLogCounters.tot_mgr_resubmit_threshold_reached.incrementAndGet();
           LOG.info("Skipping resubmissions of task " + path +
               " because threshold " + resubmit_threshold + " reached");
         }
@@ -533,9 +525,8 @@ public class SplitLogManager extends Zoo
     task.incarnation++;
     try {
       // blocking zk call but this is done from the timeout thread
-      if (ZKUtil.setData(this.watcher, path,
-          TaskState.TASK_UNASSIGNED.get(serverName),
-          version) == false) {
+      SplitLogTask slt = new SplitLogTask.Unassigned(this.serverName);
+      if (ZKUtil.setData(this.watcher, path, slt.toByteArray(), version) == false) {
         LOG.debug("failed to resubmit task " + path +
             " version changed");
         task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis());
@@ -544,15 +535,20 @@ public class SplitLogManager extends Zoo
     } catch (NoNodeException e) {
       LOG.warn("failed to resubmit because znode doesn't exist " + path +
           " task done (or forced done by removing the znode)");
-      getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
+      try {
+        getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
+      } catch (DeserializationException e1) {
+        LOG.debug("failed to re-resubmit task " + path + " because of deserialization issue");
+        task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis());
+        return false;
+      }
       return false;
     } catch (KeeperException.BadVersionException e) {
-      LOG.debug("failed to resubmit task " + path +
-          " version changed");
+      LOG.debug("failed to resubmit task " + path + " version changed");
       task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis());
       return false;
     } catch (KeeperException e) {
-      tot_mgr_resubmit_failed.incrementAndGet();
+      SplitLogCounters.tot_mgr_resubmit_failed.incrementAndGet();
       LOG.warn("failed to resubmit " + path, e);
       return false;
     }
@@ -562,7 +558,7 @@ public class SplitLogManager extends Zoo
     }
     task.setUnassigned();
     createRescanNode(Long.MAX_VALUE);
-    tot_mgr_resubmit.incrementAndGet();
+    SplitLogCounters.tot_mgr_resubmit.incrementAndGet();
     return true;
   }
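
The resubmit above is an optimistic, version-checked write: ZKUtil.setData succeeds only if the znode version is still the one the manager last observed, and BadVersionException means a worker got there first. A self-contained analogue, illustrative rather than HBase code:

import java.util.concurrent.atomic.AtomicStampedReference;

// The znode's (data, version) pair behaves like a stamped reference: the
// timeout thread may only flip a task back to UNASSIGNED if nothing has
// touched the znode since the version it read at the last heartbeat.
public class VersionedResubmitSketch {
  public static void main(String[] args) {
    AtomicStampedReference<String> znode =
        new AtomicStampedReference<String>("OWNED by rs1", 7);

    int observed = 7;  // version recorded at the manager's last heartbeat
    boolean ok = znode.compareAndSet("OWNED by rs1", "UNASSIGNED", observed, observed + 1);
    // ok == false corresponds to BadVersionException above: the worker updated
    // the task first, so the manager just records a heartbeat instead.
    System.out.println("resubmitted = " + ok);
  }
}
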
 
@@ -573,7 +569,7 @@ public class SplitLogManager extends Zoo
   }
 
   private void deleteNode(String path, Long retries) {
-    tot_mgr_node_delete_queued.incrementAndGet();
+    SplitLogCounters.tot_mgr_node_delete_queued.incrementAndGet();
     // Once a task znode is ready for delete, that is it is in the TASK_DONE
     // state, then no one should be writing to it anymore. That is no one
     // will be updating the znode version any more.
@@ -590,9 +586,9 @@ public class SplitLogManager extends Zoo
     task = tasks.remove(path);
     if (task == null) {
       if (ZKSplitLog.isRescanNode(watcher, path)) {
-        tot_mgr_rescan_deleted.incrementAndGet();
+        SplitLogCounters.tot_mgr_rescan_deleted.incrementAndGet();
       }
-      tot_mgr_missing_state_in_delete.incrementAndGet();
+      SplitLogCounters.tot_mgr_missing_state_in_delete.incrementAndGet();
       LOG.debug("deleted task without in memory state " + path);
       return;
     }
@@ -600,7 +596,7 @@ public class SplitLogManager extends Zoo
       task.status = DELETED;
       task.notify();
     }
-    tot_mgr_task_deleted.incrementAndGet();
+    SplitLogCounters.tot_mgr_task_deleted.incrementAndGet();
   }
 
   private void deleteNodeFailure(String path) {
@@ -622,16 +618,16 @@ public class SplitLogManager extends Zoo
     // might miss the watch-trigger that creation of RESCAN node provides.
     // Since the TimeoutMonitor will keep resubmitting UNASSIGNED tasks
     // therefore this behavior is safe.
+    SplitLogTask slt = new SplitLogTask.Done(this.serverName);
     this.watcher.getRecoverableZooKeeper().getZooKeeper().
-      create(ZKSplitLog.getRescanNode(watcher),
-        TaskState.TASK_DONE.get(serverName), Ids.OPEN_ACL_UNSAFE,
-        CreateMode.EPHEMERAL_SEQUENTIAL,
+      create(ZKSplitLog.getRescanNode(watcher), slt.toByteArray(),
+        Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
         new CreateRescanAsyncCallback(), Long.valueOf(retries));
   }
 
   private void createRescanSuccess(String path) {
     lastNodeCreateTime = EnvironmentEdgeManager.currentTimeMillis();
-    tot_mgr_rescan.incrementAndGet();
+    SplitLogCounters.tot_mgr_rescan.incrementAndGet();
     getDataSetWatch(path, zkretries);
   }
 
@@ -668,14 +664,12 @@ public class SplitLogManager extends Zoo
         if (oldtask.status == IN_PROGRESS) {
           oldtask.batch = batch;
           batch.installed++;
-          LOG.debug("Previously orphan task " + path +
-              " is now being waited upon");
+          LOG.debug("Previously orphan task " + path + " is now being waited upon");
           return null;
         }
         while (oldtask.status == FAILURE) {
-          LOG.debug("wait for status of task " + path +
-              " to change to DELETED");
-          tot_mgr_wait_for_zk_delete.incrementAndGet();
+          LOG.debug("wait for status of task " + path + " to change to DELETED");
+          SplitLogCounters.tot_mgr_wait_for_zk_delete.incrementAndGet();
           try {
             oldtask.wait();
           } catch (InterruptedException e) {
@@ -701,8 +695,7 @@ public class SplitLogManager extends Zoo
         assert false : "Deleted task still present in tasks map";
         return t;
       }
-      LOG.warn("Failure because two threads can't wait for the same task. " +
-          " path=" + path);
+      LOG.warn("Failure because two threads can't wait for the same task; path=" + path);
       return oldtask;
     }
   }
@@ -713,7 +706,7 @@ public class SplitLogManager extends Zoo
     task = tasks.putIfAbsent(path, orphanTask);
     if (task == null) {
       LOG.info("creating orphan task " + path);
-      tot_mgr_orphan_task_acquired.incrementAndGet();
+      SplitLogCounters.tot_mgr_orphan_task_acquired.incrementAndGet();
       task = orphanTask;
     }
     return task;
@@ -781,8 +774,7 @@ public class SplitLogManager extends Zoo
 
     @Override
     public String toString() {
-      return ("installed = " + installed + " done = " + done + " error = "
-          + error);
+      return ("installed = " + installed + " done = " + done + " error = " + error);
     }
   }
 
@@ -792,7 +784,7 @@ public class SplitLogManager extends Zoo
   static class Task {
     volatile long last_update;
     volatile int last_version;
-    volatile String cur_worker_name;
+    volatile ServerName cur_worker_name;
     TaskBatch batch;
     volatile TerminationStatus status;
     volatile int incarnation;
@@ -829,7 +821,7 @@ public class SplitLogManager extends Zoo
       last_update = time;
     }
 
-    public void heartbeat(long time, int version, String worker) {
+    public void heartbeat(long time, int version, ServerName worker) {
       last_version = version;
       last_update = time;
       cur_worker_name = worker;
@@ -841,12 +833,12 @@ public class SplitLogManager extends Zoo
     }
   }
 
-  void handleDeadWorker(String workerName) {
+  void handleDeadWorker(ServerName workerName) {
     // resubmit the tasks on the TimeoutMonitor thread. Makes it easier
     // to reason about concurrency. Makes it easier to retry.
     synchronized (deadWorkersLock) {
       if (deadWorkers == null) {
-        deadWorkers = new HashSet<String>(100);
+        deadWorkers = new HashSet<ServerName>(100);
       }
       deadWorkers.add(workerName);
     }
@@ -854,13 +846,13 @@ public class SplitLogManager extends Zoo
   }
 
   void handleDeadWorkers(List<ServerName> serverNames) {
-    List<String> workerNames = new ArrayList<String>(serverNames.size());
+    List<ServerName> workerNames = new ArrayList<ServerName>(serverNames.size());
     for (ServerName serverName : serverNames) {
-      workerNames.add(serverName.toString());
+      workerNames.add(serverName);
     }
     synchronized (deadWorkersLock) {
       if (deadWorkers == null) {
-        deadWorkers = new HashSet<String>(100);
+        deadWorkers = new HashSet<ServerName>(100);
       }
       deadWorkers.addAll(workerNames);
     }
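
deadWorkers became a volatile Set<ServerName> above: event threads publish dead servers into it under deadWorkersLock, and the TimeoutMonitor chore consumes the whole batch. A condensed, illustrative sketch of that handoff, using String workers for brevity:

import java.util.HashSet;
import java.util.Set;

// Batch handoff between the threads that learn of dead region servers and the
// single timeout-monitor thread that resubmits their tasks.
public class DeadWorkerHandoffSketch {
  private volatile Set<String> deadWorkers = null;
  private final Object deadWorkersLock = new Object();

  void handleDeadWorker(String worker) {       // event threads
    synchronized (deadWorkersLock) {
      if (deadWorkers == null) deadWorkers = new HashSet<String>();
      deadWorkers.add(worker);
    }
  }

  Set<String> drainDeadWorkers() {             // timeout-monitor chore
    synchronized (deadWorkersLock) {
      Set<String> batch = deadWorkers;
      deadWorkers = null;                      // consume the whole batch
      return batch;                            // may be null: nothing died
    }
  }

  public static void main(String[] args) {
    DeadWorkerHandoffSketch s = new DeadWorkerHandoffSketch();
    s.handleDeadWorker("rs1,60020,1335976000000");
    System.out.println(s.drainDeadWorkers());
  }
}
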
@@ -882,7 +874,7 @@ public class SplitLogManager extends Zoo
       int unassigned = 0;
       int tot = 0;
       boolean found_assigned_task = false;
-      Set<String> localDeadWorkers;
+      Set<ServerName> localDeadWorkers;
 
       synchronized (deadWorkersLock) {
         localDeadWorkers = deadWorkers;
@@ -892,7 +884,7 @@ public class SplitLogManager extends Zoo
       for (Map.Entry<String, Task> e : tasks.entrySet()) {
         String path = e.getKey();
         Task task = e.getValue();
-        String cur_worker = task.cur_worker_name;
+        ServerName cur_worker = task.cur_worker_name;
         tot++;
         // don't easily resubmit a task which hasn't been picked up yet. It
         // might be a long while before a SplitLogWorker is free to pick up a
@@ -905,7 +897,7 @@ public class SplitLogManager extends Zoo
         }
         found_assigned_task = true;
         if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) {
-          tot_mgr_resubmit_dead_server_task.incrementAndGet();
+          SplitLogCounters.tot_mgr_resubmit_dead_server_task.incrementAndGet();
           if (resubmit(path, task, FORCE)) {
             resubmitted++;
           } else {
@@ -948,7 +940,7 @@ public class SplitLogManager extends Zoo
           }
         }
         createRescanNode(Long.MAX_VALUE);
-        tot_mgr_resubmit_unassigned.incrementAndGet();
+        SplitLogCounters.tot_mgr_resubmit_unassigned.incrementAndGet();
         LOG.debug("resubmitting unassigned task(s) after timeout");
       }
     }
@@ -963,7 +955,7 @@ public class SplitLogManager extends Zoo
 
     @Override
     public void processResult(int rc, String path, Object ctx, String name) {
-      tot_mgr_node_create_result.incrementAndGet();
+      SplitLogCounters.tot_mgr_node_create_result.incrementAndGet();
       if (rc != 0) {
         if (rc == KeeperException.Code.NODEEXISTS.intValue()) {
           // What if there is a delete pending against this pre-existing
@@ -973,16 +965,16 @@ public class SplitLogManager extends Zoo
           // And all code pieces correctly handle the case of suddenly
           // disappearing task-znode.
           LOG.debug("found pre-existing znode " + path);
-          tot_mgr_node_already_exists.incrementAndGet();
+          SplitLogCounters.tot_mgr_node_already_exists.incrementAndGet();
         } else {
           Long retry_count = (Long)ctx;
           LOG.warn("create rc =" + KeeperException.Code.get(rc) + " for " +
               path + " remaining retries=" + retry_count);
           if (retry_count == 0) {
-            tot_mgr_node_create_err.incrementAndGet();
+            SplitLogCounters.tot_mgr_node_create_err.incrementAndGet();
             createNodeFailure(path);
           } else {
-            tot_mgr_node_create_retry.incrementAndGet();
+            SplitLogCounters.tot_mgr_node_create_retry.incrementAndGet();
             createNode(path, retry_count - 1);
           }
           return;
@@ -1002,19 +994,23 @@ public class SplitLogManager extends Zoo
     @Override
     public void processResult(int rc, String path, Object ctx, byte[] data,
         Stat stat) {
-      tot_mgr_get_data_result.incrementAndGet();
+      SplitLogCounters.tot_mgr_get_data_result.incrementAndGet();
       if (rc != 0) {
         if (rc == KeeperException.Code.SESSIONEXPIRED.intValue()) {
           LOG.error("ZK session expired. Master is expected to shut down. Abandoning retries.");
           return;
         }
         if (rc == KeeperException.Code.NONODE.intValue()) {
-          tot_mgr_get_data_nonode.incrementAndGet();
+          SplitLogCounters.tot_mgr_get_data_nonode.incrementAndGet();
           // The task znode has been deleted. Must be some pending delete
           // that deleted the task. Assume success because a task-znode is
           // is only deleted after TaskFinisher is successful.
           LOG.warn("task znode " + path + " vanished.");
-          getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
+          try {
+            getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
+          } catch (DeserializationException e) {
+            LOG.warn("Deserialization problem", e);
+          }
           return;
         }
         Long retry_count = (Long) ctx;
@@ -1027,15 +1023,19 @@ public class SplitLogManager extends Zoo
         LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " +
             path + " remaining retries=" + retry_count);
         if (retry_count == 0) {
-          tot_mgr_get_data_err.incrementAndGet();
+          SplitLogCounters.tot_mgr_get_data_err.incrementAndGet();
           getDataSetWatchFailure(path);
         } else {
-          tot_mgr_get_data_retry.incrementAndGet();
+          SplitLogCounters.tot_mgr_get_data_retry.incrementAndGet();
           getDataSetWatch(path, retry_count - 1);
         }
         return;
       }
-      getDataSetWatchSuccess(path, data, stat.getVersion());
+      try {
+        getDataSetWatchSuccess(path, data, stat.getVersion());
+      } catch (DeserializationException e) {
+        LOG.warn("Deserialization problem", e);
+      }
       return;
     }
   }
@@ -1049,10 +1049,10 @@ public class SplitLogManager extends Zoo
 
     @Override
     public void processResult(int rc, String path, Object ctx) {
-      tot_mgr_node_delete_result.incrementAndGet();
+      SplitLogCounters.tot_mgr_node_delete_result.incrementAndGet();
       if (rc != 0) {
         if (rc != KeeperException.Code.NONODE.intValue()) {
-          tot_mgr_node_delete_err.incrementAndGet();
+          SplitLogCounters.tot_mgr_node_delete_err.incrementAndGet();
           Long retry_count = (Long) ctx;
           LOG.warn("delete rc=" + KeeperException.Code.get(rc) + " for " +
               path + " remaining retries=" + retry_count);
@@ -1137,12 +1137,14 @@ public class SplitLogManager extends Zoo
      * @param taskname
      * @return DONE if task completed successfully, ERR otherwise
      */
-    public Status finish(String workerName, String taskname);
+    public Status finish(ServerName workerName, String taskname);
   }
+
   enum ResubmitDirective {
     CHECK(),
     FORCE();
   }
+
   enum TerminationStatus {
     IN_PROGRESS("in_progress"),
     SUCCESS("success"),

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java?rev=1333099&r1=1333098&r2=1333099&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java Wed May  2 16:26:36 2012
@@ -37,6 +37,7 @@ import java.util.NavigableSet;
 import java.util.TreeMap;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DeserializationException;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
@@ -142,6 +143,7 @@ public final class ProtobufUtil {
    * for preamble.
    */
   static final byte [] PB_MAGIC = new byte [] {'P', 'B', 'U', 'F'};
+  private static final String PB_MAGIC_STR = Bytes.toString(PB_MAGIC);
 
   /**
    * Prepend the passed bytes with four bytes of magic, {@link #PB_MAGIC}, to flag what
@@ -164,6 +166,16 @@ public final class ProtobufUtil {
   }
 
   /**
+   * @param bytes
+   * @throws DeserializationException if we are missing the pb magic prefix
+   */
+  public static void expectPBMagicPrefix(final byte [] bytes) throws DeserializationException {
+    if (!isPBMagicPrefix(bytes)) {
+      throw new DeserializationException("Missing pb magic " + PB_MAGIC_STR + " prefix");
+    }
+  }
+
+  /**
    * @return Length of {@link #PB_MAGIC}
    */
   public static int lengthOfPBMagic() {
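
expectPBMagicPrefix() just upgrades a boolean check into an exception. A self-contained illustration of the preamble convention; PB_MAGIC is copied from this file, and isPBMagicPrefix here is a stand-in for the real helper:

import java.util.Arrays;

// Serialized payloads are tagged with 'PBUF' so a reader can tell protobuf
// content from older serializations before attempting to parse it.
public class PBMagicSketch {
  static final byte[] PB_MAGIC = new byte[] {'P', 'B', 'U', 'F'};

  static boolean isPBMagicPrefix(byte[] bytes) {
    if (bytes == null || bytes.length < PB_MAGIC.length) return false;
    return Arrays.equals(PB_MAGIC, Arrays.copyOf(bytes, PB_MAGIC.length));
  }

  public static void main(String[] args) {
    System.out.println(isPBMagicPrefix(new byte[] {'P', 'B', 'U', 'F', 42}));  // true
    System.out.println(isPBMagicPrefix(new byte[] {42}));                      // false
  }
}
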
@@ -237,6 +249,7 @@ public final class ProtobufUtil {
    *
    * @param serverName the ServerName to convert
    * @return the converted protocol buffer ServerName
+   * @see #toServerName(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName)
    */
   public static HBaseProtos.ServerName
       toServerName(final ServerName serverName) {
@@ -259,8 +272,7 @@ public final class ProtobufUtil {
    * @param proto the protocol buffer ServerName to convert
    * @return the converted ServerName
    */
-  public static ServerName toServerName(
-      final HBaseProtos.ServerName proto) {
+  public static ServerName toServerName(final HBaseProtos.ServerName proto) {
     if (proto == null) return null;
     String hostName = proto.getHostName();
     long startCode = -1;
@@ -280,8 +292,7 @@ public final class ProtobufUtil {
    * @param proto the RegionInfo to convert
    * @return the converted HRegionInfo
    */
-  public static HRegionInfo
-      toRegionInfo(final RegionInfo proto) {
+  public static HRegionInfo toRegionInfo(final RegionInfo proto) {
     if (proto == null) return null;
     byte[] tableName = proto.getTableName().toByteArray();
     if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) {
@@ -316,8 +327,7 @@ public final class ProtobufUtil {
    * @param info the HRegionInfo to convert
    * @return the converted RegionInfo
    */
-  public static RegionInfo
-      toRegionInfo(final HRegionInfo info) {
+  public static RegionInfo toRegionInfo(final HRegionInfo info) {
     if (info == null) return null;
     RegionInfo.Builder builder = RegionInfo.newBuilder();
     builder.setTableName(ByteString.copyFrom(info.getTableName()));


