hbase-commits mailing list archives

From: t...@apache.org
Subject: svn commit: r1098933 - in /hbase/trunk: ./ src/main/java/org/apache/hadoop/hbase/master/ src/main/java/org/apache/hadoop/hbase/monitoring/ src/main/java/org/apache/hadoop/hbase/regionserver/ src/main/java/org/apache/hadoop/hbase/regionserver/wal/ src/t...
Date: Tue, 03 May 2011 06:11:07 GMT
Author: todd
Date: Tue May  3 06:11:06 2011
New Revision: 1098933

URL: http://svn.apache.org/viewvc?rev=1098933&view=rev
Log:
HBASE-3836  Add facility to track currently progressing actions and workflows.

Added:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/monitoring/
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/monitoring/
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java
Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
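
[Editor's note: before the per-file diffs, a quick orientation. Every call site in this
patch follows the same lifecycle against the new monitoring API. A minimal sketch of
that lifecycle, using only methods added in this commit (the doWork() helper and class
name are hypothetical):

import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;

public class MonitoredWorkflowExample {
  public void run() {
    // Register the long-running task with the per-VM TaskMonitor singleton.
    MonitoredTask status = TaskMonitor.get().createStatus("Example workflow");
    try {
      status.setStatus("Phase 1 of 2");
      doWork();  // hypothetical helper standing in for real work
      status.markComplete("Workflow finished");
    } catch (RuntimeException e) {
      status.abort("Workflow failed: " + e.getMessage());
      throw e;
    } finally {
      // Safe even after markComplete()/abort(): cleanup() only transitions
      // tasks still in RUNNING state, marking them purgeable.
      status.cleanup();
    }
  }

  private void doWork() {}
}

The finally-block cleanup() mirrors HMaster.run() and HRegion.close() below, so a task
that dies partway through is still reaped by the monitor.]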

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1098933&r1=1098932&r2=1098933&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Tue May  3 06:11:06 2011
@@ -233,6 +233,8 @@ Release 0.91.0 - Unreleased
                (Subbu M. Iyer via Stack)
    HBASE-1364  [performance] Distributed splitting of regionserver commit logs
                (Prakash Khemani)
+   HBASE-3836  Add facility to track currently progressing actions and
+               workflows. (todd)
 
 Release 0.90.3 - Unreleased
 

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java?rev=1098933&r1=1098932&r2=1098933&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java Tue May  3 06:11:06 2011
@@ -25,6 +25,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
@@ -119,17 +120,20 @@ class ActiveMasterManager extends ZooKee
    *
    * This also makes sure that we are watching the master znode so will be
    * notified if another master dies.
+   * @param startupStatus 
    * @return True if no issue becoming active master else false if another
    * master was running or if some other problem (zookeeper, stop flag has been
    * set on this Master)
    */
-  boolean blockUntilBecomingActiveMaster() {
+  boolean blockUntilBecomingActiveMaster(MonitoredTask startupStatus) {
+    startupStatus.setStatus("Trying to register in ZK as active master");
     boolean cleanSetOfActiveMaster = true;
     // Try to become the active master, watch if there is another master
     try {
       if (ZKUtil.createEphemeralNodeAndWatch(this.watcher,
           this.watcher.masterAddressZNode, Bytes.toBytes(this.sn.toString()))) {
         // We are the master, return
+        startupStatus.setStatus("Successfully registered as active master.");
         this.clusterHasActiveMaster.set(true);
         LOG.info("Master=" + this.sn);
         return cleanSetOfActiveMaster;
@@ -143,13 +147,17 @@ class ActiveMasterManager extends ZooKee
         ZKUtil.getDataAndWatch(this.watcher, this.watcher.masterAddressZNode);
       ServerName currentMaster = new ServerName(Bytes.toString(bytes));
       if (ServerName.isSameHostnameAndPort(currentMaster, this.sn)) {
-        LOG.info("Current master has this master's address, " + currentMaster +
+        String msg = ("Current master has this master's address, " + currentMaster +
           "; master was restarted?  Waiting on znode to expire...");
+        LOG.info(msg);
+        startupStatus.setStatus(msg);
         // Hurry along the expiration of the znode.
         ZKUtil.deleteNode(this.watcher, this.watcher.masterAddressZNode);
       } else {
-        LOG.info("Another master is the active master, " + currentMaster +
-          "; waiting to become the next active master");
+        String msg = "Another master is the active master, " + currentMaster +
+        "; waiting to become the next active master";
+        LOG.info(msg);
+        startupStatus.setStatus(msg);
       }
     } catch (KeeperException ke) {
       master.abort("Received an unexpected KeeperException, aborting", ke);
@@ -168,7 +176,7 @@ class ActiveMasterManager extends ZooKee
         return cleanSetOfActiveMaster;
       }
       // Try to become active master again now that there is no active master
-      blockUntilBecomingActiveMaster();
+      blockUntilBecomingActiveMaster(startupStatus);
     }
     return cleanSetOfActiveMaster;
   }
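
[Editor's note: a small recurring refactor throughout the patch — messages that
previously went straight to the log are first captured in a local so one string feeds
both destinations, as in the two hunks above:

  String msg = "Another master is the active master, " + currentMaster +
      "; waiting to become the next active master";
  LOG.info(msg);
  startupStatus.setStatus(msg);

HLogSplitter later wraps this same idiom in a logAndReport() helper.]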

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1098933&r1=1098932&r2=1098933&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Tue May  3 06:11:06 2011
@@ -71,6 +71,8 @@ import org.apache.hadoop.hbase.master.ha
 import org.apache.hadoop.hbase.master.handler.TableDeleteFamilyHandler;
 import org.apache.hadoop.hbase.master.handler.TableModifyFamilyHandler;
 import org.apache.hadoop.hbase.master.metrics.MasterMetrics;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.security.User;
@@ -271,6 +273,9 @@ implements HMasterInterface, HMasterRegi
    */
   @Override
   public void run() {
+    MonitoredTask startupStatus =
+      TaskMonitor.get().createStatus("Master startup");
+    startupStatus.setDescription("Master startup");
     try {
       /*
        * Block on becoming the active master.
@@ -282,16 +287,18 @@ implements HMasterInterface, HMasterRegi
        * now wait until it dies to try and become the next active master.  If we
        * do not succeed on our first attempt, this is no longer a cluster startup.
        */
-      becomeActiveMaster();
+      becomeActiveMaster(startupStatus);
 
       // We are either the active master or we were asked to shutdown
       if (!this.stopped) {
-        finishInitialization();
+        finishInitialization(startupStatus);
         loop();
       }
     } catch (Throwable t) {
       abort("Unhandled exception. Starting shutdown.", t);
     } finally {
+      startupStatus.cleanup();
+      
       stopChores();
       // Wait for all the remaining region servers to report in IFF we were
       // running a cluster shutdown AND we were NOT aborting.
@@ -313,17 +320,19 @@ implements HMasterInterface, HMasterRegi
 
   /**
    * Try becoming active master.
+   * @param startupStatus 
    * @return True if we could successfully become the active master.
    * @throws InterruptedException
    */
-  private boolean becomeActiveMaster() throws InterruptedException {
+  private boolean becomeActiveMaster(MonitoredTask startupStatus)
+  throws InterruptedException {
     // TODO: This is wrong!!!! Should have new servername if we restart ourselves,
     // if we come back to life.
     this.activeMasterManager = new ActiveMasterManager(zooKeeper, this.serverName,
         this);
     this.zooKeeper.registerListener(activeMasterManager);
     stallIfBackupMaster(this.conf, this.activeMasterManager);
-    return this.activeMasterManager.blockUntilBecomingActiveMaster();
+    return this.activeMasterManager.blockUntilBecomingActiveMaster(startupStatus);
   }
 
   /**
@@ -386,7 +395,7 @@ implements HMasterInterface, HMasterRegi
    * @throws InterruptedException
    * @throws KeeperException
    */
-  private void finishInitialization()
+  private void finishInitialization(MonitoredTask status)
   throws IOException, InterruptedException, KeeperException {
 
     isActiveMaster = true;
@@ -397,9 +406,12 @@ implements HMasterInterface, HMasterRegi
      * below after we determine if cluster startup or failover.
      */
 
+    status.setStatus("Initializing Master file system");
     // TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring.
     this.fileSystemManager = new MasterFileSystem(this, metrics);
+
     // publish cluster ID
+    status.setStatus("Publishing Cluster ID in ZooKeeper");
     ClusterId.setClusterId(this.zooKeeper,
         fileSystemManager.getClusterId());
 
@@ -407,16 +419,19 @@ implements HMasterInterface, HMasterRegi
 
     this.serverManager = new ServerManager(this, this);
 
+    status.setStatus("Initializing ZK system trackers");
     initializeZKBasedSystemTrackers();
 
     // initialize master side coprocessors before we start handling requests
+    status.setStatus("Initializing master coprocessors");
     this.cpHost = new MasterCoprocessorHost(this, this.conf);
 
     // start up all service threads.
+    status.setStatus("Initializing master service threads");
     startServiceThreads();
 
     // Wait for region servers to report in.
-    this.serverManager.waitForRegionServers();
+    this.serverManager.waitForRegionServers(status);
     // Check zk for regionservers that are up but didn't register
     for (ServerName sn: this.regionServerTracker.getOnlineServers()) {
       if (!this.serverManager.isServerOnline(sn)) {
@@ -427,20 +442,25 @@ implements HMasterInterface, HMasterRegi
     }
 
     // TODO: Should do this in background rather than block master startup
+    status.setStatus("Splitting logs after master startup");
     this.fileSystemManager.
       splitLogAfterStartup(this.serverManager.getOnlineServers().keySet());
 
     // Make sure root and meta assigned before proceeding.
-    assignRootAndMeta();
+    assignRootAndMeta(status);
+    
     // Fixup assignment manager status
+    status.setStatus("Starting assignment manager");
     this.assignmentManager.joinCluster();
 
     // Start balancer and meta catalog janitor after meta and regions have
     // been assigned.
+    status.setStatus("Starting balancer and catalog janitor");
     this.balancerChore = getAndStartBalancerChore(this);
     this.catalogJanitorChore =
       Threads.setDaemonThreadRunning(new CatalogJanitor(this, this));
 
+    status.markComplete("Initialization successful");
     LOG.info("Master has completed initialization");
     initialized = true;
   }
@@ -453,12 +473,13 @@ implements HMasterInterface, HMasterRegi
    * @throws KeeperException
    * @return Count of regions we assigned.
    */
-  int assignRootAndMeta()
+  int assignRootAndMeta(MonitoredTask status)
   throws InterruptedException, IOException, KeeperException {
     int assigned = 0;
     long timeout = this.conf.getLong("hbase.catalog.verification.timeout", 1000);
 
     // Work on ROOT region.  Is it in zk in transition?
+    status.setStatus("Assigning ROOT region");
     boolean rit = this.assignmentManager.
       processRegionInTransitionAndBlockUntilAssigned(HRegionInfo.ROOT_REGIONINFO);
     if (!catalogTracker.verifyRootRegionLocation(timeout)) {
@@ -474,6 +495,7 @@ implements HMasterInterface, HMasterRegi
       ", location=" + catalogTracker.getRootLocation());
 
     // Work on meta region
+    status.setStatus("Assigning META region");
     rit = this.assignmentManager.
       processRegionInTransitionAndBlockUntilAssigned(HRegionInfo.FIRST_META_REGIONINFO);
     if (!this.catalogTracker.verifyMetaRegionLocation(timeout)) {
@@ -490,6 +512,7 @@ implements HMasterInterface, HMasterRegi
     }
     LOG.info(".META. assigned=" + assigned + ", rit=" + rit +
       ", location=" + catalogTracker.getMetaLocation());
+    status.setStatus("META and ROOT assigned.");
     return assigned;
   }
 
@@ -1101,15 +1124,21 @@ implements HMasterInterface, HMasterRegi
     this.zooKeeper = new ZooKeeperWatcher(conf, MASTER + ":"
         + this.serverName.getPort(), this);
 
-    if (!becomeActiveMaster()) {
-      return false;
+    MonitoredTask status = 
+      TaskMonitor.get().createStatus("Recovering expired ZK session");
+    try {
+      if (!becomeActiveMaster(status)) {
+        return false;
+      }
+      initializeZKBasedSystemTrackers();
+      // Update in-memory structures to reflect our earlier Root/Meta assignment.
+      assignRootAndMeta(status);
+      // process RIT if any
+      this.assignmentManager.processRegionsInTransition();
+      return true;
+    } finally {
+      status.cleanup();
     }
-    initializeZKBasedSystemTrackers();
-    // Update in-memory structures to reflect our earlier Root/Meta assignment.
-    assignRootAndMeta();
-    // process RIT if any
-    this.assignmentManager.processRegionsInTransition();
-    return true;
   }
 
   /**

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java?rev=1098933&r1=1098932&r2=1098933&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java Tue May  3 06:11:06 2011
@@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.client.Re
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.master.handler.MetaServerShutdownHandler;
 import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 
 /**
  * The ServerManager class manages info about region servers.
@@ -466,7 +467,7 @@ public class ServerManager {
    * Waits for the regionservers to report in.
    * @throws InterruptedException
    */
-  public void waitForRegionServers()
+  public void waitForRegionServers(MonitoredTask status)
   throws InterruptedException {
     long interval = this.master.getConfiguration().
       getLong("hbase.master.wait.on.regionservers.interval", 3000);
@@ -477,11 +478,15 @@ public class ServerManager {
       Thread.sleep(interval);
       count = countOfRegionServers();
       if (count == oldcount && count > 0) break;
+
+      String msg;
       if (count == 0) {
-        LOG.info("Waiting on regionserver(s) to checkin");
+        msg = "Waiting on regionserver(s) to checkin";
       } else {
-        LOG.info("Waiting on regionserver(s) count to settle; currently=" + count);
+        msg = "Waiting on regionserver(s) count to settle; currently=" + count;
       }
+      LOG.info(msg);
+      status.setStatus(msg);
       oldcount = count;
     }
   }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java?rev=1098933&r1=1098932&r2=1098933&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java Tue May  3 06:11:06 2011
@@ -35,6 +35,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Chore;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.master.SplitLogManager.TaskFinisher.Status;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
 import org.apache.hadoop.hbase.regionserver.wal.OrphanHLogAfterSplitException;
@@ -183,15 +185,22 @@ public class SplitLogManager extends Zoo
       LOG.warn(logDir + " doesn't exist. Nothing to do!");
       return 0;
     }
+    
+    MonitoredTask status = TaskMonitor.get().createStatus(
+          "Doing distributed log split in " + logDir);
+
+    status.setStatus("Checking directory contents...");
     FileStatus[] logfiles = fs.listStatus(logDir); // TODO filter filenames?
     if (logfiles == null || logfiles.length == 0) {
       LOG.info(logDir + " is empty dir, no logs to split");
       return 0;
     }
+    
+    status.setStatus("Scheduling batch of logs to split");
     tot_mgr_log_split_batch_start.incrementAndGet();
     LOG.info("started splitting logs in " + logDir);
     long t = EnvironmentEdgeManager.currentTimeMillis();
-    long totalSize = 0;
+    long totalSize = 0;    
     TaskBatch batch = new TaskBatch();
     for (FileStatus lf : logfiles) {
       // TODO If the log file is still being written to - which is most likely
@@ -205,7 +214,7 @@ public class SplitLogManager extends Zoo
             + lf.getPath());
       }
     }
-    waitTasks(batch);
+    waitTasks(batch, status);
     if (batch.done != batch.installed) {
       stopTrackingTasks(batch);
       tot_mgr_log_split_batch_err.incrementAndGet();
@@ -214,6 +223,8 @@ public class SplitLogManager extends Zoo
       throw new IOException("error or interrupt while splitting logs in "
           + logDir + " Task = " + batch);
     }
+    
+    status.setStatus("Checking for orphaned logs in log directory...");
     if (anyNewLogFiles(logDir, logfiles)) {
       tot_mgr_new_unexpected_hlogs.incrementAndGet();
       LOG.warn("new hlogs were produced while logs in " + logDir +
@@ -221,12 +232,18 @@ public class SplitLogManager extends Zoo
       throw new OrphanHLogAfterSplitException();
     }
     tot_mgr_log_split_batch_success.incrementAndGet();
+    
+    status.setStatus("Cleaning up log directory...");
     if (!fs.delete(logDir, true)) {
       throw new IOException("Unable to delete src dir: " + logDir);
     }
-    LOG.info("finished splitting (more than or equal to) " + totalSize +
+
+    String msg = "finished splitting (more than or equal to) " + totalSize +
         " bytes in " + batch.installed + " log files in " + logDir + " in " +
-        (EnvironmentEdgeManager.currentTimeMillis() - t) + "ms");
+        (EnvironmentEdgeManager.currentTimeMillis() - t) + "ms";
+    status.markComplete(msg);
+    LOG.info(msg);
+    
     return totalSize;
   }
 
@@ -244,10 +261,14 @@ public class SplitLogManager extends Zoo
     return false;
   }
 
-  private void waitTasks(TaskBatch batch) {
+  private void waitTasks(TaskBatch batch, MonitoredTask status) {
     synchronized (batch) {
       while ((batch.done + batch.error) != batch.installed) {
         try {
+          status.setStatus("Waiting for distributed tasks to finish. "
+              + " scheduled=" + batch.installed
+              + " done=" + batch.done
+              + " error=" + batch.error);
           batch.wait(100);
           if (stopper.isStopped()) {
             LOG.warn("Stopped while waiting for log splits to be completed");

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java?rev=1098933&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java Tue May  3 06:11:06 2011
@@ -0,0 +1,53 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.monitoring;
+
+public interface MonitoredTask {
+  enum State {
+    RUNNING,
+    COMPLETE,
+    ABORTED;
+  }
+
+  public abstract long getStartTime();
+
+  public abstract String getDescription();
+
+  public abstract String getStatus();
+
+  public abstract State getState();
+
+  public abstract long getCompletionTimestamp();
+
+  public abstract void markComplete(String msg);
+  public abstract void abort(String msg);
+
+  public abstract void setStatus(String status);
+
+  public abstract void setDescription(String description);
+
+  /**
+   * Explicitly mark this status as able to be cleaned up,
+   * even though it might not be complete.
+   */
+  public abstract void cleanup();
+
+
+}
\ No newline at end of file

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java?rev=1098933&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java Tue May  3 06:11:06 2011
@@ -0,0 +1,102 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.monitoring;
+
+import com.google.common.annotations.VisibleForTesting;
+
+class MonitoredTaskImpl implements MonitoredTask {
+  private long startTime;
+  private long completionTimestamp = -1;
+  
+  private String status;
+  private String description;
+  
+  private State state = State.RUNNING;
+  
+  public MonitoredTaskImpl() {
+    startTime = System.currentTimeMillis();
+  }
+
+  @Override
+  public long getStartTime() {
+    return startTime;
+  }
+  
+  @Override
+  public String getDescription() {
+    return description;
+  }
+
+  @Override
+  public String getStatus() {
+    return status;
+  }
+  
+  @Override
+  public State getState() {
+    return state;
+  }
+  
+  @Override
+  public long getCompletionTimestamp() {
+    return completionTimestamp;
+  }
+  
+  @Override
+  public void markComplete(String status) {
+    state = State.COMPLETE;
+    setStatus(status);
+    completionTimestamp = System.currentTimeMillis();
+  }
+
+  @Override
+  public void abort(String msg) {
+    setStatus(msg);
+    state = State.ABORTED;
+    completionTimestamp = System.currentTimeMillis();
+  }
+  
+  @Override
+  public void setStatus(String status) {
+    this.status = status;
+  }
+
+  @Override
+  public void setDescription(String description) {
+    this.description = description;
+  }
+
+  @Override
+  public void cleanup() {
+    if (state == State.RUNNING) {
+      state = State.ABORTED;
+      completionTimestamp = System.currentTimeMillis();
+    }
+  }
+
+  /**
+   * Force the completion timestamp backwards so that
+   * it expires now.
+   */
+  @VisibleForTesting
+  void expireNow() {
+    completionTimestamp -= 180 * 1000;
+  }
+}

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java?rev=1098933&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java Tue May  3 06:11:06 2011
@@ -0,0 +1,176 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.monitoring;
+
+import java.lang.ref.WeakReference;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+
+/**
+ * Singleton which keeps track of tasks going on in this VM.
+ * A Task here is anything which takes more than a few seconds
+ * and the user might want to inquire about the status
+ */
+public class TaskMonitor {
+  private static final Log LOG = LogFactory.getLog(TaskMonitor.class);
+
+  // Don't keep around any tasks that have completed more than
+  // 60 seconds ago
+  private static final long EXPIRATION_TIME = 60*1000;
+
+  @VisibleForTesting
+  static final int MAX_TASKS = 1000;
+  
+  private static TaskMonitor instance;
+  private List<TaskAndWeakRefPair> tasks =
+    Lists.newArrayList();
+
+  /**
+   * Get singleton instance.
+   * TODO this would be better off scoped to a single daemon
+   */
+  public static synchronized TaskMonitor get() {
+    if (instance == null) {
+      instance = new TaskMonitor();
+    }
+    return instance;
+  }
+  
+  public MonitoredTask createStatus(String description) {
+    MonitoredTask stat = new MonitoredTaskImpl();
+    stat.setDescription(description);
+    MonitoredTask proxy = (MonitoredTask) Proxy.newProxyInstance(
+        stat.getClass().getClassLoader(),
+        new Class<?>[] { MonitoredTask.class },
+        new PassthroughInvocationHandler<MonitoredTask>(stat));
+
+    TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy);
+    tasks.add(pair);
+    return proxy;
+  }
+  
+  private synchronized void purgeExpiredTasks() {
+    int size = 0;
+    
+    for (Iterator<TaskAndWeakRefPair> it = tasks.iterator();
+         it.hasNext();) {
+      TaskAndWeakRefPair pair = it.next();
+      MonitoredTask stat = pair.get();
+      
+      if (pair.isDead()) {
+        // The class who constructed this leaked it. So we can
+        // assume it's done.
+        if (stat.getState() == MonitoredTaskImpl.State.RUNNING) {
+          LOG.warn("Status " + stat + " appears to have been leaked");
+          stat.cleanup();
+        }
+      }
+      
+      if (canPurge(stat)) {
+        it.remove();
+      } else {
+        size++;
+      }
+    }
+    
+    if (size > MAX_TASKS) {
+      LOG.warn("Too many actions in action monitor! Purging some.");
+      tasks = tasks.subList(size - MAX_TASKS, size);
+    }
+  }
+
+  public synchronized List<MonitoredTask> getTasks() {
+    purgeExpiredTasks();
+    ArrayList<MonitoredTask> ret = Lists.newArrayListWithCapacity(tasks.size());
+    for (TaskAndWeakRefPair pair : tasks) {
+      ret.add(pair.get());
+    }
+    return ret;
+  }
+
+  private boolean canPurge(MonitoredTask stat) {
+    long cts = stat.getCompletionTimestamp();
+    return (cts > 0 && System.currentTimeMillis() - cts > EXPIRATION_TIME);
+  }
+  
+  /**
+   * This class encapsulates an object as well as a weak reference to a proxy
+   * that passes through calls to that object. In art form:
+   * <code>
+   *     Proxy  <------------------
+   *       |                       \
+   *       v                        \
+   * PassthroughInvocationHandler   |  weak reference
+   *       |                       /
+   * MonitoredTaskImpl            / 
+   *       |                     /
+   * StatAndWeakRefProxy  ------/
+   *
+   * Since we only return the Proxy to the creator of the MonitorableStatus,
+   * this means that they can leak that object, and we'll detect it
+   * since our weak reference will go null. But, we still have the actual
+   * object, so we can log it and display it as a leaked (incomplete) action.
+   */
+  private static class TaskAndWeakRefPair {
+    private MonitoredTask impl;
+    private WeakReference<MonitoredTask> weakProxy;
+    
+    public TaskAndWeakRefPair(MonitoredTask stat,
+        MonitoredTask proxy) {
+      this.impl = stat;
+      this.weakProxy = new WeakReference<MonitoredTask>(proxy);
+    }
+    
+    public MonitoredTask get() {
+      return impl;
+    }
+    
+    public boolean isDead() {
+      return weakProxy.get() == null;
+    }
+  }
+  
+  /**
+   * An InvocationHandler that simply passes through calls to the original object.
+   */
+  private static class PassthroughInvocationHandler<T> implements InvocationHandler {
+    private T delegatee;
+    
+    public PassthroughInvocationHandler(T delegatee) {
+      this.delegatee = delegatee;
+    }
+
+    @Override
+    public Object invoke(Object proxy, Method method, Object[] args)
+        throws Throwable {
+      return method.invoke(delegatee, args);
+    }    
+  }
+}
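
[Editor's note: the TaskAndWeakRefPair comment above describes the interesting trick
here — callers receive only a dynamic proxy, the monitor keeps a weak reference to that
proxy plus a strong reference to the real task, so a garbage-collected proxy reveals a
leaked (never-completed) task. A standalone sketch of the same mechanism, independent
of the HBase classes (all names here are hypothetical):

import java.lang.ref.WeakReference;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class LeakDetectSketch {
  interface Task {
    void setStatus(String s);
    String getStatus();
  }

  static class TaskImpl implements Task {
    private volatile String status = "RUNNING";
    public void setStatus(String s) { this.status = s; }
    public String getStatus() { return status; }
  }

  public static void main(String[] args) throws InterruptedException {
    final TaskImpl impl = new TaskImpl();
    // Hand the caller a pass-through proxy, never the impl itself.
    Task proxy = (Task) Proxy.newProxyInstance(
        Task.class.getClassLoader(),
        new Class<?>[] { Task.class },
        new InvocationHandler() {
          public Object invoke(Object p, Method m, Object[] a) throws Throwable {
            return m.invoke(impl, a);
          }
        });
    WeakReference<Task> weakProxy = new WeakReference<Task>(proxy);

    proxy.setStatus("working");
    proxy = null;      // caller drops the task without completing it
    System.gc();       // a hint only; collection is not guaranteed
    Thread.sleep(100);

    if (weakProxy.get() == null) {
      // Proxy gone, impl still held: the task was leaked mid-flight.
      System.out.println("leaked task, last status=" + impl.getStatus());
    }
  }
}

TaskMonitor.purgeExpiredTasks() applies exactly this check via
TaskAndWeakRefPair.isDead(), logging the leak and cleanup()-ing the orphaned task.]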

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1098933&r1=1098932&r2=1098933&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue May  3 06:11:06 2011
@@ -83,6 +83,8 @@ import org.apache.hadoop.hbase.io.TimeRa
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
 import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
 import org.apache.hadoop.hbase.ipc.HBaseRPC;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -349,7 +351,12 @@ public class HRegion implements HeapSize
    */
   public long initialize(final CancelableProgressable reporter)
   throws IOException {
+  
+    MonitoredTask status = TaskMonitor.get().createStatus(
+        "Initializing region " + this);
+    
     if (coprocessorHost != null) {
+      status.setStatus("Running coprocessor pre-open hook");
       coprocessorHost.preOpen();
     }
     // A region can be reopened if failed a split; reset flags
@@ -357,14 +364,17 @@ public class HRegion implements HeapSize
     this.closed.set(false);
 
     // Write HRI to a file in case we need to recover .META.
+    status.setStatus("Writing region info on filesystem");
     checkRegioninfoOnFilesystem();
 
     // Remove temporary data left over from old regions
+    status.setStatus("Cleaning up temporary data from old regions");
     cleanupTmpDir();
 
     // Load in all the HStores.  Get maximum seqid.
     long maxSeqId = -1;
     for (HColumnDescriptor c : this.regionInfo.getTableDesc().getFamilies()) {
+      status.setStatus("Instantiating store for column family " + c);
       Store store = instantiateHStore(this.tableDir, c);
       this.stores.put(c.getName(), store);
       long storeSeqId = store.getMaxSequenceId();
@@ -373,8 +383,10 @@ public class HRegion implements HeapSize
       }
     }
     // Recover any edits if available.
-    maxSeqId = replayRecoveredEditsIfAny(this.regiondir, maxSeqId, reporter);
+    maxSeqId = replayRecoveredEditsIfAny(
+        this.regiondir, maxSeqId, reporter, status);
 
+    status.setStatus("Cleaning up detritus from prior splits");
     // Get rid of any splits or merges that were lost in-progress.  Clean out
     // these directories here on open.  We may be opening a region that was
     // being split but we crashed in the middle of it all.
@@ -390,9 +402,12 @@ public class HRegion implements HeapSize
     long nextSeqid = maxSeqId + 1;
     LOG.info("Onlined " + this.toString() + "; next sequenceid=" + nextSeqid);
 
+    
     if (coprocessorHost != null) {
+      status.setStatus("Running coprocessor post-open hooks");
       coprocessorHost.postOpen();
     }
+    status.markComplete("Region opened successfully");
     return nextSeqid;
   }
 
@@ -556,12 +571,22 @@ public class HRegion implements HeapSize
   public List<StoreFile> close(final boolean abort) throws IOException {
     // Only allow one thread to close at a time. Serialize them so dual
     // threads attempting to close will run up against each other.
-    synchronized (closeLock) {
-      return doClose(abort);
+    MonitoredTask status = TaskMonitor.get().createStatus(
+        "Closing region " + this +
+        (abort ? " due to abort" : ""));
+    
+    status.setStatus("Waiting for close lock");
+    try {
+      synchronized (closeLock) {
+        return doClose(abort, status);
+      }
+    } finally {
+      status.cleanup();
     }
   }
 
-  private List<StoreFile> doClose(final boolean abort)
+  private List<StoreFile> doClose(
+      final boolean abort, MonitoredTask status)
   throws IOException {
     if (isClosed()) {
       LOG.warn("Region " + this + " already closed");
@@ -569,9 +594,11 @@ public class HRegion implements HeapSize
     }
 
     if (coprocessorHost != null) {
+      status.setStatus("Running coprocessor pre-close hooks");
       this.coprocessorHost.preClose(abort);
     }
 
+    status.setStatus("Disabling compacts and flushes for region");
     boolean wasFlushing = false;
     synchronized (writestate) {
       // Disable compacting and flushing by background threads for this
@@ -596,20 +623,24 @@ public class HRegion implements HeapSize
     // that will clear out of the bulk of the memstore before we put up
     // the close flag?
     if (!abort && !wasFlushing && worthPreFlushing()) {
+      status.setStatus("Pre-flushing region before close");
       LOG.info("Running close preflush of " + this.getRegionNameAsString());
-      internalFlushcache();
+      internalFlushcache(status);
     }
+
     this.closing.set(true);
+    status.setStatus("Disabling writes for close");
     lock.writeLock().lock();
     try {
       if (this.isClosed()) {
+        status.abort("Already got closed by another process");
         // SplitTransaction handles the null
         return null;
       }
       LOG.debug("Updates disabled for region " + this);
       // Don't flush the cache if we are aborting
       if (!abort) {
-        internalFlushcache();
+        internalFlushcache(status);
       }
 
       List<StoreFile> result = new ArrayList<StoreFile>();
@@ -619,8 +650,10 @@ public class HRegion implements HeapSize
       this.closed.set(true);
 
       if (coprocessorHost != null) {
+        status.setStatus("Running coprocessor post-close hooks");
         this.coprocessorHost.postClose(abort);
       }
+      status.markComplete("Closed");
       LOG.info("Closed " + this);
       return result;
     } finally {
@@ -824,6 +857,8 @@ public class HRegion implements HeapSize
     lock.readLock().lock();
     this.lastCompactInfo = null;
     byte [] splitRow = null;
+    MonitoredTask status = TaskMonitor.get().createStatus(
+        "Compacting stores in " + this);
     try {
       if (this.closed.get()) {
         LOG.debug("Skipping compaction on " + this + " because closed");
@@ -833,6 +868,7 @@ public class HRegion implements HeapSize
         return splitRow;
       }
       if (coprocessorHost != null) {
+        status.setStatus("Running coprocessor preCompact hooks");
         coprocessorHost.preCompact(false);
       }
       try {
@@ -840,10 +876,12 @@ public class HRegion implements HeapSize
           if (!writestate.compacting && writestate.writesEnabled) {
             writestate.compacting = true;
           } else {
-            LOG.info("NOT compacting region " + this +
+            String msg = "NOT compacting region " + this +
                 ": compacting=" + writestate.compacting + ", writesEnabled=" +
-                writestate.writesEnabled);
-              return splitRow;
+                writestate.writesEnabled;
+            LOG.info(msg);
+            status.abort(msg);
+            return splitRow;
           }
         }
         LOG.info("Starting compaction on region " + this);
@@ -852,6 +890,7 @@ public class HRegion implements HeapSize
         long lastCompactSize = 0;
         boolean completed = false;
         try {
+          status.setStatus("Compacting store " + store);
           final Store.StoreSize ss = store.compact();
           lastCompactSize += store.getLastCompactSize();
           if (ss != null) {
@@ -868,6 +907,9 @@ public class HRegion implements HeapSize
           if (completed) {
             this.lastCompactInfo =
               new Pair<Long,Long>((now - startTime) / 1000, lastCompactSize);
+            status.setStatus("Compaction complete: " +
+                StringUtils.humanReadableInt(lastCompactSize) + " in " +
+                (now - startTime) + "ms");
           }
         }
       } finally {
@@ -877,9 +919,13 @@ public class HRegion implements HeapSize
         }
       }
       if (coprocessorHost != null) {
+        status.setStatus("Running coprocessor post-compact hooks");
         coprocessorHost.postCompact(splitRow != null);
       }
+      
+      status.markComplete("Compaction complete");
     } finally {
+      status.cleanup();
       lock.readLock().unlock();
     }
     if (splitRow != null) {
@@ -915,13 +961,17 @@ public class HRegion implements HeapSize
       LOG.debug("Skipping flush on " + this + " because closing");
       return false;
     }
+    MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
+    status.setStatus("Acquiring readlock on region");
     lock.readLock().lock();
     try {
       if (this.closed.get()) {
         LOG.debug("Skipping flush on " + this + " because closed");
+        status.abort("Skipped: closed");
         return false;
       }
       if (coprocessorHost != null) {
+        status.setStatus("Running coprocessor pre-flush hooks");
         coprocessorHost.preFlush();
       }
       try {
@@ -935,13 +985,19 @@ public class HRegion implements HeapSize
                   writestate.flushing + ", writesEnabled=" +
                   writestate.writesEnabled);
             }
+            status.abort("Not flushing since " +
+                (writestate.flushing ? "already flushing" : "writes not enabled"));
             return false;
           }
         }
-        boolean result = internalFlushcache();
+        boolean result = internalFlushcache(status);
+        
         if (coprocessorHost != null) {
+          status.setStatus("Running post-flush coprocessor hooks");
           coprocessorHost.postFlush();
         }
+
+        status.markComplete("Flush successful");
         return result;
       } finally {
         synchronized (writestate) {
@@ -952,6 +1008,7 @@ public class HRegion implements HeapSize
       }
     } finally {
       lock.readLock().unlock();
+      status.cleanup();
     }
   }
 
@@ -982,6 +1039,7 @@ public class HRegion implements HeapSize
    * routes.
    *
    * <p> This method may block for some time.
+   * @param status 
    *
    * @return true if the region needs compacting
    *
@@ -989,19 +1047,21 @@ public class HRegion implements HeapSize
    * @throws DroppedSnapshotException Thrown when replay of hlog is required
    * because a Snapshot was not properly persisted.
    */
-  protected boolean internalFlushcache() throws IOException {
-    return internalFlushcache(this.log, -1);
+  protected boolean internalFlushcache(MonitoredTask status) throws IOException {
+    return internalFlushcache(this.log, -1, status);
   }
 
   /**
    * @param wal Null if we're NOT to go via hlog/wal.
    * @param myseqid The seqid to use if <code>wal</code> is null writing out
    * flush file.
+   * @param status 
    * @return true if the region needs compacting
    * @throws IOException
    * @see #internalFlushcache()
    */
-  protected boolean internalFlushcache(final HLog wal, final long myseqid)
+  protected boolean internalFlushcache(
+      final HLog wal, final long myseqid, MonitoredTask status)
   throws IOException {
     final long startTime = EnvironmentEdgeManager.currentTimeMillis();
     // Clear flush flag.
@@ -1031,7 +1091,9 @@ public class HRegion implements HeapSize
     // We have to take a write lock during snapshot, or else a write could
     // end up in both snapshot and memstore (makes it difficult to do atomic
     // rows then)
+    status.setStatus("Obtaining lock to block concurrent updates");
     this.updatesLock.writeLock().lock();
+    status.setStatus("Preparing to flush by snapshotting stores");
     final long currentMemStoreSize = this.memstoreSize.get();
     List<StoreFlusher> storeFlushers = new ArrayList<StoreFlusher>(stores.size());
     try {
@@ -1049,6 +1111,7 @@ public class HRegion implements HeapSize
     } finally {
       this.updatesLock.writeLock().unlock();
     }
+    status.setStatus("Flushing stores");
 
     LOG.debug("Finished snapshotting, commencing flushing stores");
 
@@ -1063,7 +1126,7 @@ public class HRegion implements HeapSize
       // just-made new flush store file.
 
       for (StoreFlusher flusher : storeFlushers) {
-        flusher.flushCache();
+        flusher.flushCache(status);
       }
       // Switch snapshot (in memstore) -> new hfile (thus causing
       // all the store scanners to reset/reseek).
@@ -1088,6 +1151,7 @@ public class HRegion implements HeapSize
       DroppedSnapshotException dse = new DroppedSnapshotException("region: " +
           Bytes.toStringBinary(getRegionName()));
       dse.initCause(t);
+      status.abort("Flush failed: " + StringUtils.stringifyException(t));
       throw dse;
     }
 
@@ -1111,13 +1175,13 @@ public class HRegion implements HeapSize
     }
 
     long time = EnvironmentEdgeManager.currentTimeMillis() - startTime;
-    if (LOG.isDebugEnabled()) {
-      LOG.info("Finished memstore flush of ~" +
+    String msg = "Finished memstore flush of ~" +
         StringUtils.humanReadableInt(currentMemStoreSize) + " for region " +
         this + " in " + time + "ms, sequenceid=" + sequenceId +
         ", compaction requested=" + compactionRequested +
-        ((wal == null)? "; wal=null": ""));
-    }
+        ((wal == null)? "; wal=null": "");
+    LOG.info(msg);
+    status.setStatus(msg);
     this.recentFlushes.add(new Pair<Long,Long>(time/1000,currentMemStoreSize));
 
     return compactionRequested;
@@ -2020,7 +2084,8 @@ public class HRegion implements HeapSize
    * @throws IOException
    */
   protected long replayRecoveredEditsIfAny(final Path regiondir,
-      final long minSeqId, final CancelableProgressable reporter)
+      final long minSeqId, final CancelableProgressable reporter,
+      final MonitoredTask status)
   throws UnsupportedEncodingException, IOException {
     long seqid = minSeqId;
     NavigableSet<Path> files = HLog.getSplitEditFilesSorted(this.fs, regiondir);
@@ -2046,7 +2111,7 @@ public class HRegion implements HeapSize
     }
     if (seqid > minSeqId) {
       // Then we added some edits to memory. Flush and cleanup split edit files.
-      internalFlushcache(null, seqid);
+      internalFlushcache(null, seqid, status);
     }
     // Now delete the content of recovered edits.  We're done w/ them.
     for (Path file: files) {
@@ -2071,7 +2136,11 @@ public class HRegion implements HeapSize
   private long replayRecoveredEdits(final Path edits,
       final long minSeqId, final CancelableProgressable reporter)
     throws IOException {
-    LOG.info("Replaying edits from " + edits + "; minSequenceid=" + minSeqId);
+    String msg = "Replaying edits from " + edits + "; minSequenceid=" + minSeqId;
+    LOG.info(msg);
+    MonitoredTask status = TaskMonitor.get().createStatus(msg);
+    
+    status.setStatus("Opening logs");
     HLog.Reader reader = HLog.getReader(this.fs, edits, conf);
     try {
     long currentEditSeqId = minSeqId;
@@ -2103,10 +2172,14 @@ public class HRegion implements HeapSize
             intervalEdits = 0;
             long cur = EnvironmentEdgeManager.currentTimeMillis();
             if (lastReport + period <= cur) {
+              status.setStatus("Replaying edits..." +
+                  " skipped=" + skippedEdits +
+                  " edits=" + editsCount);
               // Timeout reached
               if(!reporter.progress()) {
-                String msg = "Progressable reporter failed, stopping replay";
+                msg = "Progressable reporter failed, stopping replay";
                 LOG.warn(msg);
+                status.abort(msg);
                 throw new IOException(msg);
               }
               lastReport = cur;
@@ -2117,6 +2190,7 @@ public class HRegion implements HeapSize
         // Start coprocessor replay here. The coprocessor is for each WALEdit
         // instead of a KeyValue.
         if (coprocessorHost != null) {
+          status.setStatus("Running pre-WAL-restore hook in coprocessors");
           if (coprocessorHost.preWALRestore(this.getRegionInfo(), key, val)) {
             // if bypass this log entry, ignore it ...
             continue;
@@ -2158,7 +2232,7 @@ public class HRegion implements HeapSize
           flush = restoreEdit(store, kv);
           editsCount++;
         }
-        if (flush) internalFlushcache(null, currentEditSeqId);
+        if (flush) internalFlushcache(null, currentEditSeqId, status);
 
         if (coprocessorHost != null) {
           coprocessorHost.postWALRestore(this.getRegionInfo(), key, val);
@@ -2166,30 +2240,39 @@ public class HRegion implements HeapSize
       }
     } catch (EOFException eof) {
       Path p = HLog.moveAsideBadEditsFile(fs, edits);
-      LOG.warn("Encountered EOF. Most likely due to Master failure during " +
+      msg = "Encountered EOF. Most likely due to Master failure during " +
           "log spliting, so we have this data in another edit.  " +
-          "Continuing, but renaming " + edits + " as " + p, eof);
+          "Continuing, but renaming " + edits + " as " + p;
+      LOG.warn(msg, eof);
+      status.abort(msg);
     } catch (IOException ioe) {
       // If the IOE resulted from bad file format,
       // then this problem is idempotent and retrying won't help
       if (ioe.getCause() instanceof ParseException) {
         Path p = HLog.moveAsideBadEditsFile(fs, edits);
-        LOG.warn("File corruption encountered!  " +
-            "Continuing, but renaming " + edits + " as " + p, ioe);
+        msg = "File corruption encountered!  " +
+            "Continuing, but renaming " + edits + " as " + p;
+        LOG.warn(msg, ioe);
+        status.setStatus(msg);
       } else {
+        status.abort(StringUtils.stringifyException(ioe));
         // other IO errors may be transient (bad network connection,
         // checksum exception on one datanode, etc).  throw & retry
         throw ioe;
       }
     }
+    
+    msg = "Applied " + editsCount + ", skipped " + skippedEdits +
+    ", firstSequenceidInLog=" + firstSeqIdInLog +
+    ", maxSequenceidInLog=" + currentEditSeqId;
+    status.markComplete(msg);
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Applied " + editsCount + ", skipped " + skippedEdits +
-          ", firstSequenceidInLog=" + firstSeqIdInLog +
-          ", maxSequenceidInLog=" + currentEditSeqId);
+      LOG.debug(msg);
     }
     return currentEditSeqId;
     } finally {
       reader.close();
+      status.cleanup();
     }
   }
 

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1098933&r1=1098932&r2=1098933&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Tue May  3 06:11:06 2011
@@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -448,11 +449,13 @@ public class Store implements HeapSize {
    */
   private StoreFile flushCache(final long logCacheFlushId,
       SortedSet<KeyValue> snapshot,
-      TimeRangeTracker snapshotTimeRangeTracker) throws IOException {
+      TimeRangeTracker snapshotTimeRangeTracker,
+      MonitoredTask status) throws IOException {
     // If an exception happens flushing, we let it out without clearing
     // the memstore snapshot.  The old snapshot will be returned when we say
     // 'snapshot', the next time flush comes around.
-    return internalFlushCache(snapshot, logCacheFlushId, snapshotTimeRangeTracker);
+    return internalFlushCache(
+        snapshot, logCacheFlushId, snapshotTimeRangeTracker, status);
   }
 
   /*
@@ -463,7 +466,8 @@ public class Store implements HeapSize {
    */
   private StoreFile internalFlushCache(final SortedSet<KeyValue> set,
       final long logCacheFlushId,
-      TimeRangeTracker snapshotTimeRangeTracker)
+      TimeRangeTracker snapshotTimeRangeTracker,
+      MonitoredTask status)
       throws IOException {
     StoreFile.Writer writer = null;
     long flushed = 0;
@@ -476,6 +480,7 @@ public class Store implements HeapSize {
     // flush to list of store files.  Add cleanup of anything put on filesystem
     // if we fail.
     synchronized (flushLock) {
+      status.setStatus("Flushing " + this + ": creating writer");
       // A. Write the map out to the disk
       writer = createWriterInTmp(set.size());
       writer.setTimeRangeTracker(snapshotTimeRangeTracker);
@@ -491,18 +496,23 @@ public class Store implements HeapSize {
       } finally {
         // Write out the log sequence number that corresponds to this output
         // hfile.  The hfile is current up to and including logCacheFlushId.
+        status.setStatus("Flushing " + this + ": appending metadata");
         writer.appendMetadata(logCacheFlushId, false);
+        status.setStatus("Flushing " + this + ": closing flushed file");
         writer.close();
       }
     }
 
     // Write-out finished successfully, move into the right spot
     Path dstPath = StoreFile.getUniqueFile(fs, homedir);
-    LOG.info("Renaming flushed file at " + writer.getPath() + " to " + dstPath);
+    String msg = "Renaming flushed file at " + writer.getPath() + " to " + dstPath;
+    LOG.info(msg);
+    status.setStatus("Flushing " + this + ": " + msg);
     if (!fs.rename(writer.getPath(), dstPath)) {
       LOG.warn("Unable to rename " + writer.getPath() + " to " + dstPath);
     }
 
+    status.setStatus("Flushing " + this + ": reopening flushed file");
     StoreFile sf = new StoreFile(this.fs, dstPath, blockcache,
       this.conf, this.family.getBloomFilterType(), this.inMemory);
     StoreFile.Reader r = sf.createReader();
@@ -1593,8 +1603,9 @@ public class Store implements HeapSize {
     }
 
     @Override
-    public void flushCache() throws IOException {
-      storeFile = Store.this.flushCache(cacheFlushId, snapshot, snapshotTimeRangeTracker);
+    public void flushCache(MonitoredTask status) throws IOException {
+      storeFile = Store.this.flushCache(
+          cacheFlushId, snapshot, snapshotTimeRangeTracker, status);
     }
 
     @Override

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java?rev=1098933&r1=1098932&r2=1098933&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java Tue May  3 06:11:06 2011
@@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.regionse
 
 import java.io.IOException;
 
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+
 /**
  * A package protected interface for a store flushing.
  * A store flusher carries the state required to prepare/flush/commit the
@@ -45,7 +47,7 @@ interface StoreFlusher {
    *
    * @throws IOException in case the flush fails
    */
-  void flushCache() throws IOException;
+  void flushCache(MonitoredTask status) throws IOException;
 
   /**
    * Commit the flush - add the store file to the store and clear the

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java?rev=1098933&r1=1098932&r2=1098933&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java Tue May  3 06:11:06 2011
@@ -46,6 +46,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.master.SplitLogManager.TaskFinisher.Status;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
@@ -70,7 +72,6 @@ import com.google.common.collect.Lists;
  * region to replay on startup. Delete the old log files when finished.
  */
 public class HLogSplitter {
-
   private static final String LOG_SPLITTER_IMPL = "hbase.hlog.splitter.impl";
 
   /**
@@ -106,6 +107,8 @@ public class HLogSplitter {
   // Wait/notify for when data has been produced by the reader thread,
   // consumed by the reader thread, or an exception occurred
   Object dataAvailable = new Object();
+  
+  private MonitoredTask status;
 
 
   /**
@@ -179,10 +182,16 @@ public class HLogSplitter {
         "An HLogSplitter instance may only be used once");
     hasSplit = true;
 
+    status = TaskMonitor.get().createStatus(
+        "Splitting logs in " + srcDir);
+    
     long startTime = EnvironmentEdgeManager.currentTimeMillis();
+    
+    status.setStatus("Determining files to split...");
     List<Path> splits = null;
     if (!fs.exists(srcDir)) {
       // Nothing to do
+      status.markComplete("No log directory existed to split.");
       return splits;
     }
     FileStatus[] logfiles = fs.listStatus(srcDir);
@@ -190,15 +199,20 @@ public class HLogSplitter {
       // Nothing to do
       return splits;
     }
-    LOG.info("Splitting " + logfiles.length + " hlog(s) in "
-        + srcDir.toString());
+    logAndReport("Splitting " + logfiles.length + " hlog(s) in "
+    + srcDir.toString());
     splits = splitLog(logfiles);
 
     splitTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
-    LOG.info("hlog file splitting completed in " + splitTime +
+    logAndReport("hlog file splitting completed in " + splitTime +
         " ms for " + srcDir.toString());
     return splits;
   }
+  
+  private void logAndReport(String msg) {
+    status.setStatus(msg);
+    LOG.info(msg);
+  }
 
   /**
    * @return time that this split took
@@ -252,6 +266,7 @@ public class HLogSplitter {
 
     boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", true);
 
+    long totalBytesToSplit = countTotalBytes(logfiles);
     splitSize = 0;
 
     outputSink.startWriterThreads(entryBuffers);
@@ -262,7 +277,7 @@ public class HLogSplitter {
        Path logPath = log.getPath();
         long logLength = log.getLen();
         splitSize += logLength;
-        LOG.debug("Splitting hlog " + (i++ + 1) + " of " + logfiles.length
+        logAndReport("Splitting hlog " + (i++ + 1) + " of " + logfiles.length
             + ": " + logPath + ", length=" + logLength);
         Reader in;
         try {
@@ -284,20 +299,36 @@ public class HLogSplitter {
           continue;
         }
       }
+      status.setStatus("Log splits complete. Checking for orphaned logs.");
+      
       if (fs.listStatus(srcDir).length > processedLogs.size()
           + corruptedLogs.size()) {
         throw new OrphanHLogAfterSplitException(
             "Discovered orphan hlog after split. Maybe the "
             + "HRegionServer was not dead when we started");
       }
+
+      status.setStatus("Archiving logs after completed split");
       archiveLogs(srcDir, corruptedLogs, processedLogs, oldLogDir, fs, conf);
     } finally {
+      status.setStatus("Finishing writing output logs and closing down.");
       splits = outputSink.finishWritingAndClose();
     }
     return splits;
   }
 
   /**
+   * @return the total size of the passed array of log files.
+   */
+  private static long countTotalBytes(FileStatus[] logfiles) {
+    long ret = 0;
+    for (FileStatus stat : logfiles) {
+      ret += stat.getLen();
+    }
+    return ret;
+  }
+
+  /**
    * Splits a HLog file into a temporary staging area. tmpname is used to build
    * the name of the staging area where the recovered-edits will be separated
    * out by region and stored.
@@ -328,6 +359,11 @@ public class HLogSplitter {
     final Map<byte[], Object> logWriters = Collections.
     synchronizedMap(new TreeMap<byte[], Object>(Bytes.BYTES_COMPARATOR));
     boolean isCorrupted = false;
+    
+    Preconditions.checkState(status == null);
+    status = TaskMonitor.get().createStatus(
+        "Splitting log file " + logfile.getPath() +
+        "into a temporary staging area.");
 
     Object BAD_WRITER = new Object();
 
@@ -342,6 +378,7 @@ public class HLogSplitter {
     Path logPath = logfile.getPath();
     long logLength = logfile.getLen();
     LOG.info("Splitting hlog: " + logPath + ", length=" + logLength);
+    status.setStatus("Opening log file");
     Reader in = null;
     try {
       in = getReader(fs, logfile, conf, skipErrors);
@@ -351,12 +388,14 @@ public class HLogSplitter {
       isCorrupted = true;
     }
     if (in == null) {
+      status.markComplete("Was nothing to split in log file");
       LOG.warn("Nothing to split in log file " + logPath);
       return true;
     }
     long t = EnvironmentEdgeManager.currentTimeMillis();
     long last_report_at = t;
     if (reporter != null && reporter.progress() == false) {
+      status.markComplete("Failed: reporter.progress asked us to terminate");
       return false;
     }
     int editsCount = 0;
@@ -380,10 +419,12 @@ public class HLogSplitter {
         wap.w.append(entry);
         editsCount++;
         if (editsCount % interval == 0) {
+          status.setStatus("Split " + editsCount + " edits");
           long t1 = EnvironmentEdgeManager.currentTimeMillis();
           if ((t1 - last_report_at) > period) {
             last_report_at = t;
             if (reporter != null && reporter.progress() == false) {
+              status.markComplete("Failed: reporter.progress asked us to terminate");
               progress_failed = true;
               return false;
             }
@@ -416,10 +457,12 @@ public class HLogSplitter {
         wap.w.close();
         LOG.debug("Closed " + wap.p);
       }
-      LOG.info("processed " + editsCount + " edits across " + n + " regions" +
+      String msg = ("processed " + editsCount + " edits across " + n + " regions" +
           " threw away edits for " + (logWriters.size() - n) + " regions" +
           " log file = " + logPath +
           " is corrupted = " + isCorrupted);
+      LOG.info(msg);
+      status.markComplete(msg);
     }
     return true;
   }

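The HLogSplitter changes all follow one pattern: create a status at the top of the operation, narrate progress via setStatus()/logAndReport(), and make sure every exit path (including the nothing-to-do and reporter-abort returns) calls markComplete() so the task never lingers as RUNNING. Condensed to a sketch, with doSplitWork() as a placeholder for the real split loop:

    // Pattern sketch only; doSplitWork() is invented for illustration.
    MonitoredTask status = TaskMonitor.get().createStatus(
        "Splitting logs in " + srcDir);
    status.setStatus("Determining files to split...");
    if (!fs.exists(srcDir)) {
      // Early return still finalizes the task:
      status.markComplete("No log directory existed to split.");
      return null;
    }
    doSplitWork();
    status.markComplete("hlog file splitting completed");
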
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java?rev=1098933&r1=1098932&r2=1098933&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java Tue May  3 06:11:06 2011
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.HBaseTest
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.catalog.CatalogTracker;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
@@ -41,6 +42,7 @@ import org.apache.zookeeper.KeeperExcept
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 /**
  * Test the {@link ActiveMasterManager}.
@@ -77,7 +79,8 @@ public class TestActiveMasterManager {
     assertFalse(activeMasterManager.clusterHasActiveMaster.get());
 
     // First test becoming the active master uninterrupted
-    activeMasterManager.blockUntilBecomingActiveMaster();
+    MonitoredTask status = Mockito.mock(MonitoredTask.class);
+    activeMasterManager.blockUntilBecomingActiveMaster(status);
     assertTrue(activeMasterManager.clusterHasActiveMaster.get());
     assertMaster(zk, master);
 
@@ -87,7 +90,7 @@ public class TestActiveMasterManager {
       master, secondDummyMaster);
     zk.registerListener(secondActiveMasterManager);
     assertFalse(secondActiveMasterManager.clusterHasActiveMaster.get());
-    activeMasterManager.blockUntilBecomingActiveMaster();
+    activeMasterManager.blockUntilBecomingActiveMaster(status);
     assertTrue(activeMasterManager.clusterHasActiveMaster.get());
     assertMaster(zk, master);
   }
@@ -120,7 +123,8 @@ public class TestActiveMasterManager {
     assertFalse(activeMasterManager.clusterHasActiveMaster.get());
 
     // First test becoming the active master uninterrupted
-    activeMasterManager.blockUntilBecomingActiveMaster();
+    activeMasterManager.blockUntilBecomingActiveMaster(
+        Mockito.mock(MonitoredTask.class));
     assertTrue(activeMasterManager.clusterHasActiveMaster.get());
     assertMaster(zk, firstMasterAddress);
 
@@ -201,7 +205,8 @@ public class TestActiveMasterManager {
 
     @Override
     public void run() {
-      manager.blockUntilBecomingActiveMaster();
+      manager.blockUntilBecomingActiveMaster(
+          Mockito.mock(MonitoredTask.class));
       LOG.info("Second master has become the active master!");
       isActiveMaster = true;
     }

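Test call sites that have no interest in progress reporting satisfy the new parameter with a Mockito mock, as above. Mockito's default answers make the mock a no-op sink, so setStatus()/markComplete() calls from production code are harmlessly absorbed:

    // No-op status for tests; mocked methods return null/0/false by default.
    MonitoredTask status = Mockito.mock(MonitoredTask.class);
    activeMasterManager.blockUntilBecomingActiveMaster(status);
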
Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java?rev=1098933&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java Tue May  3 06:11:06 2011
@@ -0,0 +1,101 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.monitoring;
+
+import static org.junit.Assert.*;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.Test;
+
+public class TestTaskMonitor {
+
+  @Test
+  public void testTaskMonitorBasics() {
+    TaskMonitor tm = new TaskMonitor();
+    assertTrue("Task monitor should start empty",
+        tm.getTasks().isEmpty());
+    
+    // Make a task and fetch it back out
+    MonitoredTask task = tm.createStatus("Test task");
+    MonitoredTask taskFromTm = tm.getTasks().get(0);
+    
+    // Make sure the state is reasonable.
+    assertEquals(task.getDescription(), taskFromTm.getDescription());
+    assertEquals(-1, taskFromTm.getCompletionTimestamp());
+    assertEquals(MonitoredTask.State.RUNNING, taskFromTm.getState());
+    
+    // Mark it as finished
+    task.markComplete("Finished!");
+    assertEquals(MonitoredTask.State.COMPLETE, taskFromTm.getState());
+    
+    // It should still show up in the TaskMonitor list
+    assertEquals(1, tm.getTasks().size());
+    
+    // If we mark its completion time back a few minutes, it should get GCed
+    ((MonitoredTaskImpl)taskFromTm).expireNow();
+    assertEquals(0, tm.getTasks().size());
+  }
+  
+  @Test
+  public void testTasksGetAbortedOnLeak() throws InterruptedException {
+    final TaskMonitor tm = new TaskMonitor();
+    assertTrue("Task monitor should start empty",
+        tm.getTasks().isEmpty());
+    
+    final AtomicBoolean threadSuccess = new AtomicBoolean(false);
+    // Make a task in some other thread and leak it
+    Thread t = new Thread() {
+      @Override
+      public void run() {
+        MonitoredTask task = tm.createStatus("Test task");    
+        assertEquals(MonitoredTask.State.RUNNING, task.getState());
+        threadSuccess.set(true);
+      }
+    };
+    t.start();
+    t.join();
+    // Make sure the thread saw the correct state
+    assertTrue(threadSuccess.get());
+    
+    // Make sure the leaked reference gets cleared
+    System.gc();
+    System.gc();
+    System.gc();
+    
+    // Now it should be aborted 
+    MonitoredTask taskFromTm = tm.getTasks().get(0);
+    assertEquals(MonitoredTask.State.ABORTED, taskFromTm.getState());
+  }
+  
+  @Test
+  public void testTaskLimit() throws Exception {
+    TaskMonitor tm = new TaskMonitor();
+    for (int i = 0; i < TaskMonitor.MAX_TASKS + 10; i++) {
+      tm.createStatus("task " + i);
+    }
+    // Make sure it was limited correctly
+    assertEquals(TaskMonitor.MAX_TASKS, tm.getTasks().size());
+    // Make sure we culled the earlier tasks, not later
+    // (i.e. tasks 0 through 9 should have been deleted)
+    assertEquals("task 10", tm.getTasks().get(0).getDescription());
+  }
+
+}

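Two details these tests imply about the implementation: the triple System.gc() in testTasksGetAbortedOnLeak suggests TaskMonitor tracks tasks through weak references and reports a collected (leaked) task as ABORTED, and testTaskLimit shows that MAX_TASKS culls from the head, i.e. oldest first. Consumers can then poll in-flight work off the singleton; a minimal sketch using only methods exercised in this commit:

    // Hypothetical status-page loop; the printing is illustrative.
    for (MonitoredTask t : TaskMonitor.get().getTasks()) {
      System.out.println(t.getState() + "  " + t.getDescription());
    }
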
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java?rev=1098933&r1=1098932&r2=1098933&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java Tue May  3 06:11:06 2011
@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.HRegionIn
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -57,6 +58,7 @@ import org.apache.hadoop.hbase.util.Envi
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
 import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
 import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
+import org.mockito.Mockito;
 
 import com.google.common.base.Joiner;
 
@@ -600,7 +602,7 @@ public class TestStore extends TestCase 
   private static void flushStore(Store store, long id) throws IOException {
     StoreFlusher storeFlusher = store.getStoreFlusher(id);
     storeFlusher.prepare();
-    storeFlusher.flushCache();
+    storeFlusher.flushCache(Mockito.mock(MonitoredTask.class));
     storeFlusher.commit();
   }
 

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java?rev=1098933&r1=1098932&r2=1098933&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java Tue May  3 06:11:06 2011
@@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.client.Ge
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.FlushRequester;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.Store;
@@ -55,6 +56,7 @@ import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 /**
  * Test replay of edits out of a WAL split.
@@ -394,7 +396,8 @@ public class TestWALReplay {
               null) {
             protected boolean internalFlushcache(HLog wal, long myseqid)
             throws IOException {
-              boolean b = super.internalFlushcache(wal, myseqid);
+              boolean b = super.internalFlushcache(wal, myseqid,
+                  Mockito.mock(MonitoredTask.class));
               flushcount.incrementAndGet();
               return b;
             };


