hbase-commits mailing list archives

From st...@apache.org
Subject svn commit: r939567 [1/2] - in /hadoop/hbase/trunk: ./ core/src/main/java/org/apache/hadoop/hbase/ core/src/main/java/org/apache/hadoop/hbase/master/ core/src/main/java/org/apache/hadoop/hbase/regionserver/ core/src/main/java/org/apache/hadoop/hbase/re...
Date Fri, 30 Apr 2010 06:52:28 GMT
Author: stack
Date: Fri Apr 30 06:52:27 2010
New Revision: 939567

URL: http://svn.apache.org/viewvc?rev=939567&view=rev
Log:
HBASE-2414 Enhance test suite to be able to specify distributed scenarios

Added:
    hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/master/RegionServerOperationListener.java
    hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/master/RegionServerOperationQueue.java
    hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/master/TestMasterTransistions.java
    hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/master/TestRegionServerOperationQueue.java
Modified:
    hadoop/hbase/trunk/CHANGES.txt
    hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
    hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
    hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionClose.java
    hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java
    hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/master/ProcessServerShutdown.java
    hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/master/RegionServerOperation.java
    hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
    hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
    hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/util/Threads.java
    hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/HBaseClusterTestCase.java
    hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
    hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
    hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/TestInfoServers.java
    hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/TestRegionRebalancing.java
    hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/DisabledTestRegionServerExit.java
    hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java

Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=939567&r1=939566&r2=939567&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Fri Apr 30 06:52:27 2010
@@ -543,6 +543,7 @@ Release 0.21.0 - Unreleased
    HBASE-2393  ThriftServer instantiates a new HTable per request
                (Bogdan DRAGU via Stack)
    HBASE-2496  Less ArrayList churn on the scan path
+   HBASE-2414  Enhance test suite to be able to specify distributed scenarios
 
   NEW FEATURES
    HBASE-1961  HBase EC2 scripts

Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java?rev=939567&r1=939566&r2=939567&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java Fri Apr 30 06:52:27 2010
@@ -20,8 +20,6 @@
 package org.apache.hadoop.hbase;
 
 import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
@@ -29,10 +27,11 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.util.ReflectionUtils;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
 
 /**
  * This class creates a single process HBase cluster. One thread is created for
@@ -57,8 +56,7 @@ import org.apache.hadoop.util.Reflection
 public class LocalHBaseCluster implements HConstants {
   static final Log LOG = LogFactory.getLog(LocalHBaseCluster.class);
   private final HMaster master;
-  private final List<RegionServerThread> regionThreads =
-    new ArrayList<RegionServerThread>();
+  private final List<JVMClusterUtil.RegionServerThread> regionThreads;
   private final static int DEFAULT_NO = 1;
   /** local mode */
   public static final String LOCAL = "local";
@@ -84,47 +82,50 @@ public class LocalHBaseCluster implement
    * @param noRegionServers Count of regionservers to start.
    * @throws IOException
    */
-  @SuppressWarnings("unchecked")
   public LocalHBaseCluster(final Configuration conf,
     final int noRegionServers)
   throws IOException {
+    this(conf, noRegionServers, HMaster.class);
+  }
+
+  /**
+   * Constructor.
+   * @param conf Configuration to use.  Post construction has the master's
+   * address.
+   * @param noRegionServers Count of regionservers to start.
+   * @param masterClass
+   * @throws IOException
+   */
+  @SuppressWarnings("unchecked")
+  public LocalHBaseCluster(final Configuration conf,
+    final int noRegionServers, final Class masterClass)
+  throws IOException {
     this.conf = conf;
     // Create the master
-    this.master = new HMaster(conf);
+    this.master = HMaster.constructMaster(masterClass, conf);
     // Start the HRegionServers.  Always have region servers come up on
     // port '0' so there won't be clashes over default port as unit tests
     // start/stop ports at different times during the life of the test.
     conf.set(REGIONSERVER_PORT, "0");
-    regionServerClass = (Class<? extends HRegionServer>)
-      conf.getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
+    this.regionThreads =
+      new CopyOnWriteArrayList<JVMClusterUtil.RegionServerThread>();
+    this.regionServerClass =
+      (Class<? extends HRegionServer>)conf.getClass(HConstants.REGION_SERVER_IMPL,
+       HRegionServer.class);
     for (int i = 0; i < noRegionServers; i++) {
-      addRegionServer();
+      addRegionServer(i);
     }
   }
 
-  /**
-   * Creates a region server.
-   * Call 'start' on the returned thread to make it run.
-   *
-   * @throws IOException
-   * @return Region server added.
-   */
-  public RegionServerThread addRegionServer() throws IOException {
-    synchronized (regionThreads) {
-      HRegionServer server; 
-      try {
-        server = regionServerClass.getConstructor(Configuration.class).
-          newInstance(conf);
-      } catch (Exception e) {
-        IOException ioe = new IOException();
-        ioe.initCause(e);
-        throw ioe;
-      }
-      RegionServerThread t = new RegionServerThread(server,
-          this.regionThreads.size());
-      this.regionThreads.add(t);
-      return t;
-    }
+  public JVMClusterUtil.RegionServerThread addRegionServer() throws IOException {
+    return addRegionServer(this.regionThreads.size());
+  }
+
+  public JVMClusterUtil.RegionServerThread addRegionServer(final int index) throws IOException {
+    JVMClusterUtil.RegionServerThread rst = JVMClusterUtil.createRegionServerThread(this.conf,
+        this.regionServerClass, index);
+    this.regionThreads.add(rst);
+    return rst;
   }
 
   /**
@@ -132,38 +133,7 @@ public class LocalHBaseCluster implement
    * @return region server
    */
   public HRegionServer getRegionServer(int serverNumber) {
-    synchronized (regionThreads) {
-      return regionThreads.get(serverNumber).getRegionServer();
-    }
-  }
-
-  /** runs region servers */
-  public static class RegionServerThread extends Thread {
-    private final HRegionServer regionServer;
-    
-    RegionServerThread(final HRegionServer r, final int index) {
-      super(r, "RegionServer:" + index);
-      this.regionServer = r;
-    }
-
-    /** @return the region server */
-    public HRegionServer getRegionServer() {
-      return this.regionServer;
-    }
-    
-    /**
-     * Block until the region server has come online, indicating it is ready
-     * to be used.
-     */
-    public void waitForServerOnline() {
-      while (!regionServer.isOnline()) {
-        try {
-          Thread.sleep(1000);
-        } catch (InterruptedException e) {
-          // continue waiting
-        }
-      }
-    }
+    return regionThreads.get(serverNumber).getRegionServer();
   }
 
   /**
@@ -176,7 +146,7 @@ public class LocalHBaseCluster implement
   /**
    * @return Read-only list of region server threads.
    */
-  public List<RegionServerThread> getRegionServers() {
+  public List<JVMClusterUtil.RegionServerThread> getRegionServers() {
     return Collections.unmodifiableList(this.regionThreads);
   }
 
@@ -187,10 +157,8 @@ public class LocalHBaseCluster implement
    * @return Name of region server that just went down.
    */
   public String waitOnRegionServer(int serverNumber) {
-    RegionServerThread regionServerThread;
-    synchronized (regionThreads) {
-      regionServerThread = this.regionThreads.remove(serverNumber);
-    }
+    JVMClusterUtil.RegionServerThread regionServerThread =
+      this.regionThreads.remove(serverNumber);
     while (regionServerThread.isAlive()) {
       try {
         LOG.info("Waiting on " +
@@ -211,14 +179,12 @@ public class LocalHBaseCluster implement
    */
   public void join() {
     if (this.regionThreads != null) {
-      synchronized(this.regionThreads) {
         for(Thread t: this.regionThreads) {
           if (t.isAlive()) {
             try {
               t.join();
-            } catch (InterruptedException e) {
-              // continue
-            }
+          } catch (InterruptedException e) {
+            // continue
           }
         }
       }
@@ -234,79 +200,16 @@ public class LocalHBaseCluster implement
   
   /**
    * Start the cluster.
-   * @return Address to use contacting master.
    */
-  public String startup() {
-    this.master.start();
-    synchronized (regionThreads) {
-      for (RegionServerThread t: this.regionThreads) {
-        t.start();
-      }
-    }
-    return this.master.getMasterAddress().toString();
+  public void startup() {
+    JVMClusterUtil.startup(this.master, this.regionThreads);
   }
 
   /**
    * Shut down the mini HBase cluster
-   * @throws IOException 
-   */
-  public void shutdown() throws IOException {
-    LOG.debug("Shutting down HBase Cluster");
-    // Be careful about how we shutdown hdfs.  Its done elsewhere.
-    synchronized (this.regionThreads) {
-      for (RegionServerThread t: this.regionThreads) {
-        t.getRegionServer().setShutdownHDFS(false);
-      }
-    }
-    if(this.master != null) {
-      this.master.shutdown();
-    }
-    // regionServerThreads can never be null because they are initialized when
-    // the class is constructed.
-    synchronized(this.regionThreads) {
-      for(Thread t: this.regionThreads) {
-        if (t.isAlive()) {
-          try {
-            t.join();
-          } catch (InterruptedException e) {
-            // continue
-          }
-        }
-      }
-    }
-    if (this.master != null) {
-      while (this.master.isAlive()) {
-        try {
-          // The below has been replaced to debug sometime hangs on end of
-          // tests.
-          // this.master.join():
-          threadDumpingJoin(this.master);
-        } catch(InterruptedException e) {
-          // continue
-        }
-      }
-    }
-    LOG.info("Shutdown " + this.regionThreads.size() + " region server(s)");
-  }
-
-  /**
-   * @param t
-   * @throws InterruptedException
    */
-  public void threadDumpingJoin(final Thread t) throws InterruptedException {
-    if (t == null) {
-      return;
-    }
-    long startTime = System.currentTimeMillis();
-    while (t.isAlive()) {
-      Thread.sleep(1000);
-      if (System.currentTimeMillis() - startTime > 60000) {
-        startTime = System.currentTimeMillis();
-        ReflectionUtils.printThreadInfo(new PrintWriter(System.out),
-            "Automatic Stack Trace every 60 seconds waiting on " +
-            t.getName());
-      }
-    }
+  public void shutdown() {
+    JVMClusterUtil.shutdown(this.master, this.regionThreads);
   }
 
   /**
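
The reworked LocalHBaseCluster above now defers thread management to the new JVMClusterUtil helper. A minimal sketch of driving it follows; the wrapper class and main method are invented for illustration, and it assumes HBaseConfiguration.create() is available to load the HBase defaults:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.LocalHBaseCluster;
    import org.apache.hadoop.hbase.regionserver.HRegionServer;
    import org.apache.hadoop.hbase.util.JVMClusterUtil;

    public class LocalClusterSketch {
      public static void main(String[] args) throws Exception {
        // Assumption: HBaseConfiguration.create() loads the HBase defaults.
        Configuration conf = HBaseConfiguration.create();
        LocalHBaseCluster cluster = new LocalHBaseCluster(conf, 2);
        cluster.startup();                  // delegates to JVMClusterUtil.startup
        HRegionServer rs = cluster.getRegionServer(0);
        System.out.println("First regionserver: " + rs.getServerInfo());
        // Regionservers come up on port 0, so adding another never clashes.
        JVMClusterUtil.RegionServerThread extra = cluster.addRegionServer();
        extra.start();
        cluster.shutdown();                 // delegates to JVMClusterUtil.shutdown
      }
    }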

Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=939567&r1=939566&r2=939567&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Fri Apr 30 06:52:27 2010
@@ -89,10 +89,6 @@ import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Set;
 import java.util.SortedMap;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.DelayQueue;
-import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -141,13 +137,6 @@ public class HMaster extends Thread impl
   // The Path to the old logs dir
   private final Path oldLogDir;
 
-  // Queues for RegionServerOperation events.  Includes server open, shutdown,
-  // and region open and close.
-  private final DelayQueue<RegionServerOperation> delayedToDoQueue =
-    new DelayQueue<RegionServerOperation>();
-  private final BlockingQueue<RegionServerOperation> toDoQueue =
-    new PriorityBlockingQueue<RegionServerOperation>();
-
   private final HBaseServer rpcServer;
   private final HServerAddress address;
 
@@ -157,6 +146,7 @@ public class HMaster extends Thread impl
 
   private long lastFragmentationQuery = -1L;
   private Map<String, Integer> fragmentation = null;
+  private final RegionServerOperationQueue regionServerOperationQueue;
   
   /** 
    * Constructor
@@ -202,6 +192,8 @@ public class HMaster extends Thread impl
     this.zkMasterAddressWatcher =
       new ZKMasterAddressWatcher(this.zooKeeperWrapper, this.shutdownRequested);
     this.zkMasterAddressWatcher.writeAddressToZooKeeper(this.address, true);
+    this.regionServerOperationQueue =
+      new RegionServerOperationQueue(this.conf, this.closed);
     
     serverManager = new ServerManager(this);
     regionManager = new RegionManager(this);
@@ -407,6 +399,10 @@ public class HMaster extends Thread impl
     return this.serverManager.getAverageLoad();
   }
 
+  RegionServerOperationQueue getRegionServerOperationQueue () {
+    return this.regionServerOperationQueue;
+  }
+
   /**
    * Get the directory where old logs go
    * @return the dir
@@ -433,7 +429,7 @@ public class HMaster extends Thread impl
     startServiceThreads();
     /* Main processing loop */
     try {
-      while (!this.closed.get()) {
+      FINISHED: while (!this.closed.get()) {
         // check if we should be shutting down
         if (this.shutdownRequested.get()) {
           // The region servers won't all exit until we stop scanning the
@@ -444,9 +440,15 @@ public class HMaster extends Thread impl
             break;
           }
         }
-        // work on the TodoQueue. If that fails, we should shut down.
-        if (!processToDoQueue()) {
-          break;
+        if (this.regionManager.getRootRegionLocation() != null) {
+          switch(this.regionServerOperationQueue.process()) {
+          case FAILED:
+            break FINISHED;
+          case REQUEUED_BUT_PROBLEM:
+            if (!checkFileSystem()) break FINISHED;
+          default: // PROCESSED, NOOP, REQUEUED:
+            break;
+          }
         }
       }
     } catch (Throwable t) {
@@ -475,93 +477,6 @@ public class HMaster extends Thread impl
   }
 
   /*
-   * Try to get an operation off of the todo queue and perform it.
-   * We actually have two tiers of todo; those that we couldn't do immediately
-   * which we put aside and then current current todos.  We look at put-asides
-   * first.
-   * @return True if we have nothing to do or we're to close.
-   */ 
-  private boolean processToDoQueue() {
-    RegionServerOperation op = null;
-    // block until the root region is online
-    if (this.regionManager.getRootRegionLocation() != null) {
-      // We can't process server shutdowns unless the root region is online
-      op = this.delayedToDoQueue.poll();
-    }
-    // if there aren't any todo items in the queue, sleep for a bit.
-    if (op == null) {
-      try {
-        op = this.toDoQueue.poll(this.threadWakeFrequency, TimeUnit.MILLISECONDS);
-      } catch (InterruptedException e) {
-        // continue
-      }
-    }
-    // at this point, if there's still no todo operation, or we're supposed to
-    // be closed, return.
-    if (op == null || this.closed.get()) {
-      return true;
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Processing todo: " + op.toString());
-    }
-    try {
-      // perform the operation. 
-      if (!op.process()) {
-        // Operation would have blocked because not all meta regions are
-        // online. This could cause a deadlock, because this thread is waiting
-        // for the missing meta region(s) to come back online, but since it
-        // is waiting, it cannot process the meta region online operation it
-        // is waiting for. So put this operation back on the queue for now.
-        if (this.toDoQueue.size() == 0) {
-          // The queue is currently empty so wait for a while to see if what
-          // we need comes in first
-          this.sleeper.sleep();
-        }
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Put " + op.toString() + " back on queue");
-        }
-        queue(op);
-      }
-    } catch (Exception ex) {
-      // There was an exception performing the operation.
-      if (ex instanceof RemoteException) {
-        try {
-          ex = RemoteExceptionHandler.decodeRemoteException(
-            (RemoteException)ex);
-        } catch (IOException e) {
-          ex = e;
-          LOG.warn("main processing loop: " + op.toString(), e);
-        }
-      }
-      // make sure the filesystem is still ok. otherwise, we're toast.
-      if (!checkFileSystem()) {
-        return false;
-      }
-      LOG.warn("Adding to delayed queue: " + op.toString(), ex);
-      requeue(op);
-    }
-    return true;
-  }
-
-  /**
-   * @param op operation to requeue; added to the delayedToDoQueue.
-   */
-  void requeue(final RegionServerOperation op) {
-    this.delayedToDoQueue.put(op);
-  }
-
-  /**
-   * @param op Operation to queue.  Added to the TODO queue.
-   */
-  void queue(final RegionServerOperation op) {
-    try {
-      this.toDoQueue.put(op);
-    } catch (InterruptedException e) {
-      LOG.error("Failed queue: " + op.toString(), e);
-    }
-  }
-
-  /*
    * Joins cluster.  Checks to see if this instance of HBase is fresh or the
    * master was started following a failover. In the second case, it inspects
    * the region server directory and gets their regions assignment.
@@ -706,12 +621,7 @@ public class HMaster extends Thread impl
   void startShutdown() {
     this.closed.set(true);
     this.regionManager.stopScanners();
-    synchronized(toDoQueue) {
-      this.toDoQueue.clear();
-      this.delayedToDoQueue.clear();
-      // Wake main thread; TODO: Is this necessary?
-      this.toDoQueue.notifyAll();
-    }
+    this.regionServerOperationQueue.shutdown();
     this.serverManager.notifyServers();
   }
 
@@ -752,7 +662,18 @@ public class HMaster extends Thread impl
   public HMsg [] regionServerReport(HServerInfo serverInfo, HMsg msgs[], 
     HRegionInfo[] mostLoadedRegions)
   throws IOException {
-    return serverManager.regionServerReport(serverInfo, msgs, mostLoadedRegions);
+    return adornRegionServerAnswer(serverInfo,
+      this.serverManager.regionServerReport(serverInfo, msgs, mostLoadedRegions));
+  }
+
+  /**
+   * Override if you would like to add messages to return to regionserver <code>hsi</code>.
+   * @param msgs Messages the master is already returning to <code>hsi</code>.
+   * @return Messages to return to the regionserver.
+   */
+  protected HMsg [] adornRegionServerAnswer(final HServerInfo hsi,
+      final HMsg [] msgs) {
+    return msgs;
   }
 
   public boolean isMasterRunning() {
@@ -1192,7 +1113,26 @@ public class HMaster extends Thread impl
     System.exit(0);
   }
 
-  protected static void doMain(String [] args, Class<? extends HMaster> clazz) {
+  /**
+   * Utility for constructing an instance of the passed HMaster class.
+   * @param masterClass
+   * @param conf
+   * @return HMaster instance.
+   */
+  public static HMaster constructMaster(Class<? extends HMaster> masterClass,
+      final Configuration conf)  {
+    try {
+      Constructor<? extends HMaster> c =
+        masterClass.getConstructor(Configuration.class);
+      return c.newInstance(conf);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed construction of " +
+        "Master: " + masterClass.toString(), e);
+    }
+  }
+
+  protected static void doMain(String [] args,
+      Class<? extends HMaster> masterClass) {
     if (args.length < 1) {
       printUsageAndExit();
     }
@@ -1239,9 +1179,7 @@ public class HMaster extends Thread impl
             conf.set("hbase.zookeeper.property.clientPort", Integer.toString(clientPort));
             (new LocalHBaseCluster(conf)).startup();
           } else {
-            Constructor<? extends HMaster> c =
-              clazz.getConstructor(Configuration.class);
-            HMaster master = c.newInstance(conf);
+            HMaster master = constructMaster(masterClass, conf);
             if (master.shutdownRequested.get()) {
               LOG.info("Won't bring the Master up as a shutdown is requested");
               return;
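
With constructMaster() and the new LocalHBaseCluster(conf, noRegionServers, masterClass) constructor, a test can run a subclassed master and use adornRegionServerAnswer() as its hook into regionserver reports. A sketch follows, with a hypothetical class name and assuming the HMaster(Configuration) constructor declares IOException as it does on trunk:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HMsg;
    import org.apache.hadoop.hbase.HServerInfo;
    import org.apache.hadoop.hbase.master.HMaster;

    /** Hypothetical subclass; only the override point comes from this commit. */
    public class InstrumentedMaster extends HMaster {
      public InstrumentedMaster(Configuration conf) throws IOException {
        super(conf);
      }

      @Override
      protected HMsg[] adornRegionServerAnswer(final HServerInfo hsi,
          final HMsg[] msgs) {
        // A test could append extra HMsgs here to push the regionserver into
        // a particular state; this sketch passes the answer through unchanged.
        return msgs;
      }
    }

Such a subclass could then be plugged in via new LocalHBaseCluster(conf, 1, InstrumentedMaster.class), or built directly with HMaster.constructMaster(InstrumentedMaster.class, conf).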

Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionClose.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionClose.java?rev=939567&r1=939566&r2=939567&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionClose.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionClose.java Fri Apr 30 06:52:27 2010
@@ -58,6 +58,13 @@ class ProcessRegionClose extends Process
 
   @Override
   protected boolean process() throws IOException {
+    if (!metaRegionAvailable()) {
+      // We can't proceed unless the meta region we are going to update
+      // is online. metaRegionAvailable() has put this operation on the
+      // delayedToDoQueue, so return true so the operation is not put
+      // back on the toDoQueue
+      return true;
+    }
     Boolean result = null;
     if (offlineRegion || reassignRegion) {
       result =

Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java?rev=939567&r1=939566&r2=939567&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java Fri Apr 30 06:52:27 2010
@@ -58,6 +58,7 @@ class ProcessRegionOpen extends ProcessR
 
   @Override
   protected boolean process() throws IOException {
+    // TODO: The below check is way too convoluted!!!
     if (!metaRegionAvailable()) {
       // We can't proceed unless the meta region we are going to update
       // is online. metaRegionAvailable() has put this operation on the

Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/master/ProcessServerShutdown.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/master/ProcessServerShutdown.java?rev=939567&r1=939566&r2=939567&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/master/ProcessServerShutdown.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/master/ProcessServerShutdown.java Fri Apr 30 06:52:27 2010
@@ -109,6 +109,13 @@ class ProcessServerShutdown extends Regi
     }
   }
 
+  /**
+   * @return Address of the dead server whose shutdown we are processing.
+   */
+  public HServerAddress getDeadServerAddress() {
+    return this.deadServerAddress;
+  }
+
   @Override
   public String toString() {
     return "ProcessServerShutdown of " + this.deadServer;

Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/master/RegionServerOperation.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/master/RegionServerOperation.java?rev=939567&r1=939566&r2=939567&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/master/RegionServerOperation.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/master/RegionServerOperation.java Fri Apr 30 06:52:27 2010
@@ -33,7 +33,7 @@ abstract class RegionServerOperation imp
   
   private long expire;
   protected final HMaster master;
-  final int delay;
+  private int delay;
   
   protected RegionServerOperation(HMaster master) {
     this.master = master;
@@ -41,13 +41,28 @@ abstract class RegionServerOperation imp
       getInt("hbase.server.thread.wakefrequency", 10 * 1000);
     // Set the future time at which we expect to be released from the
     // DelayQueue we're inserted in on lease expiration.
-    this.expire = whenToExpire();
+    resetExpiration();
+  }
+
+  /**
+   * Call before putting this back on the delay queue.
+   * @return When we will expire next.
+   */
+  long resetExpiration() {
+    // Set the future time at which we expect to be released from the
+    // DelayQueue we're inserted in on lease expiration.
+    this.expire = System.currentTimeMillis() + this.delay;
+    return this.expire;
   }
 
   public long getDelay(TimeUnit unit) {
     return unit.convert(this.expire - System.currentTimeMillis(),
       TimeUnit.MILLISECONDS);
   }
+
+  void setDelay(final int d) {
+    this.delay = d;
+  }
   
   public int compareTo(Delayed o) {
     return Long.valueOf(getDelay(TimeUnit.MILLISECONDS)
@@ -55,8 +70,7 @@ abstract class RegionServerOperation imp
   }
   
   protected void requeue() {
-    this.expire = whenToExpire();
-    this.master.requeue(this);
+    this.master.getRegionServerOperationQueue().putOnDelayQueue(this);
   }
 
   private long whenToExpire() {
@@ -103,5 +117,6 @@ abstract class RegionServerOperation imp
   protected int getPriority() {
     return Integer.MAX_VALUE;
   }
+
   protected abstract boolean process() throws IOException;
-}
\ No newline at end of file
+}

Added: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/master/RegionServerOperationListener.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/master/RegionServerOperationListener.java?rev=939567&view=auto
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/master/RegionServerOperationListener.java (added)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/master/RegionServerOperationListener.java Fri Apr 30 06:52:27 2010
@@ -0,0 +1,43 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import java.io.IOException;
+
+/**
+ * Listener for regionserver events in master.
+ * @see HMaster#registerRegionServerOperationListener(RegionServerOperationListener)
+ * @see HMaster#unregisterRegionServerOperationListener(RegionServerOperationListener)
+ */
+public interface RegionServerOperationListener {
+  /**
+   * Called before processing <code>op</code>
+   * @param op
+   * @return True if we are to proceed w/ processing.
+   * @exception IOException
+   */
+  public boolean process(final RegionServerOperation op) throws IOException;
+
+  /**
+   * Called after <code>op</code> has been processed.
+   * @param op The operation that just completed.
+   */
+  public void processed(final RegionServerOperation op);
+}
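
A listener implementation might look like the sketch below; the class name and counter are hypothetical, and it sits in the org.apache.hadoop.hbase.master package because RegionServerOperation is package-private:

    package org.apache.hadoop.hbase.master;

    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicInteger;

    /** Hypothetical listener that lets every operation through and counts them. */
    public class CountingOperationListener implements RegionServerOperationListener {
      private final AtomicInteger count = new AtomicInteger(0);

      public boolean process(final RegionServerOperation op) throws IOException {
        // Returning true lets the master process the operation as usual; a test
        // can return false here to hold an operation back and stage a scenario.
        return true;
      }

      public void processed(final RegionServerOperation op) {
        count.incrementAndGet();
      }

      public int getProcessedCount() {
        return count.get();
      }
    }

It would be registered through the queue added next, e.g. master.getRegionServerOperationQueue().registerRegionServerOperationListener(listener); the HMaster getter is package-private, another reason to keep such helpers in the master package.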

Added: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/master/RegionServerOperationQueue.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/master/RegionServerOperationQueue.java?rev=939567&view=auto
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/master/RegionServerOperationQueue.java (added)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/master/RegionServerOperationQueue.java Fri Apr 30 06:52:27 2010
@@ -0,0 +1,211 @@
+package org.apache.hadoop.hbase.master;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.util.Sleeper;
+import org.apache.hadoop.ipc.RemoteException;
+
+/**
+ * Keeps up the queue of {@link RegionServerOperation}s.
+ * Has both live queue and a temporary put-aside queue; if processing of the
+ * live todo queue fails for some reason, we'll add the item back on the delay
+ * queue for retry later.  Call {@link #shutdown()} to effect a cleanup of
+ * queues when done.  Listen to this queue by registering
+ * {@link RegionServerOperationListener}s.
+ * @see #registerRegionServerOperationListener(RegionServerOperationListener)
+ * @see #unregisterRegionServerOperationListener(RegionServerOperationListener)
+ */
+public class RegionServerOperationQueue {
+  // TODO: Build up the junit test of this class.
+  private final Log LOG = LogFactory.getLog(this.getClass());
+  
+  /**
+   * Enum of codes returned by {@link RegionServerOperationQueue#process()}.
+   */
+  public static enum ProcessingResultCode {
+    /**
+     * Operation was processed successfully.
+     */
+    PROCESSED,
+    /**
+     * Nothing to do.
+     */
+    NOOP,
+    /**
+     * Operation was put-aside for now.  Will be retried later.
+     */
+    REQUEUED,
+    /**
+     * Failed processing of the operation.
+     */
+    FAILED,
+    /**
+     * Operation was requeued but we failed its processing for some reason
+     * (Bad filesystem?).
+     */
+    REQUEUED_BUT_PROBLEM
+  };
+
+  /*
+   * Do not put items directly on this queue. Use {@link #putOnDelayQueue(RegionServerOperation)}.
+   * It makes sure the expiration on the RegionServerOperation added is updated.
+   */
+  private final DelayQueue<RegionServerOperation> delayedToDoQueue =
+    new DelayQueue<RegionServerOperation>();
+  private final BlockingQueue<RegionServerOperation> toDoQueue =
+    new PriorityBlockingQueue<RegionServerOperation>();
+  private final Set<RegionServerOperationListener> listeners =
+    new CopyOnWriteArraySet<RegionServerOperationListener>();
+  private final int threadWakeFrequency;
+  private final AtomicBoolean closed;
+  private final Sleeper sleeper;
+
+  RegionServerOperationQueue(final Configuration c, final AtomicBoolean closed) {
+    this.threadWakeFrequency = c.getInt(HMaster.THREAD_WAKE_FREQUENCY, 10 * 1000);
+    this.closed = closed;
+    this.sleeper = new Sleeper(this.threadWakeFrequency, this.closed);
+  }
+
+  public void put(final RegionServerOperation op) {
+    try {
+      this.toDoQueue.put(op);
+    } catch (InterruptedException e) {
+      LOG.warn("Insertion into todo queue interrupted; putting on delay queue", e);
+      putOnDelayQueue(op);
+    }
+  }
+
+  /**
+   * Try to get an operation off of the queue and process it.
+   * @return {@link ProcessingResultCode#PROCESSED},
+   * {@link ProcessingResultCode#REQUEUED},
+   * {@link ProcessingResultCode#REQUEUED_BUT_PROBLEM}
+   */ 
+  public synchronized ProcessingResultCode process() {
+    RegionServerOperation op = delayedToDoQueue.poll();
+    // if there aren't any todo items in the queue, sleep for a bit.
+    if (op == null) {
+      try {
+        op = toDoQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
+      } catch (InterruptedException e) {
+        LOG.debug("Interrupted", e);
+      }
+    }
+
+    // At this point, if there's still no todo operation, or we're supposed to
+    // be closed, return.
+    if (op == null || closed.get()) {
+      return ProcessingResultCode.NOOP;
+    }
+
+    try {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Processing todo: " + op.toString());
+      }
+      if (!process(op)) {
+        // Add it back on the queue.
+        putOnDelayQueue(op);
+      } else if (op.process()) {
+        processed(op);
+      } else {
+        // Operation would have blocked because not all meta regions are
+        // online. This could cause a deadlock, because this thread is waiting
+        // for the missing meta region(s) to come back online, but since it
+        // is waiting, it cannot process the meta region online operation it
+        // is waiting for. So put this operation back on the queue for now.
+        if (toDoQueue.size() == 0) {
+          // The queue is currently empty so wait for a while to see if what
+          // we need comes in first
+          this.sleeper.sleep();
+        }
+        try {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Put " + op.toString() + " back on queue");
+          }
+          toDoQueue.put(op);
+        } catch (InterruptedException e) {
+          throw new RuntimeException(
+            "Putting into toDoQueue was interrupted.", e);
+        }
+      }
+    } catch (Exception ex) {
+      // There was an exception performing the operation.
+      if (ex instanceof RemoteException) {
+        try {
+          ex = RemoteExceptionHandler.decodeRemoteException(
+            (RemoteException)ex);
+        } catch (IOException e) {
+          ex = e;
+          LOG.warn("main processing loop: " + op.toString(), e);
+        }
+      }
+      LOG.warn("Failed processing: " + op.toString() +
+        "; putting onto delayed todo queue", ex);
+      putOnDelayQueue(op);
+      return ProcessingResultCode.REQUEUED_BUT_PROBLEM;
+    }
+    return ProcessingResultCode.REQUEUED;
+  }
+
+  void putOnDelayQueue(final RegionServerOperation op) {
+    op.resetExpiration();
+    this.delayedToDoQueue.put(op);
+  }
+
+  /**
+   * Clean up the queues.
+   */
+  public synchronized void shutdown() {
+    this.toDoQueue.clear();
+    this.delayedToDoQueue.clear();
+  }
+
+  /**
+   * @param l Listener of RegionServerOperation events to register.
+   */
+  public void registerRegionServerOperationListener(final RegionServerOperationListener l) {
+    this.listeners.add(l);
+  }
+
+  /**
+   * @param l Listener of RegionServerOperation events to unregister.
+   * @return True if this listener was registered.
+   */
+  public boolean unregisterRegionServerOperationListener(final RegionServerOperationListener l) {
+    return this.listeners.remove(l);
+  }
+
+  /*
+   * Tell listeners that we processed a RegionServerOperation.
+   * @param op Operation to tell the world about.
+   */
+  private void processed(final RegionServerOperation op) {
+    if (this.listeners.isEmpty()) return;
+    for (RegionServerOperationListener listener: this.listeners) {
+      listener.processed(op);
+    }
+  }
+
+  /*
+   * Ask listeners whether we should proceed with processing a RegionServerOperation.
+   * @param op Operation we are about to process.
+   */
+  private boolean process(final RegionServerOperation op) throws IOException {
+    if (this.listeners.isEmpty()) return true;
+    for (RegionServerOperationListener listener: this.listeners) {
+      if (!listener.process(op)) return false;
+    }
+    return true;
+  }
+}
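
Echoing the TODO above about a junit test for this class, here is a minimal sketch of exercising the queue in isolation; the class name is hypothetical, it lives in the master package because the constructor is package-private, and it shortens hbase.server.thread.wakefrequency so an empty poll returns promptly:

    package org.apache.hadoop.hbase.master;

    import java.util.concurrent.atomic.AtomicBoolean;

    import org.apache.hadoop.conf.Configuration;

    public class RegionServerOperationQueueSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Shorten the poll timeout so an empty queue comes back quickly.
        conf.setInt("hbase.server.thread.wakefrequency", 100);
        AtomicBoolean closed = new AtomicBoolean(false);
        RegionServerOperationQueue queue =
          new RegionServerOperationQueue(conf, closed);
        // Nothing is queued and we are not closed, so expect NOOP back.
        RegionServerOperationQueue.ProcessingResultCode code = queue.process();
        System.out.println("Empty queue returned " + code);
        queue.shutdown();   // clears both the todo and the delay queues
      }
    }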

Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java?rev=939567&r1=939566&r2=939567&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java Fri Apr 30 06:52:27 2010
@@ -196,7 +196,8 @@ public class ServerManager implements HC
       // The startup message was from a known server with the same name.
       // Timeout the old one right away.
       this.master.getRegionManager().getRootRegionLocation();
-      this.master.queue(new ProcessServerShutdown(this.master, storedInfo));
+      RegionServerOperation op = new ProcessServerShutdown(master, storedInfo);
+      this.master.getRegionServerOperationQueue().put(op);
     }
     recordNewServer(info);
   }
@@ -599,8 +600,9 @@ public class ServerManager implements HC
           // Note that the table has been assigned and is waiting for the
           // meta table to be updated.
           this.master.getRegionManager().setOpen(region.getRegionNameAsString());
-          // Queue up an update to note the region location.
-          this.master.queue(new ProcessRegionOpen(master, serverInfo, region));
+          RegionServerOperation op =
+            new ProcessRegionOpen(master, serverInfo, region);
+          this.master.getRegionServerOperationQueue().put(op);
         }
       }
     }
@@ -637,8 +639,9 @@ public class ServerManager implements HC
       //       processed before an open resulting in the master not agreeing on
       //       the region's state.
       this.master.getRegionManager().setClosed(region.getRegionNameAsString());
-      this.master.queue(new ProcessRegionClose(master, region,
-        offlineRegion, reassignRegion));
+      RegionServerOperation op =
+        new ProcessRegionClose(master, region, offlineRegion, reassignRegion);
+      this.master.getRegionServerOperationQueue().put(op);
     }
   }
   
@@ -800,7 +803,7 @@ public class ServerManager implements HC
     }
 
     public void process(WatchedEvent event) {
-      if(event.getType().equals(EventType.NodeDeleted)) {
+      if (event.getType().equals(EventType.NodeDeleted)) {
         LOG.info(server + " znode expired");
         // Remove the server from the known servers list and update load info
         serverAddressToServerInfo.remove(serverAddress);
@@ -821,7 +824,8 @@ public class ServerManager implements HC
             }
           }
           deadServers.add(server);
-          master.queue(new ProcessServerShutdown(master, info));
+          RegionServerOperation op = new ProcessServerShutdown(master, info);
+          master.getRegionServerOperationQueue().put(op);
         }
         synchronized (serversToServerInfo) {
           serversToServerInfo.notifyAll();
@@ -872,5 +876,4 @@ public class ServerManager implements HC
   public void setMinimumServerCount(int minimumServerCount) {
     this.minimumServerCount = minimumServerCount;
   }
-
 }

Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=939567&r1=939566&r2=939567&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Fri Apr 30 06:52:27 2010
@@ -119,7 +119,7 @@ public class HRegionServer implements HC
   private static final HMsg REPORT_EXITING = new HMsg(Type.MSG_REPORT_EXITING);
   private static final HMsg REPORT_QUIESCED = new HMsg(Type.MSG_REPORT_QUIESCED);
   private static final HMsg [] EMPTY_HMSG_ARRAY = new HMsg [] {};
-  
+
   // Set when a report to the master comes back with a message asking us to
   // shutdown.  Also set by call to stop when debugging or running unit tests
   // of HRegionServer in isolation. We use AtomicBoolean rather than
@@ -169,7 +169,7 @@ public class HRegionServer implements HC
   // Server to handle client requests.  Default access so can be accessed by
   // unit tests.
   HBaseServer server;
-  
+
   // Leases
   private Leases leases;
   
@@ -221,7 +221,9 @@ public class HRegionServer implements HC
 
   private final long rpcTimeout;
 
-  // Address passed in to constructor.
+  // Address passed in to constructor.  This is not always the address we run
+  // with.  For example, if passed port is 0, then we are to pick a port.  The
+  // actual address we run with is in the #serverInfo data member.
   private final HServerAddress address;
 
   // The main region server thread.
@@ -244,7 +246,11 @@ public class HRegionServer implements HC
         conf.get("hbase.regionserver.dns.nameserver","default"));
     String addressStr = machineName + ":" + 
       conf.get(REGIONSERVER_PORT, Integer.toString(DEFAULT_REGIONSERVER_PORT));
-    this.address = new HServerAddress(addressStr);
+    // This is not necessarily the address we will run with.  The address we
+    // use will be in #serverInfo data member.  For example, we may have been
+    // passed a port of 0 which means we should pick some ephemeral port to bind
+    // to.
+    address = new HServerAddress(addressStr);
     LOG.info("My address is " + address);
 
     this.abortRequested = false;
@@ -652,12 +658,14 @@ public class HRegionServer implements HC
       LOG.info("stopping server at: " +
         serverInfo.getServerAddress().toString());
     }
+
+    // Make sure the proxy is down.
     if (this.hbaseMaster != null) {
       HBaseRPC.stopProxy(this.hbaseMaster);
       this.hbaseMaster = null;
     }
-    join();
 
+    join();
     zooKeeperWrapper.close();
 
     LOG.info(Thread.currentThread().getName() + " exiting");
@@ -716,7 +724,13 @@ public class HRegionServer implements HC
       // Master may have sent us a new address with the other configs.
       // Update our address in this case. See HBASE-719
       String hra = conf.get("hbase.regionserver.address");
-      if (address != null) {
+      // TODO: The below used to be this.address != null.  Was broken by what
+      // looks like a mistake in:
+      //
+      // HBASE-1215 migration; metautils scan of meta region was broken; wouldn't see first row
+      // ------------------------------------------------------------------------
+      // r796326 | stack | 2009-07-21 07:40:34 -0700 (Tue, 21 Jul 2009) | 38 lines
+      if (hra != null) {
         HServerAddress hsa = new HServerAddress (hra,
           this.serverInfo.getServerAddress().getPort());
         LOG.info("Master passed us address to use. Was=" +
@@ -1180,7 +1194,7 @@ public class HRegionServer implements HC
       notifyAll(); // FindBugs NN_NAKED_NOTIFY  
     }
   }
-  
+
   /**
    * Cause the server to exit without closing the regions it is serving, the
    * log it is using and without notifying the master.
@@ -1228,9 +1242,8 @@ public class HRegionServer implements HC
         // Do initial RPC setup.  The final argument indicates that the RPC
         // should retry indefinitely.
         master = (HMasterRegionInterface)HBaseRPC.waitForProxy(
-            HMasterRegionInterface.class, HBaseRPCProtocolVersion.versionID,
-            masterAddress.getInetSocketAddress(),
-            this.conf, -1, this.rpcTimeout);
+          HMasterRegionInterface.class, HBaseRPCProtocolVersion.versionID,
+          masterAddress.getInetSocketAddress(), this.conf, -1, this.rpcTimeout);
       } catch (IOException e) {
         LOG.warn("Unable to connect to master. Retrying. Error was:", e);
         sleeper.sleep();
@@ -2247,7 +2260,7 @@ public class HRegionServer implements HC
     }
     throw new IOException("Unknown protocol to name node: " + protocol);
   }
-  
+
   /**
    * @return Queue to which you can add outbound messages.
    */
@@ -2290,9 +2303,101 @@ public class HRegionServer implements HC
     return fs;
   }
 
+  /**
+   * @return Info on the host and port this server has actually bound to.
+   */
+  public HServerInfo getServerInfo() { return this.serverInfo; }
+
+  /** {@inheritDoc} */
+  public long incrementColumnValue(byte [] regionName, byte [] row, 
+      byte [] family, byte [] qualifier, long amount, boolean writeToWAL)
+  throws IOException {
+    checkOpen();
+
+    if (regionName == null) {
+      throw new IOException("Invalid arguments to incrementColumnValue " + 
+      "regionName is null");
+    }
+    requestCount.incrementAndGet();
+    try {
+      HRegion region = getRegion(regionName);
+      long retval = region.incrementColumnValue(row, family, qualifier, amount, 
+          writeToWAL);
+      
+      return retval;
+    } catch (IOException e) {
+      checkFileSystem();
+      throw e;
+    }
+  }
+
+  /** {@inheritDoc} */
+  public HRegionInfo[] getRegionsAssignment() throws IOException {
+    HRegionInfo[] regions = new HRegionInfo[onlineRegions.size()];
+    Iterator<HRegion> ite = onlineRegions.values().iterator();
+    for(int i = 0; ite.hasNext(); i++) {
+      regions[i] = ite.next().getRegionInfo();
+    }
+    return regions;
+  }
+  
+  /** {@inheritDoc} */
+  public HServerInfo getHServerInfo() throws IOException {
+    return serverInfo;
+  }
+
+  @Override
+  public MultiPutResponse multiPut(MultiPut puts) throws IOException {
+    MultiPutResponse resp = new MultiPutResponse();
+
+    // do each region as it's own.
+    for( Map.Entry<byte[],List<Put>> e: puts.puts.entrySet()) {
+      int result = put(e.getKey(), e.getValue().toArray(new Put[]{}));
+      resp.addResult(e.getKey(), result);
+
+      e.getValue().clear(); // clear some RAM
+    }
+
+    return resp;
+  }
+
+  public String toString() {
+    return this.serverInfo.toString();
+  }
+
+  /**
+   * Interval at which threads should run
+   * @return the interval
+   */
+  public int getThreadWakeFrequency() {
+    return threadWakeFrequency;
+  }
+
   //
   // Main program and support routines
   //
+  
+  /**
+   * @param hrs
+   * @return Thread the RegionServer is running in correctly named.
+   */
+  public static Thread startRegionServer(final HRegionServer hrs) {
+    return startRegionServer(hrs,
+      "regionserver" + hrs.server.getListenerAddress());
+  }
+
+  /**
+   * @param hrs
+   * @param name
+   * @return Thread the RegionServer is running in correctly named.
+   */
+  public static Thread startRegionServer(final HRegionServer hrs,
+      final String name) {
+    Thread t = new Thread(hrs);
+    t.setName(name);
+    t.start();
+    return t;
+  }
 
   private static void printUsageAndExit() {
     printUsageAndExit(null);
@@ -2305,7 +2410,7 @@ public class HRegionServer implements HC
     System.err.println("Usage: java org.apache.hbase.HRegionServer start|stop");
     System.exit(0);
   }
-  
+
   /**
    * Do class main.
    * @param args
@@ -2335,10 +2440,7 @@ public class HRegionServer implements HC
             }
             Constructor<? extends HRegionServer> c =
               regionServerClass.getConstructor(Configuration.class);
-            HRegionServer hrs = c.newInstance(conf);
-            Thread t = new Thread(hrs);
-            t.setName("regionserver" + hrs.server.getListenerAddress());
-            t.start();
+            startRegionServer(c.newInstance(conf));
           }
         } catch (Throwable t) {
           LOG.error( "Can not start region server because "+
@@ -2357,50 +2459,6 @@ public class HRegionServer implements HC
       printUsageAndExit();
     }
   }
-  
-  /** {@inheritDoc} */
-  public long incrementColumnValue(byte [] regionName, byte [] row, 
-      byte [] family, byte [] qualifier, long amount, boolean writeToWAL)
-  throws IOException {
-    checkOpen();
-
-    if (regionName == null) {
-      throw new IOException("Invalid arguments to incrementColumnValue " + 
-      "regionName is null");
-    }
-    requestCount.incrementAndGet();
-    try {
-      HRegion region = getRegion(regionName);
-      return region.incrementColumnValue(row, family, qualifier, amount,
-        writeToWAL);
-    } catch (IOException e) {
-      checkFileSystem();
-      throw e;
-    }
-  }
-  
-  /** {@inheritDoc} */
-  public HRegionInfo[] getRegionsAssignment() throws IOException {
-    HRegionInfo[] regions = new HRegionInfo[onlineRegions.size()];
-    Iterator<HRegion> ite = onlineRegions.values().iterator();
-    for(int i = 0; ite.hasNext(); i++) {
-      regions[i] = ite.next().getRegionInfo();
-    }
-    return regions;
-  }
-  
-  /** {@inheritDoc} */
-  public HServerInfo getHServerInfo() throws IOException {
-    return serverInfo;
-  }
-
-  /**
-   * Interval at which threads should run
-   * @return the interval
-   */
-  public int getThreadWakeFrequency() {
-    return threadWakeFrequency;
-  }
 
   /**
    * @param args
@@ -2413,20 +2471,4 @@ public class HRegionServer implements HC
         HRegionServer.class);
     doMain(args, regionServerClass);
   }
-
-
-  @Override
-  public MultiPutResponse multiPut(MultiPut puts) throws IOException {
-    MultiPutResponse resp = new MultiPutResponse();
-
-    // do each region as it's own.
-    for( Map.Entry<byte[],List<Put>> e: puts.puts.entrySet()) {
-      int result = put(e.getKey(), e.getValue().toArray(new Put[]{}));
-      resp.addResult(e.getKey(), result);
-
-      e.getValue().clear(); // clear some RAM
-    }
-
-    return resp;
-  }
 }
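
The new static startRegionServer() helpers factor out the thread naming and start that doMain() used to do inline. A sketch of standalone use follows, assuming HBaseConfiguration.create() for configuration and that the HRegionServer(Configuration) constructor throws IOException:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.regionserver.HRegionServer;

    public class StartOneRegionServer {
      public static void main(String[] args) throws Exception {
        // Assumption: HBaseConfiguration.create() loads the HBase defaults.
        Configuration conf = HBaseConfiguration.create();
        HRegionServer hrs = new HRegionServer(conf);
        // Wraps the regionserver in a correctly named thread and starts it.
        Thread t = HRegionServer.startRegionServer(hrs, "regionserver-sketch");
        t.join();
      }
    }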

Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=939567&r1=939566&r2=939567&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Fri Apr 30 06:52:27 2010
@@ -803,6 +803,10 @@ public class HLog implements HConstants,
           LOG.warn(getName() + " was shut down while waiting for sync");
           return;
         }
+        if (syncerShuttingDown) {
+          LOG.warn(getName() + " was shut down while waiting for sync");
+          return;
+        }
         if(force) {
           forceSync = true;
         }

Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/util/Threads.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/util/Threads.java?rev=939567&r1=939566&r2=939567&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/util/Threads.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/util/Threads.java Fri Apr 30 06:52:27 2010
@@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.util;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import java.io.PrintWriter;
+import org.apache.hadoop.util.ReflectionUtils;
 
 import java.lang.Thread.UncaughtExceptionHandler;
 
@@ -74,6 +76,7 @@ public class Threads {
    * @param t Thread to shutdown
    */
   public static void shutdown(final Thread t, final long joinwait) {
+    if (t == null) return;
     while (t.isAlive()) {
       try {
         t.join(joinwait);
@@ -82,4 +85,38 @@ public class Threads {
       }
     }
   }
-}
\ No newline at end of file
+
+
+  /**
+   * @param t Waits on the passed thread to die dumping a threaddump every
+   * minute while its up.
+   * @throws InterruptedException
+   */
+  public static void threadDumpingIsAlive(final Thread t)
+  throws InterruptedException {
+    if (t == null) {
+      return;
+    }
+    long startTime = System.currentTimeMillis();
+    while (t.isAlive()) {
+      Thread.sleep(1000);
+      if (System.currentTimeMillis() - startTime > 60000) {
+        startTime = System.currentTimeMillis();
+        ReflectionUtils.printThreadInfo(new PrintWriter(System.out),
+            "Automatic Stack Trace every 60 seconds waiting on " +
+            t.getName());
+      }
+    }
+  }
+
+  /**
+   * @param millis How long to sleep for in milliseconds.
+   */
+  public static void sleep(int millis) {
+    try {
+      Thread.sleep(millis);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
+}
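
threadDumpingIsAlive() is the threadDumpingJoin() logic lifted out of LocalHBaseCluster, and sleep() is a small interruption-swallowing convenience. A sketch exercising both; only the class and thread names are invented:

    import org.apache.hadoop.hbase.util.Threads;

    public class ThreadsSketch {
      public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(new Runnable() {
          public void run() {
            Threads.sleep(5000);   // the new interruption-swallowing sleep
          }
        }, "worker");
        worker.start();
        // Joins the worker, printing a thread dump for every minute it stays up.
        Threads.threadDumpingIsAlive(worker);
      }
    }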

Modified: hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/HBaseClusterTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/HBaseClusterTestCase.java?rev=939567&r1=939566&r2=939567&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/HBaseClusterTestCase.java (original)
+++ hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/HBaseClusterTestCase.java Fri Apr 30 06:52:27 2010
@@ -201,8 +201,8 @@ public abstract class HBaseClusterTestCa
    * regionservers and master threads are no long alive.
    */
   public void threadDumpingJoin() {
-    if (this.cluster.getRegionThreads() != null) {
-      for(Thread t: this.cluster.getRegionThreads()) {
+    if (this.cluster.getRegionServerThreads() != null) {
+      for(Thread t: this.cluster.getRegionServerThreads()) {
         threadDumpingJoin(t);
       }
     }

Modified: hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java?rev=939567&r1=939566&r2=939567&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (original)
+++ hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java Fri Apr 30 06:52:27 2010
@@ -40,12 +40,12 @@ import org.apache.hadoop.hbase.client.Pu
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
-import org.apache.hadoop.hbase.master.HMaster;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.zookeeper.ZooKeeper;
@@ -60,7 +60,6 @@ import org.apache.zookeeper.ZooKeeper;
  * make changes to configuration parameters.
  */
 public class HBaseTestingUtility {
-  
   private final Log LOG = LogFactory.getLog(getClass());
 
   private final Configuration conf;
@@ -68,6 +67,7 @@ public class HBaseTestingUtility {
   private MiniDFSCluster dfsCluster = null;
   private MiniHBaseCluster hbaseCluster = null;
   private MiniMRCluster mrCluster = null;
+  // If non-null, a cluster is already running.
   private File clusterTestBuildDir = null;
   private HBaseAdmin hbaseAdmin = null;
 
@@ -78,7 +78,7 @@ public class HBaseTestingUtility {
   public HBaseTestingUtility(Configuration conf) {
     this.conf = conf;
   }
-  
+
   /** System property key to get test directory value.
    */
   public static final String TEST_DIRECTORY_KEY = "test.build.data";
@@ -98,6 +98,36 @@ public class HBaseTestingUtility {
   }
 
   /**
+   * Home our cluster in a dir under build/test.  Give it a random name so we
+   * can have many concurrent clusters running if we need to.  We need to
+   * amend the test.build.data System property because it is what
+   * minidfscluster bases its data dir on.  Modifying a System property is not
+   * the way to run concurrent instances (another instance could grab the
+   * temporary value unintentionally), but there is nothing we can do about it
+   * at the moment; it is how the minidfscluster works.
+   * @return The calculated cluster test build directory.
+   */
+  File setupClusterTestBuildDir() {
+    String oldTestBuildDir =
+      System.getProperty(TEST_DIRECTORY_KEY, "build/test/data");
+    String randomStr = UUID.randomUUID().toString();
+    String dirStr = oldTestBuildDir + "." + randomStr;
+    File dir = new File(dirStr).getAbsoluteFile();
+    // Have it cleaned up on exit
+    dir.deleteOnExit();
+    return dir;
+  }
+
+  /**
+   * @throws IOException If a cluster is already running.
+   */
+  void isRunningCluster() throws IOException {
+    if (this.clusterTestBuildDir == null) return;
+    throw new IOException("Cluster already running at " +
+      this.clusterTestBuildDir);
+  }
+
+  /**
    * @param subdirName
    * @return Path to a subdirectory named <code>subdirName</code> under
    * {@link #getTestDir()}.
@@ -114,16 +144,35 @@ public class HBaseTestingUtility {
     startMiniCluster(1);
   }
 
+  /**
+   * Call this if you only want a zk cluster.
+   * @see #startMiniCluster() if you want a zk + dfs + hbase mini cluster.
+   * @throws Exception
+   * @see #shutdownMiniZKCluster()
+   */
   public void startMiniZKCluster() throws Exception {
-    // Note that this is done before we create the MiniHBaseCluster because we
-    // need to edit the config to add the ZooKeeper servers.
+    isRunningCluster();
+    this.clusterTestBuildDir = setupClusterTestBuildDir();
+    startMiniZKCluster(this.clusterTestBuildDir);
+  }
+
+  private void startMiniZKCluster(final File dir) throws Exception {
     this.zkCluster = new MiniZooKeeperCluster();
-    int clientPort = this.zkCluster.startup(this.clusterTestBuildDir);
+    int clientPort = this.zkCluster.startup(dir);
     this.conf.set("hbase.zookeeper.property.clientPort",
       Integer.toString(clientPort));
   }
 
   /**
+   * @throws IOException
+   * @see #startMiniZKCluster()
+   */
+  public void shutdownMiniZKCluster() throws IOException {
+    if (this.zkCluster != null) this.zkCluster.shutdown(); 
+  }
+
+  /**
   * Start up a minicluster of hbase, optionally dfs, and zookeeper.
    * Modifies Configuration.  Homes the cluster data directory under a random
    * subdirectory in a directory under System property test.build.data.
@@ -138,27 +187,13 @@ public class HBaseTestingUtility {
   throws Exception {
     LOG.info("Starting up minicluster");
     // If we already put up a cluster, fail.
-    if (this.clusterTestBuildDir != null) {
-      throw new IOException("Cluster already running at " +
-        this.clusterTestBuildDir);
-    }
-    // Now, home our cluster in a dir under build/test.  Give it a random name
-    // so can have many concurrent clusters running if we need to.  Need to
-    // amend the test.build.data System property.  Its what minidfscluster bases
-    // it data dir on.  Moding a System property is not the way to do concurrent
-    // instances -- another instance could grab the temporary
-    // value unintentionally -- but not anything can do about it at moment; its
-    // how the minidfscluster works.
-    String oldTestBuildDir =
+    isRunningCluster();
+    String oldBuildTestDir =
       System.getProperty(TEST_DIRECTORY_KEY, "build/test/data");
-    String randomStr = UUID.randomUUID().toString();
-    String clusterTestBuildDirStr = oldTestBuildDir + "." + randomStr;
-    this.clusterTestBuildDir =
-      new File(clusterTestBuildDirStr).getAbsoluteFile();
-    // Have it cleaned up on exit
-    this.clusterTestBuildDir.deleteOnExit();
+    this.clusterTestBuildDir = setupClusterTestBuildDir();
+
     // Set our random dir while minidfscluster is being constructed.
-    System.setProperty(TEST_DIRECTORY_KEY, clusterTestBuildDirStr);
+    System.setProperty(TEST_DIRECTORY_KEY, this.clusterTestBuildDir.getPath());
     // Bring up mini dfs cluster. This spews a bunch of warnings about missing
     // scheme. TODO: fix.
     // Complaints are 'Scheme is undefined for build/test/data/dfs/name1'.
@@ -167,7 +202,8 @@ public class HBaseTestingUtility {
     // Restore System property. minidfscluster accesses content of
     // the TEST_DIRECTORY_KEY to make bad blocks, a feature we are not using,
     // but otherwise, just in constructor.
-    System.setProperty(TEST_DIRECTORY_KEY, oldTestBuildDir);
+    System.setProperty(TEST_DIRECTORY_KEY, oldBuildTestDir);
+ 
     // Mangle conf so fs parameter points to minidfs we just started up
     FileSystem fs = this.dfsCluster.getFileSystem();
     this.conf.set("fs.defaultFS", fs.getUri().toString());
@@ -175,7 +211,7 @@ public class HBaseTestingUtility {
 
     // It could be created before the cluster
     if(this.zkCluster == null) {
-      startMiniZKCluster();
+      startMiniZKCluster(this.clusterTestBuildDir);
     }
 
     // Now do the mini hbase cluster.  Set the hbase.rootdir in config.
@@ -192,8 +228,17 @@ public class HBaseTestingUtility {
   }
 
   /**
+   * @return Current mini hbase cluster. Only has something in it after a call
+   * to {@link #startMiniCluster()}.
+   * @see #startMiniCluster()
+   */
+  public MiniHBaseCluster getMiniHBaseCluster() {
+    return this.hbaseCluster;
+  }
+
+  /**
    * @throws IOException
-   * @see {@link #startMiniCluster(boolean, int)}
+   * @see {@link #startMiniCluster(int)}
    */
   public void shutdownMiniCluster() throws IOException {
     LOG.info("Shutting down minicluster");
@@ -202,7 +247,7 @@ public class HBaseTestingUtility {
       // Wait till hbase is down before going on to shutdown zk.
       this.hbaseCluster.join();
     }
-    if (this.zkCluster != null) this.zkCluster.shutdown();
+    shutdownMiniZKCluster();
     if (this.dfsCluster != null) {
       // The below throws an exception per dn, AsynchronousCloseException.
       this.dfsCluster.shutdown();
@@ -369,9 +414,24 @@ public class HBaseTestingUtility {
    * 
    * @param table  The table to use for the data.
    * @param columnFamily  The family to insert the data into.
+   * @return count of regions created.
+   * @throws IOException When creating the regions fails.
+   */
+  public int createMultiRegions(HTable table, byte[] columnFamily) 
+  throws IOException {
+    return createMultiRegions(getConfiguration(), table, columnFamily);
+  }
+
+  /**
+   * Creates many regions named "aaa" to "zzz".
+   * @param c Configuration to use.
+   * @param table  The table to use for the data.
+   * @param columnFamily  The family to insert the data into.
+   * @return count of regions created.
    * @throws IOException When creating the regions fails.
    */
-  public void createMultiRegions(HTable table, byte[] columnFamily) 
+  public int createMultiRegions(final Configuration c, final HTable table,
+      final byte[] columnFamily) 
   throws IOException {
     byte[][] KEYS = {
       HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"),
@@ -385,7 +445,6 @@ public class HBaseTestingUtility {
       Bytes.toBytes("xxx"), Bytes.toBytes("yyy")
     };
 
-    Configuration c = getConfiguration();
     HTable meta = new HTable(c, HConstants.META_TABLE_NAME);
     HTableDescriptor htd = table.getTableDescriptor();
     if(!htd.hasFamily(columnFamily)) {
@@ -398,6 +457,7 @@ public class HBaseTestingUtility {
     // including the new start region from empty to "bbb". lg 
     List<byte[]> rows = getMetaTableRows();
     // add custom ones
+    int count = 0;
     for (int i = 0; i < KEYS.length; i++) {
       int j = (i + 1) % KEYS.length;
       HRegionInfo hri = new HRegionInfo(table.getTableDescriptor(), 
@@ -407,6 +467,7 @@ public class HBaseTestingUtility {
         Writables.getBytes(hri));
       meta.put(put);
       LOG.info("createMultiRegions: inserted " + hri.toString());
+      count++;
     }
     // see comment above, remove "old" (or previous) single region
     for (byte[] row : rows) {
@@ -417,6 +478,7 @@ public class HBaseTestingUtility {
     // flush cache of regions
     HConnection conn = table.getConnection();
     conn.clearRegionCache();
+    return count;
   }
 
   /**
@@ -621,4 +683,4 @@ public class HBaseTestingUtility {
   public MiniDFSCluster getDFSCluster() {
     return dfsCluster;
   }
-}
\ No newline at end of file
+}
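
A hedged sketch of the new zookeeper-only lifecycle added above; the class name and
the println are hypothetical, while startMiniZKCluster, shutdownMiniZKCluster,
getConfiguration and the clientPort property come from the patch.

    import org.apache.hadoop.hbase.HBaseTestingUtility;

    public class ZkOnlyClusterSketch {
      public static void main(String[] args) throws Exception {
        HBaseTestingUtility util = new HBaseTestingUtility();
        // Brings up only a MiniZooKeeperCluster and points
        // hbase.zookeeper.property.clientPort in the Configuration at it.
        util.startMiniZKCluster();
        try {
          String port =
            util.getConfiguration().get("hbase.zookeeper.property.clientPort");
          System.out.println("zk client port: " + port);
        } finally {
          // Tear the zk-only cluster back down.
          util.shutdownMiniZKCluster();
        }
      }
    }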

Modified: hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java?rev=939567&r1=939566&r2=939567&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java (original)
+++ hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java Fri Apr 30 06:52:27 2010
@@ -21,7 +21,10 @@ package org.apache.hadoop.hbase;
 
 import java.io.IOException;
 import java.net.BindException;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -31,6 +34,7 @@ import org.apache.hadoop.hbase.client.HC
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
 
 /**
  * This class creates a single process HBase cluster. One thread is run for
@@ -46,7 +50,7 @@ public class MiniHBaseCluster implements
 
   /**
    * Start a MiniHBaseCluster. 
-   * @param conf HBaseConfiguration to be used for cluster
+   * @param conf Configuration to be used for cluster
    * @param numRegionServers initial number of region servers to start.
    * @throws IOException
    */
@@ -56,12 +60,62 @@ public class MiniHBaseCluster implements
     init(numRegionServers);
   }
 
+  /**
+   * Override of HMaster so tests can inject behaviors.
+   */
+  public static class MiniHBaseClusterMaster extends HMaster {
+    private final Map<HServerInfo, List<HMsg>> messages =
+      new ConcurrentHashMap<HServerInfo, List<HMsg>>();
+
+    public MiniHBaseClusterMaster(final Configuration conf)
+    throws IOException {
+      super(conf);
+    }
+
+    /**
+     * Add a message to send to a regionserver next time it checks in.
+     * @param hsi RegionServer's HServerInfo.
+     * @param msg Message to add.
+     */
+    void addMessage(final HServerInfo hsi, HMsg msg) {
+      synchronized(this.messages) {
+        List<HMsg> hmsgs = this.messages.get(hsi);
+        if (hmsgs == null) {
+          hmsgs = new ArrayList<HMsg>();
+          this.messages.put(hsi, hmsgs);
+        }
+        hmsgs.add(msg);
+      }
+    }
+
+    @Override
+    protected HMsg[] adornRegionServerAnswer(final HServerInfo hsi,
+        final HMsg[] msgs) {
+      HMsg [] answerMsgs = msgs;
+      synchronized (this.messages) {
+        List<HMsg> hmsgs = this.messages.get(hsi);
+        if (hmsgs != null && !hmsgs.isEmpty()) {
+          int size = answerMsgs.length;
+          HMsg [] newAnswerMsgs = new HMsg[size + hmsgs.size()];
+          System.arraycopy(answerMsgs, 0, newAnswerMsgs, 0, answerMsgs.length);
+          for (int i = 0; i < hmsgs.size(); i++) {
+            newAnswerMsgs[answerMsgs.length + i] = hmsgs.get(i);
+          }
+          answerMsgs = newAnswerMsgs;
+          hmsgs.clear();
+        }
+      }
+      return super.adornRegionServerAnswer(hsi, answerMsgs);
+    }
+  }
+
   private void init(final int nRegionNodes) throws IOException {
     try {
       // start up a LocalHBaseCluster
       while (true) {
         try {
-          hbaseCluster = new LocalHBaseCluster(conf, nRegionNodes);
+          hbaseCluster = new LocalHBaseCluster(conf, nRegionNodes,
+              MiniHBaseCluster.MiniHBaseClusterMaster.class);
           hbaseCluster.startup();
         } catch (BindException e) {
           //this port is already in use. try to use another (for multiple testing)
@@ -86,8 +140,7 @@ public class MiniHBaseCluster implements
    * @return Name of regionserver started.
    */
   public String startRegionServer() throws IOException {
-    LocalHBaseCluster.RegionServerThread t =
-      this.hbaseCluster.addRegionServer();
+    JVMClusterUtil.RegionServerThread t = this.hbaseCluster.addRegionServer();
     t.start();
     t.waitForServerOnline();
     return t.getName();
@@ -109,18 +162,18 @@ public class MiniHBaseCluster implements
   }
 
   /**
-   * Cause a region server to exit without cleaning up
-   *
+   * Cause a region server to exit, doing only basic clean up on its way out.
    * @param serverNumber  Used as index into a list.
    */
-  public void abortRegionServer(int serverNumber) {
+  public String abortRegionServer(int serverNumber) {
     HRegionServer server = getRegionServer(serverNumber);
-    try {
-      LOG.info("Aborting " + server.getHServerInfo().toString());
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
+    /*TODO: Prove not needed in TRUNK
+    // // Don't run hdfs shutdown thread.
+    // server.setHDFSShutdownThreadOnExit(null);
+    */
+    LOG.info("Aborting " + server.toString());
     server.abort();
+    return server.toString();
   }
 
   /**
@@ -129,7 +182,7 @@ public class MiniHBaseCluster implements
    * @param serverNumber  Used as index into a list.
    * @return the region server that was stopped
    */
-  public LocalHBaseCluster.RegionServerThread stopRegionServer(int serverNumber) {
+  public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber) {
     return stopRegionServer(serverNumber, true);
   }
 
@@ -143,9 +196,9 @@ public class MiniHBaseCluster implements
    * before end of the test.
    * @return the region server that was stopped
    */
-  public LocalHBaseCluster.RegionServerThread stopRegionServer(int serverNumber,
+  public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber,
       final boolean shutdownFS) {
-    LocalHBaseCluster.RegionServerThread server =
+    JVMClusterUtil.RegionServerThread server =
       hbaseCluster.getRegionServers().get(serverNumber);
     LOG.info("Stopping " + server.toString());
     if (!shutdownFS) {
@@ -157,8 +210,8 @@ public class MiniHBaseCluster implements
   }
 
   /**
-   * Wait for the specified region server to stop
-   * Removes this thread from list of running threads.
+   * Wait for the specified region server to stop. Removes this thread from list
+   * of running threads.
    * @param serverNumber
    * @return Name of region server that just went down.
    */
@@ -189,7 +242,7 @@ public class MiniHBaseCluster implements
    * @throws IOException
    */
   public void flushcache() throws IOException {
-    for (LocalHBaseCluster.RegionServerThread t:
+    for (JVMClusterUtil.RegionServerThread t:
         this.hbaseCluster.getRegionServers()) {
       for(HRegion r: t.getRegionServer().getOnlineRegions()) {
         r.flushcache();
@@ -200,7 +253,7 @@ public class MiniHBaseCluster implements
   /**
    * @return List of region server threads.
    */
-  public List<LocalHBaseCluster.RegionServerThread> getRegionThreads() {
+  public List<JVMClusterUtil.RegionServerThread> getRegionServerThreads() {
     return this.hbaseCluster.getRegionServers();
   }
 
@@ -212,4 +265,38 @@ public class MiniHBaseCluster implements
   public HRegionServer getRegionServer(int serverNumber) {
     return hbaseCluster.getRegionServer(serverNumber);
   }
-}
\ No newline at end of file
+
+  /**
+   * @return Index into the list returned by {@link #getRegionServerThreads()}
+   * of the HRS carrying .META.  Returns -1 if none found.
+   */
+  public int getServerWithMeta() {
+    int index = -1;
+    int count = 0;
+    for (JVMClusterUtil.RegionServerThread rst: getRegionServerThreads()) {
+      HRegionServer hrs = rst.getRegionServer();
+      HRegion metaRegion =
+        hrs.getOnlineRegion(HRegionInfo.FIRST_META_REGIONINFO.getRegionName());
+      if (metaRegion != null) {
+        index = count;
+        break;
+      }
+      count++;
+    }
+    return index;
+  }
+
+  /**
+   * Add a message to include in the responses sent to a regionserver when it
+   * checks back in.
+   * @param serverNumber Which server to send it to.
+   * @param msg The message to send.
+   * @throws IOException
+   */
+  public void addMessageToSendRegionServer(final int serverNumber,
+    final HMsg msg)
+  throws IOException {
+    HRegionServer hrs = getRegionServer(serverNumber);
+    ((MiniHBaseClusterMaster)getMaster()).addMessage(hrs.getHServerInfo(), msg);
+  }
+}
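
For illustration, a minimal sketch of driving the new MiniHBaseCluster hooks.  It
assumes a cluster already started with at least two regionservers and a region
online on the non-.META. server; the helper class and method name here are
hypothetical, the calls themselves come from the code above and the test below.

    import java.io.IOException;

    import org.apache.hadoop.hbase.HMsg;
    import org.apache.hadoop.hbase.HRegionInfo;
    import org.apache.hadoop.hbase.MiniHBaseCluster;
    import org.apache.hadoop.hbase.regionserver.HRegionServer;
    import org.apache.hadoop.hbase.util.Bytes;

    public class MiniClusterMessageSketch {
      static void forceCloseOfSomeRegion(final MiniHBaseCluster cluster)
      throws IOException {
        // Index of the regionserver thread carrying .META., or -1 if none.
        int metaIndex = cluster.getServerWithMeta();
        // Pick some other server and one of its online regions.
        int otherIndex = (metaIndex == 0) ? 1 : 0;
        HRegionServer other = cluster.getRegionServer(otherIndex);
        HRegionInfo hri =
          other.getOnlineRegions().iterator().next().getRegionInfo();
        // Queue a close message; MiniHBaseClusterMaster piggybacks it on the
        // regionserver's next check-in via adornRegionServerAnswer.
        cluster.addMessageToSendRegionServer(otherIndex,
          new HMsg(HMsg.Type.MSG_REGION_CLOSE, hri,
            Bytes.toBytes("Forcing close in sketch")));
      }
    }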

Modified: hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/TestInfoServers.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/TestInfoServers.java?rev=939567&r1=939566&r2=939567&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/TestInfoServers.java (original)
+++ hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/TestInfoServers.java Fri Apr 30 06:52:27 2010
@@ -51,7 +51,7 @@ public class TestInfoServers extends HBa
     int port = cluster.getMaster().getInfoServer().getPort();
     assertHasExpectedContent(new URL("http://localhost:" + port +
       "/index.html"), "master");
-    port = cluster.getRegionThreads().get(0).getRegionServer().
+    port = cluster.getRegionServerThreads().get(0).getRegionServer().
       getInfoServer().getPort();
     assertHasExpectedContent(new URL("http://localhost:" + port +
       "/index.html"), "regionserver");

Modified: hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/TestRegionRebalancing.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/TestRegionRebalancing.java?rev=939567&r1=939566&r2=939567&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/TestRegionRebalancing.java (original)
+++ hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/TestRegionRebalancing.java Fri Apr 30 06:52:27 2010
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.client.Pu
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
 
 /**
  * Test whether region rebalancing works. (HBASE-71)
@@ -194,7 +195,7 @@ public class TestRegionRebalancing exten
   
   private List<HRegionServer> getOnlineRegionServers() {
     List<HRegionServer> list = new ArrayList<HRegionServer>();
-    for (LocalHBaseCluster.RegionServerThread rst : cluster.getRegionThreads()) {
+    for (JVMClusterUtil.RegionServerThread rst : cluster.getRegionServerThreads()) {
       if (rst.getRegionServer().isOnline()) {
         list.add(rst.getRegionServer());
       }

Added: hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/master/TestMasterTransistions.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/master/TestMasterTransistions.java?rev=939567&view=auto
==============================================================================
--- hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/master/TestMasterTransistions.java (added)
+++ hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/master/TestMasterTransistions.java Fri Apr 30 06:52:27 2010
@@ -0,0 +1,262 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HMsg;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test transitions of state across the master.
+ */
+public class TestMasterTransistions {
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static final String TABLENAME = "master_transitions";
+  private static final byte [][] FAMILIES = new byte [][] {Bytes.toBytes("a"),
+    Bytes.toBytes("b"), Bytes.toBytes("c")};
+
+  /**
+   * Start up a mini cluster and put a small table of many empty regions into it.
+   * @throws Exception
+   */
+  @BeforeClass public static void beforeAllTests() throws Exception {
+    // Start a cluster of two regionservers.
+    TEST_UTIL.startMiniCluster(2);
+    // Create a table of three families.  This will assign a region.
+    TEST_UTIL.createTable(Bytes.toBytes(TABLENAME), FAMILIES);
+    HTable t = new HTable(TEST_UTIL.getConfiguration(), TABLENAME);
+    int countOfRegions = TEST_UTIL.createMultiRegions(t, FAMILIES[0]);
+    waitUntilAllRegionsAssigned(countOfRegions);
+  }
+
+  @AfterClass public static void afterAllTests() throws IOException {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Listener for regionserver events testing hbase-2428 (Infinite loop of
+   * region closes if META region is offline).  In particular, listen for the
+   * shutdown of the 'metaServer' and, when it comes in, requeue it with a
+   * delay as though there were an issue processing the shutdown.  As part of
+   * the requeuing, send over a close of a region on 'otherServer' so it comes
+   * into a master that has its meta region marked as offline.
+   */
+  static class HBase2428Listener implements RegionServerOperationListener {
+    // Set of what we've delayed so we don't do repeated delays.
+    private final Set<RegionServerOperation> postponed =
+      new CopyOnWriteArraySet<RegionServerOperation>();
+    private boolean done = false;
+    private boolean metaShutdownReceived = false;
+    private final HServerAddress metaAddress;
+    private final MiniHBaseCluster cluster;
+    private final int otherServerIndex;
+    private final HRegionInfo hri;
+    private int closeCount = 0;
+    static final int SERVER_DURATION = 10 * 1000;
+    static final int CLOSE_DURATION = 1 * 1000;
+ 
+    HBase2428Listener(final MiniHBaseCluster c, final HServerAddress metaAddress,
+        final HRegionInfo closingHRI, final int otherServerIndex) {
+      this.cluster = c;
+      this.metaAddress = metaAddress;
+      this.hri = closingHRI;
+      this.otherServerIndex = otherServerIndex;
+    }
+
+    @Override
+    public boolean process(final RegionServerOperation op) throws IOException {
+      // If this is a regionserver shutdown and it is for the meta server, then
+      // we want to delay the processing of the shutdown and send off a close
+      // of a region on the 'otherServer'.
+      boolean result = true;
+      if (op instanceof ProcessServerShutdown) {
+        ProcessServerShutdown pss = (ProcessServerShutdown)op;
+        if (pss.getDeadServerAddress().equals(this.metaAddress)) {
+          // Don't postpone more than once.
+          if (!this.postponed.contains(pss)) {
+            // Close some region.
+            this.cluster.addMessageToSendRegionServer(this.otherServerIndex,
+              new HMsg(HMsg.Type.MSG_REGION_CLOSE, hri,
+              Bytes.toBytes("Forcing close in test")));
+            this.postponed.add(pss);
+            // Put off the processing of the regionserver shutdown processing.
+            pss.setDelay(SERVER_DURATION);
+            this.metaShutdownReceived = true;
+            // Return false.  This will add this op to the delayed queue.
+            result = false;
+          }
+        }
+      } else {
+        // Have the close run frequently.
+        if (isWantedCloseOperation(op) != null) {
+          op.setDelay(CLOSE_DURATION);
+          // Count how many times it comes through here.
+          this.closeCount++;
+        }
+      }
+      return result;
+    }
+
+    public void processed(final RegionServerOperation op) {
+      if (isWantedCloseOperation(op) == null) return;
+      this.done = true;
+    }
+
+    /*
+     * @param op
+     * @return Null if not the wanted ProcessRegionClose, else <code>op</code>
+     * cast as a ProcessRegionClose.
+     */
+    private ProcessRegionClose isWantedCloseOperation(final RegionServerOperation op) {
+      // Count every time we get a close operation.
+      if (op instanceof ProcessRegionClose) {
+        ProcessRegionClose c = (ProcessRegionClose)op;
+        if (c.regionInfo.equals(hri)) {
+          return c;
+        }
+      }
+      return null;
+    }
+
+    boolean isDone() {
+      return this.done;
+    }
+
+    boolean isMetaShutdownReceived() {
+      return metaShutdownReceived;
+    }
+
+    int getCloseCount() {
+      return this.closeCount;
+    }
+  }
+
+  /**
+   * In 2428, the meta region has just been set offline and then a close comes
+   * in.
+   * @see <a href="https://issues.apache.org/jira/browse/HBASE-2428">HBASE-2428</a> 
+   */
+  @Test public void testRegionCloseWhenNoMetaHBase2428() throws Exception {
+    MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+    final HMaster master = cluster.getMaster();
+    int metaIndex = cluster.getServerWithMeta();
+    // Figure the index of the server that is not serving the .META.
+    int otherServerIndex = -1;
+    for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) {
+      if (i == metaIndex) continue;
+      otherServerIndex = i;
+      break;
+    }
+    final HRegionServer otherServer = cluster.getRegionServer(otherServerIndex);
+    final HRegionServer metaHRS = cluster.getRegionServer(metaIndex);
+
+    // Get a region out on the otherServer.
+    final HRegionInfo hri =
+      otherServer.getOnlineRegions().iterator().next().getRegionInfo();
+ 
+    // Add our RegionServerOperationListener
+    HBase2428Listener listener = new HBase2428Listener(cluster,
+      metaHRS.getHServerInfo().getServerAddress(), hri, otherServerIndex);
+    master.getRegionServerOperationQueue().
+      registerRegionServerOperationListener(listener);
+    try {
+      // Now abort the server carrying .META.
+      cluster.abortRegionServer(metaIndex);
+
+      // First wait on receipt of meta server shutdown message.
+      while(!listener.metaShutdownReceived) Threads.sleep(100);
+      while(!listener.isDone()) Threads.sleep(10);
+      // We should not have retried the close more times than it took for the
+      // server shutdown message to exit the delay queue and get processed
+      // (Multiply by two to add in some slop in case of GC or something).
+      assertTrue(listener.getCloseCount() <
+        ((HBase2428Listener.SERVER_DURATION/HBase2428Listener.CLOSE_DURATION) * 2));
+
+      assertClosedRegionIsBackOnline(hri);
+    } finally {
+      master.getRegionServerOperationQueue().
+        unregisterRegionServerOperationListener(listener);
+    }
+  }
+
+  private void assertClosedRegionIsBackOnline(final HRegionInfo hri)
+  throws IOException {
+    // When we get here, region should be successfully deployed. Assert so.
+    // 'aaa' is safe as first row if startkey is EMPTY_BYTE_ARRAY because we
+    // loaded with HBaseTestingUtility#createMultiRegions.
+    byte [] row = Bytes.equals(HConstants.EMPTY_BYTE_ARRAY, hri.getStartKey())?
+      new byte [] {'a', 'a', 'a'}: hri.getStartKey();
+    Put p = new Put(row);
+    p.add(FAMILIES[0], FAMILIES[0], FAMILIES[0]);
+    HTable t = new HTable(TEST_UTIL.getConfiguration(), TABLENAME);
+    t.put(p);
+    Get g =  new Get(row);
+    assertTrue((t.get(g)).size() > 0);
+  }
+
+  /*
+   * Wait until all rows in .META. have a non-empty info:server.  This means
+   * all regions have been deployed, the master has been informed, and .META.
+   * has been updated with each region's deployed server.
+   * @param countOfRegions How many regions in .META.
+   * @throws IOException
+   */
+  private static void waitUntilAllRegionsAssigned(final int countOfRegions)
+  throws IOException {
+    HTable meta = new HTable(TEST_UTIL.getConfiguration(),
+      HConstants.META_TABLE_NAME);
+    while (true) {
+      int rows = 0;
+      Scan scan = new Scan();
+      scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
+      ResultScanner s = meta.getScanner(scan);
+      for (Result r = null; (r = s.next()) != null;) {
+        byte [] b =
+          r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
+        if (b == null || b.length <= 0) break;
+        rows++;
+      }
+      s.close();
+      // If we got to here and all rows have a server, then all have been assigned.
+      if (rows == countOfRegions) break;
+    }
+  }
+}


