hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject svn commit: r597959 [1/2] - in /lucene/hadoop/trunk/src/contrib/hbase: ./ conf/ src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/util/ src/test/ src/test/org/apache/hadoop/hbase/ src/test/org/apache/hadoop/hbase/mapred/
Date Sun, 25 Nov 2007 07:17:42 GMT
Author: jimk
Date: Sat Nov 24 23:17:38 2007
New Revision: 597959

URL: http://svn.apache.org/viewvc?rev=597959&view=rev
Log:
HADOOP-2139 (phase 2) Make region server more event driven

Added:
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/CacheFlushListener.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/LogRollListener.java
Modified:
    lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/hbase/conf/hbase-default.xml
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/Sleeper.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/hbase-site.xml
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MultiRegionTable.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCompaction.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestGet.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHBaseCluster.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHLog.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTimestamp.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java

Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?rev=597959&r1=597958&r2=597959&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Sat Nov 24 23:17:38 2007
@@ -42,6 +42,7 @@
     HADOOP-2176 Htable.deleteAll documentation is ambiguous
     HADOOP-2139 (phase 1) Increase parallelism in region servers.
     HADOOP-2267 [Hbase Shell] Change the prompt's title from 'hbase' to 'hql'.
+    HADOOP-2139 (phase 2) Make region server more event driven
 
 Release 0.15.1
 Branch 0.15

Modified: lucene/hadoop/trunk/src/contrib/hbase/conf/hbase-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/conf/hbase-default.xml?rev=597959&r1=597958&r2=597959&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/conf/hbase-default.xml (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/conf/hbase-default.xml Sat Nov 24 23:17:38 2007
@@ -144,6 +144,14 @@
     </description>
   </property>
   <property>
+    <name>hbase.regionserver.optionalcacheflushinterval</name>
+    <value>60000</value>
+    <description>
+    Amount of time to wait since the last time a region was flushed before
+    invoking an optional cache flush. Default 60,000.
+    </description>
+  </property>
+  <property>
     <name>hbase.hregion.memcache.flush.size</name>
     <value>16777216</value>
     <description>

Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/CacheFlushListener.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/CacheFlushListener.java?rev=597959&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/CacheFlushListener.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/CacheFlushListener.java Sat Nov 24 23:17:38 2007
@@ -0,0 +1,36 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+/**
+ * Implementors of this interface want to be notified when an HRegion
+ * determines that a cache flush is needed. A CacheFlushListener (or null)
+ * must be passed to the HRegion constructor.
+ */
+public interface CacheFlushListener {
+
+  /**
+   * Tell the listener the cache needs to be flushed.
+   * 
+   * @param region the HRegion requesting the cache flush
+   */
+  void flushRequested(HRegion region);
+}

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java?rev=597959&r1=597958&r2=597959&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java Sat Nov 24 23:17:38 2007
@@ -22,10 +22,13 @@
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -89,7 +92,9 @@
   final FileSystem fs;
   final Path dir;
   final Configuration conf;
+  final LogRollListener listener;
   final long threadWakeFrequency;
+  private final int maxlogentries;
 
   /*
    * Current log file.
@@ -99,12 +104,13 @@
   /*
    * Map of all log files but the current one. 
    */
-  final TreeMap<Long, Path> outputfiles = new TreeMap<Long, Path>();
+  final SortedMap<Long, Path> outputfiles = 
+    Collections.synchronizedSortedMap(new TreeMap<Long, Path>());
 
   /*
    * Map of region to last sequence/edit id. 
    */
-  final Map<Text, Long> lastSeqWritten = new HashMap<Text, Long>();
+  final Map<Text, Long> lastSeqWritten = new ConcurrentHashMap<Text, Long>();
 
   volatile boolean closed = false;
 
@@ -119,6 +125,10 @@
   // synchronized is insufficient because a cache flush spans two method calls.
   private final Lock cacheFlushLock = new ReentrantLock();
 
+  // We synchronize on updateLock to prevent updates and to prevent a log roll
+  // during an update
+  private final Integer updateLock = new Integer(0);
+
   /**
    * Split up a bunch of log files, that are no longer being written to, into
    * new files, one per region. Delete the old log files when finished.
@@ -207,12 +217,15 @@
    * @param conf
    * @throws IOException
    */
-  HLog(final FileSystem fs, final Path dir, final Configuration conf)
-  throws IOException {
+  HLog(final FileSystem fs, final Path dir, final Configuration conf,
+      final LogRollListener listener) throws IOException {
     this.fs = fs;
     this.dir = dir;
     this.conf = conf;
+    this.listener = listener;
     this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
+    this.maxlogentries =
+      conf.getInt("hbase.regionserver.maxlogentries", 30 * 1000);
     if (fs.exists(dir)) {
       throw new IOException("Target HLog directory already exists: " + dir);
     }
@@ -256,98 +269,82 @@
    *
    * @throws IOException
    */
-  synchronized void rollWriter() throws IOException {
-    boolean locked = false;
-    while (!locked && !closed) {
-      if (this.cacheFlushLock.tryLock()) {
-        locked = true;
-        break;
-      }
-      try {
-        this.wait(threadWakeFrequency);
-      } catch (InterruptedException e) {
-        // continue
-      }
-    }
-    if (closed) {
-      if (locked) {
-        this.cacheFlushLock.unlock();
-      }
-      throw new IOException("Cannot roll log; log is closed");
-    }
-
-    // If we get here we have locked out both cache flushes and appends
+  void rollWriter() throws IOException {
+    this.cacheFlushLock.lock();
     try {
-      if (this.writer != null) {
-        // Close the current writer, get a new one.
-        this.writer.close();
-        Path p = computeFilename(filenum - 1);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Closing current log writer " + p.toString() +
+      if (closed) {
+        return;
+      }
+      synchronized (updateLock) {
+        if (this.writer != null) {
+          // Close the current writer, get a new one.
+          this.writer.close();
+          Path p = computeFilename(filenum - 1);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Closing current log writer " + p.toString() +
             " to get a new one");
-        }
-        if (filenum > 0) {
-          synchronized (this.sequenceLock) {
-            this.outputfiles.put(Long.valueOf(this.logSeqNum - 1), p);
           }
-        }
-      }
-      Path newPath = computeFilename(filenum++);
-      this.writer = SequenceFile.createWriter(this.fs, this.conf, newPath,
-          HLogKey.class, HLogEdit.class);
-      LOG.info("new log writer created at " + newPath);
-
-      // Can we delete any of the old log files?
-      if (this.outputfiles.size() > 0) {
-        if (this.lastSeqWritten.size() <= 0) {
-          LOG.debug("Last sequence written is empty. Deleting all old hlogs");
-          // If so, then no new writes have come in since all regions were
-          // flushed (and removed from the lastSeqWritten map). Means can
-          // remove all but currently open log file.
-          for (Map.Entry<Long, Path> e : this.outputfiles.entrySet()) {
-            deleteLogFile(e.getValue(), e.getKey());
+          if (filenum > 0) {
+            synchronized (this.sequenceLock) {
+              this.outputfiles.put(Long.valueOf(this.logSeqNum - 1), p);
+            }
           }
-          this.outputfiles.clear();
-        } else {
-          // Get oldest edit/sequence id.  If logs are older than this id,
-          // then safe to remove.
-          TreeSet<Long> sequenceNumbers =
-            new TreeSet<Long>(this.lastSeqWritten.values());
-          long oldestOutstandingSeqNum = sequenceNumbers.first().longValue();
-          // Get the set of all log files whose final ID is older than the
-          // oldest pending region operation
-          sequenceNumbers.clear();
-          sequenceNumbers.addAll(this.outputfiles.headMap(
-              Long.valueOf(oldestOutstandingSeqNum)).keySet());
-          // Now remove old log files (if any)
-          if (LOG.isDebugEnabled()) {
-            // Find region associated with oldest key -- helps debugging.
-            Text oldestRegion = null;
-            for (Map.Entry<Text, Long> e: this.lastSeqWritten.entrySet()) {
-              if (e.getValue().longValue() == oldestOutstandingSeqNum) {
-                oldestRegion = e.getKey();
-                break;
+        }
+        Path newPath = computeFilename(filenum++);
+        this.writer = SequenceFile.createWriter(this.fs, this.conf, newPath,
+            HLogKey.class, HLogEdit.class);
+        LOG.info("new log writer created at " + newPath);
+
+        // Can we delete any of the old log files?
+        if (this.outputfiles.size() > 0) {
+          if (this.lastSeqWritten.size() <= 0) {
+            LOG.debug("Last sequence written is empty. Deleting all old hlogs");
+            // If so, then no new writes have come in since all regions were
+            // flushed (and removed from the lastSeqWritten map). Means can
+            // remove all but currently open log file.
+            for (Map.Entry<Long, Path> e : this.outputfiles.entrySet()) {
+              deleteLogFile(e.getValue(), e.getKey());
+            }
+            this.outputfiles.clear();
+          } else {
+            // Get oldest edit/sequence id.  If logs are older than this id,
+            // then safe to remove.
+            Long oldestOutstandingSeqNum =
+              Collections.min(this.lastSeqWritten.values());
+            // Get the set of all log files whose final ID is older than or
+            // equal to the oldest pending region operation
+            TreeSet<Long> sequenceNumbers =
+              new TreeSet<Long>(this.outputfiles.headMap(
+                (oldestOutstandingSeqNum + Long.valueOf(1L))).keySet());
+            // Now remove old log files (if any)
+            if (LOG.isDebugEnabled()) {
+              // Find region associated with oldest key -- helps debugging.
+              Text oldestRegion = null;
+              for (Map.Entry<Text, Long> e: this.lastSeqWritten.entrySet()) {
+                if (e.getValue().longValue() == oldestOutstandingSeqNum) {
+                  oldestRegion = e.getKey();
+                  break;
+                }
               }
+              LOG.debug("Found " + sequenceNumbers.size() + " logs to remove " +
+                  "using oldest outstanding seqnum of " +
+                  oldestOutstandingSeqNum + " from region " + oldestRegion);
             }
-            LOG.debug("Found " + sequenceNumbers.size() + " logs to remove " +
-              "using oldest outstanding seqnum of " + oldestOutstandingSeqNum +
-              " from region " + oldestRegion);
-          }
-          if (sequenceNumbers.size() > 0) {
-            for (Long seq : sequenceNumbers) {
-              deleteLogFile(this.outputfiles.remove(seq), seq);
+            if (sequenceNumbers.size() > 0) {
+              for (Long seq : sequenceNumbers) {
+                deleteLogFile(this.outputfiles.remove(seq), seq);
+              }
             }
           }
         }
+        this.numEntries = 0;
       }
-      this.numEntries = 0;
     } finally {
       this.cacheFlushLock.unlock();
     }
   }
   
-  private void deleteLogFile(final Path p, final Long seqno)
-  throws IOException {
+  private void deleteLogFile(final Path p, final Long seqno) throws IOException {
     LOG.info("removing old log file " + p.toString() +
       " whose highest sequence/edit id is " + seqno);
     this.fs.delete(p);
@@ -367,7 +364,7 @@
    *
    * @throws IOException
    */
-  synchronized void closeAndDelete() throws IOException {
+  void closeAndDelete() throws IOException {
     close();
     fs.delete(dir);
   }
@@ -377,12 +374,19 @@
    *
    * @throws IOException
    */
-  synchronized void close() throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("closing log writer in " + this.dir.toString());
+  void close() throws IOException {
+    cacheFlushLock.lock();
+    try {
+      synchronized (updateLock) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("closing log writer in " + this.dir.toString());
+        }
+        this.writer.close();
+        this.closed = true;
+      }
+    } finally {
+      cacheFlushLock.unlock();
     }
-    this.writer.close();
-    this.closed = true;
   }
 
   /**
@@ -409,29 +413,36 @@
    * @param timestamp
    * @throws IOException
    */
-  synchronized void append(Text regionName, Text tableName,
+  void append(Text regionName, Text tableName,
       TreeMap<HStoreKey, byte[]> edits) throws IOException {
     
     if (closed) {
       throw new IOException("Cannot append; log is closed");
     }
-    long seqNum[] = obtainSeqNum(edits.size());
-    // The 'lastSeqWritten' map holds the sequence number of the oldest
-    // write for each region. When the cache is flushed, the entry for the
-    // region being flushed is removed if the sequence number of the flush
-    // is greater than or equal to the value in lastSeqWritten.
-    if (!this.lastSeqWritten.containsKey(regionName)) {
-      this.lastSeqWritten.put(regionName, Long.valueOf(seqNum[0]));
-    }
-    int counter = 0;
-    for (Map.Entry<HStoreKey, byte[]> es : edits.entrySet()) {
-      HStoreKey key = es.getKey();
-      HLogKey logKey =
-        new HLogKey(regionName, tableName, key.getRow(), seqNum[counter++]);
-      HLogEdit logEdit =
-        new HLogEdit(key.getColumn(), es.getValue(), key.getTimestamp());
-      this.writer.append(logKey, logEdit);
-      this.numEntries++;
+    synchronized (updateLock) {
+      long seqNum[] = obtainSeqNum(edits.size());
+      // The 'lastSeqWritten' map holds the sequence number of the oldest
+      // write for each region. When the cache is flushed, the entry for the
+      // region being flushed is removed if the sequence number of the flush
+      // is greater than or equal to the value in lastSeqWritten.
+      if (!this.lastSeqWritten.containsKey(regionName)) {
+        this.lastSeqWritten.put(regionName, Long.valueOf(seqNum[0]));
+      }
+      int counter = 0;
+      for (Map.Entry<HStoreKey, byte[]> es : edits.entrySet()) {
+        HStoreKey key = es.getKey();
+        HLogKey logKey =
+          new HLogKey(regionName, tableName, key.getRow(), seqNum[counter++]);
+        HLogEdit logEdit =
+          new HLogEdit(key.getColumn(), es.getValue(), key.getTimestamp());
+        this.writer.append(logKey, logEdit);
+        this.numEntries++;
+      }
+    }
+    if (this.numEntries > this.maxlogentries) {
+      if (listener != null) {
+        listener.logRollRequested();
+      }
     }
   }
 
@@ -451,6 +462,11 @@
     return value;
   }
 
+  /** @return the number of log files in use */
+  int getNumLogFiles() {
+    return outputfiles.size();
+  }
+
   /**
    * Obtain a specified number of sequence numbers
    *
@@ -487,43 +503,43 @@
   /**
    * Complete the cache flush
    *
-   * Protected by this and cacheFlushLock
+   * Protected by cacheFlushLock
    *
    * @param regionName
    * @param tableName
    * @param logSeqId
    * @throws IOException
    */
-  synchronized void completeCacheFlush(final Text regionName,
-    final Text tableName, final long logSeqId)
-  throws IOException {
+  void completeCacheFlush(final Text regionName, final Text tableName,
+      final long logSeqId) throws IOException {
+
     try {
       if (this.closed) {
         return;
       }
-      this.writer.append(new HLogKey(regionName, tableName, HLog.METAROW, logSeqId),
-        new HLogEdit(HLog.METACOLUMN, HLogEdit.completeCacheFlush.get(),
-          System.currentTimeMillis()));
-      this.numEntries++;
-      Long seq = this.lastSeqWritten.get(regionName);
-      if (seq != null && logSeqId >= seq.longValue()) {
-        this.lastSeqWritten.remove(regionName);
+      synchronized (updateLock) {
+        this.writer.append(new HLogKey(regionName, tableName, HLog.METAROW, logSeqId),
+            new HLogEdit(HLog.METACOLUMN, HLogEdit.completeCacheFlush.get(),
+                System.currentTimeMillis()));
+        this.numEntries++;
+        Long seq = this.lastSeqWritten.get(regionName);
+        if (seq != null && logSeqId >= seq.longValue()) {
+          this.lastSeqWritten.remove(regionName);
+        }
       }
     } finally {
       this.cacheFlushLock.unlock();
-      notifyAll();              // wake up the log roller if it is waiting
     }
   }
 
   /**
-   * Abort a cache flush. This method will clear waits on
-   * {@link #insideCacheFlush}. Call if the flush fails. Note that the only
-   * recovery for an aborted flush currently is a restart of the regionserver so
-   * the snapshot content dropped by the failure gets restored to the memcache.
+   * Abort a cache flush.
+   * Call if the flush fails. Note that the only recovery for an aborted flush
+   * currently is a restart of the regionserver so the snapshot content dropped
+   * by the failure gets restored to the memcache.
    */
-  synchronized void abortCacheFlush() {
+  void abortCacheFlush() {
     this.cacheFlushLock.unlock();
-    notifyAll();
   }
 
   private static void usage() {

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java?rev=597959&r1=597958&r2=597959&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java Sat Nov 24 23:17:38 2007
@@ -30,6 +30,7 @@
 import java.util.Random;
 import java.util.Set;
 import java.util.SortedMap;
+import java.util.SortedSet;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.TreeMap;
@@ -70,7 +71,8 @@
  * There is only one HMaster for a single HBase deployment.
  */
 public class HMaster extends Thread implements HConstants, HMasterInterface, 
-HMasterRegionInterface {
+  HMasterRegionInterface {
+  
   static final Log LOG = LogFactory.getLog(HMaster.class.getName());
 
   /** {@inheritDoc} */
@@ -100,8 +102,10 @@
   int numRetries;
   long maxRegionOpenTime;
 
-  DelayQueue<PendingServerShutdown> shutdownQueue;
-  BlockingQueue<PendingOperation> msgQueue;
+  DelayQueue<ProcessServerShutdown> shutdownQueue =
+    new DelayQueue<ProcessServerShutdown>();
+  BlockingQueue<RegionServerOperation> msgQueue =
+    new LinkedBlockingQueue<RegionServerOperation>();
 
   int leaseTimeout;
   private Leases serverLeases;
@@ -113,7 +117,7 @@
   int metaRescanInterval;
 
   final AtomicReference<HServerAddress> rootRegionLocation =
-    new AtomicReference<HServerAddress>();
+    new AtomicReference<HServerAddress>(null);
   
   Lock splitLogLock = new ReentrantLock();
   
@@ -409,88 +413,89 @@
 
     protected void checkAssigned(final HRegionInfo info,
       final String serverName, final long startCode) throws IOException {
-      // Skip region - if ...
-      if(info.isOffline()                                 // offline
-          || killedRegions.contains(info.getRegionName()) // queued for offline
-          || regionsToDelete.contains(info.getRegionName())) { // queued for delete
-        unassignedRegions.remove(info.getRegionName());
-        assignAttempts.remove(info.getRegionName());
-        return;
-      }
-      HServerInfo storedInfo = null;
-      boolean deadServer = false;
-      if (serverName.length() != 0) {
-        Map<Text, HRegionInfo> regionsToKill = killList.get(serverName);
-        if (regionsToKill != null &&
-            regionsToKill.containsKey(info.getRegionName())) {
-          
-          // Skip if region is on kill list
-          if(LOG.isDebugEnabled()) {
-            LOG.debug("not assigning region (on kill list): " +
-                info.getRegionName());
-          }
+      
+      synchronized (serversToServerInfo) {
+        // Skip region - if ...
+        if(info.isOffline()                                 // offline
+            || killedRegions.contains(info.getRegionName()) // queued for offline
+            || regionsToDelete.contains(info.getRegionName())) { // queued for delete
+          unassignedRegions.remove(info.getRegionName());
+          assignAttempts.remove(info.getRegionName());
           return;
         }
-        synchronized (serversToServerInfo) {
+        HServerInfo storedInfo = null;
+        boolean deadServer = false;
+        if (serverName.length() != 0) {
+          Map<Text, HRegionInfo> regionsToKill = killList.get(serverName);
+          if (regionsToKill != null &&
+              regionsToKill.containsKey(info.getRegionName())) {
+
+            // Skip if region is on kill list
+            if(LOG.isDebugEnabled()) {
+              LOG.debug("not assigning region (on kill list): " +
+                  info.getRegionName());
+            }
+            return;
+          }
           storedInfo = serversToServerInfo.get(serverName);
           if (deadServers.contains(serverName)) {
             deadServer = true;
           }
         }
-      }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Checking " + info.getRegionName() + " is assigned");
-      }
-
-      /*
-       * If the server is not dead and either:
-       *   the stored info is not null and the start code does not match
-       * or:
-       *   the stored info is null and the region is neither unassigned nor pending
-       * then:
-       */ 
-      if (!deadServer &&
-          ((storedInfo != null && storedInfo.getStartCode() != startCode) ||
-              (storedInfo == null &&
-                  !unassignedRegions.containsKey(info.getRegionName()) &&
-                  !pendingRegions.contains(info.getRegionName())
-              )
-          )
-      ) {
-
-        // The current assignment is no good
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Current assignment of " + info.getRegionName() +
-              " is no good");
+          LOG.debug("Checking " + info.getRegionName() + " is assigned");
         }
-        // Recover the region server's log if there is one.
-        // This is only done from here if we are restarting and there is stale
-        // data in the meta region. Once we are on-line, dead server log
-        // recovery is handled by lease expiration and PendingServerShutdown
-        if (serverName.length() != 0) {
-          StringBuilder dirName = new StringBuilder("log_");
-          dirName.append(serverName.replace(":", "_"));
-          Path logDir = new Path(dir, dirName.toString());
-          try {
-            if (fs.exists(logDir)) {
-              splitLogLock.lock();
-              try {
-                HLog.splitLog(dir, logDir, fs, conf);
-              } finally {
-                splitLogLock.unlock();
+
+        /*
+         * If the server is not dead and either:
+         *   the stored info is not null and the start code does not match
+         * or:
+         *   the stored info is null and the region is neither unassigned nor pending
+         * then:
+         */ 
+        if (!deadServer &&
+            ((storedInfo != null && storedInfo.getStartCode() != startCode) ||
+                (storedInfo == null &&
+                    !unassignedRegions.containsKey(info.getRegionName()) &&
+                    !pendingRegions.contains(info.getRegionName())
+                )
+            )
+        ) {
+
+          // The current assignment is no good
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Current assignment of " + info.getRegionName() +
+            " is no good");
+          }
+          // Recover the region server's log if there is one.
+          // This is only done from here if we are restarting and there is stale
+          // data in the meta region. Once we are on-line, dead server log
+          // recovery is handled by lease expiration and ProcessServerShutdown
+          if (serverName.length() != 0) {
+            StringBuilder dirName = new StringBuilder("log_");
+            dirName.append(serverName.replace(":", "_"));
+            Path logDir = new Path(dir, dirName.toString());
+            try {
+              if (fs.exists(logDir)) {
+                splitLogLock.lock();
+                try {
+                  HLog.splitLog(dir, logDir, fs, conf);
+                } finally {
+                  splitLogLock.unlock();
+                }
               }
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Split " + logDir.toString());
+              }
+            } catch (IOException e) {
+              LOG.warn("unable to split region server log because: ", e);
+              throw e;
             }
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Split " + logDir.toString());
-            }
-          } catch (IOException e) {
-            LOG.warn("unable to split region server log because: ", e);
-            throw e;
           }
+          // Now get the region assigned
+          unassignedRegions.put(info.getRegionName(), info);
+          assignAttempts.put(info.getRegionName(), Long.valueOf(0L));
         }
-        // Now get the region assigned
-        unassignedRegions.put(info.getRegionName(), info);
-        assignAttempts.put(info.getRegionName(), Long.valueOf(0L));
       }
     }
   }
@@ -505,7 +510,6 @@
     }
 
     private void scanRoot() {
-      boolean succeeded = false;
       int tries = 0;
       while (!closed.get() && tries < numRetries) {
         synchronized (rootRegionLocation) {
@@ -530,7 +534,6 @@
             scanRegion(new MetaRegion(rootRegionLocation.get(),
                 HRegionInfo.rootRegionInfo.getRegionName(), null));
           }
-          succeeded = true;
           break;
         } catch (IOException e) {
           e = RemoteExceptionHandler.checkIOException(e);
@@ -554,12 +557,6 @@
         }
         sleeper.sleep();
       }
-      if (!succeeded) {
-        // We tried numretries to reach root and failed.  Is it gone. 
-        // Currently we just flounder.  Should we reallocate root? 
-        // This would be catastrophic?
-        // unassignRootRegion();
-      }
     }
 
     @Override
@@ -756,7 +753,9 @@
     @Override
     protected void maintenanceScan() {
       ArrayList<MetaRegion> regions = new ArrayList<MetaRegion>();
-      regions.addAll(onlineMetaRegions.values());
+      synchronized (onlineMetaRegions) {
+        regions.addAll(onlineMetaRegions.values());
+      }
       for (MetaRegion r: regions) {
         scanOneMetaRegion(r);
       }
@@ -801,6 +800,26 @@
   MetaScanner metaScannerThread;
   Integer metaScannerLock = new Integer(0);
 
+  /////////////////////////////////////////////////////////////////////////////
+  //
+  // Access to all of the following objects MUST be synchronized on
+  // serversToServerInfo
+  
+  /** The map of known server names to server info */
+  final Map<String, HServerInfo> serversToServerInfo =
+    new HashMap<String, HServerInfo>();
+  
+  /** Set of known dead servers */
+  final Set<String> deadServers = new HashSet<String>();
+
+  /** SortedMap server load -> Set of server names */
+  final SortedMap<HServerLoad, Set<String>> loadToServers =
+    new TreeMap<HServerLoad, Set<String>>();
+
+  /** Map of server names -> server load */
+  final Map<String, HServerLoad> serversToLoad =
+    new HashMap<String, HServerLoad>();
+
   /**
    * The 'unassignedRegions' table maps from a region name to a HRegionInfo 
    * record, which includes the region's table, its id, and its start/end keys.
@@ -812,55 +831,39 @@
    * the region has been deployed.
    */
   final SortedMap<Text, HRegionInfo> unassignedRegions =
-    Collections.synchronizedSortedMap(new TreeMap<Text, HRegionInfo>());
+    new TreeMap<Text, HRegionInfo>();
 
   /**
    * The 'assignAttempts' table maps from regions to a timestamp that indicates
    * the last time we *tried* to assign the region to a RegionServer. If the 
    * timestamp is out of date, then we can try to reassign it.
    */
-  final Map<Text, Long> assignAttempts =
-    Collections.synchronizedMap(new HashMap<Text, Long>());
+  final Map<Text, Long> assignAttempts = new HashMap<Text, Long>();
 
   /**
    * Regions that have been assigned, and the server has reported that it has
    * started serving it, but that we have not yet recorded in the meta table.
    */
-  Set<Text> pendingRegions;
+  final Set<Text> pendingRegions = new HashSet<Text>();
 
   /**
    * The 'killList' is a list of regions that are going to be closed, but not
    * reopened.
    */
-  Map<String, HashMap<Text, HRegionInfo>> killList;
+  final Map<String, HashMap<Text, HRegionInfo>> killList =
+    new HashMap<String, HashMap<Text, HRegionInfo>>();
 
   /** 'killedRegions' contains regions that are in the process of being closed */
-  Set<Text> killedRegions;
+  final Set<Text> killedRegions = new HashSet<Text>();
 
   /**
    * 'regionsToDelete' contains regions that need to be deleted, but cannot be
    * until the region server closes it
    */
-  Set<Text> regionsToDelete;
+  final Set<Text> regionsToDelete = new HashSet<Text>();
 
-  /** 
-   * The map of known server names to server info
-   * 
-   * Access to this map and loadToServers and serversToLoad must be synchronized
-   * on this object
-   */
-  final Map<String, HServerInfo> serversToServerInfo =
-    new HashMap<String, HServerInfo>();
-  
-  /** Set of known dead servers */
-  final Set<String> deadServers =
-    Collections.synchronizedSet(new HashSet<String>());
-
-  /** SortedMap server load -> Set of server names */
-  SortedMap<HServerLoad, Set<String>> loadToServers;
-
-  /** Map of server names -> server load */
-  Map<String, HServerLoad> serversToLoad;
+  //
+  /////////////////////////////////////////////////////////////////////////////
 
   /** Build the HMaster out of a raw configuration item.
    * 
@@ -882,7 +885,8 @@
    * @throws IOException
    */
   public HMaster(Path dir, HServerAddress address, HBaseConfiguration conf)
-  throws IOException {
+    throws IOException {
+    
     this.fsOk = true;
     this.dir = dir;
     this.conf = conf;
@@ -929,9 +933,6 @@
     this.maxRegionOpenTime =
       conf.getLong("hbase.hbasemaster.maxregionopen", 30 * 1000);
 
-    this.shutdownQueue = new DelayQueue<PendingServerShutdown>();
-    this.msgQueue = new LinkedBlockingQueue<PendingOperation>();
-
     this.leaseTimeout = conf.getInt("hbase.master.lease.period", 30 * 1000);
     this.serverLeases = new Leases(this.leaseTimeout, 
         conf.getInt("hbase.master.lease.thread.wakefrequency", 15 * 1000));
@@ -955,27 +956,10 @@
 
     // Scans the meta table
     this.initialMetaScanComplete = false;
-
     this.metaScannerThread = new MetaScanner();
     
     unassignRootRegion();
 
-    this.pendingRegions =
-      Collections.synchronizedSet(new HashSet<Text>());
-
-    this.killList = 
-      Collections.synchronizedMap(
-          new HashMap<String, HashMap<Text, HRegionInfo>>());
-
-    this.killedRegions =
-      Collections.synchronizedSet(new HashSet<Text>());
-
-    this.regionsToDelete =
-      Collections.synchronizedSet(new HashSet<Text>());
-
-    this.loadToServers = new TreeMap<HServerLoad, Set<String>>();
-    this.serversToLoad = new HashMap<String, HServerLoad>();
-
     this.sleeper = new Sleeper(this.threadWakeFrequency, this.closed);
     
     // We're almost open for business
@@ -989,6 +973,9 @@
    * without reporting in.  Currently, we just flounder and never recover.  We
    * could 'notice' dead region server in root scanner -- if we failed access
    * multiple times -- but reassigning root is catastrophic.
+   * 
+   * Note: This method must be called from inside a synchronized block on
+   * serversToServerInfo
    */
   void unassignRootRegion() {
     this.rootRegionLocation.set(null);
@@ -996,7 +983,6 @@
         HRegionInfo.rootRegionInfo);
     this.assignAttempts.put(HRegionInfo.rootRegionInfo.getRegionName(),
         Long.valueOf(0L));
-    // TODO: If the old root region server had a log, it needs splitting.
   }
 
   /**
@@ -1065,8 +1051,12 @@
      * Main processing loop
      */
     try {
-      for (PendingOperation op = null; !closed.get(); ) {
-        op = this.shutdownQueue.poll();
+      for (RegionServerOperation op = null; !closed.get(); ) {
+        if (rootRegionLocation.get() != null) {
+          // We can't process server shutdowns unless the root region is online 
+
+          op = this.shutdownQueue.poll();
+        }
         if (op == null ) {
           try {
             op = msgQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
@@ -1238,33 +1228,37 @@
   /** {@inheritDoc} */
   @SuppressWarnings("unused")
   public MapWritable regionServerStartup(HServerInfo serverInfo)
-  throws IOException {
+    throws IOException {
+
     String s = serverInfo.getServerAddress().toString().trim();
-    HServerInfo storedInfo = null;
     LOG.info("received start message from: " + s);
 
-    // If we get the startup message but there's an old server by that
-    // name, then we can timeout the old one right away and register
-    // the new one.
     synchronized (serversToServerInfo) {
-      storedInfo = serversToServerInfo.remove(s);
       HServerLoad load = serversToLoad.remove(s);
       if (load != null) {
+        // The startup message was from a known server.
+        // Remove stale information about the server's load.
         Set<String> servers = loadToServers.get(load);
         if (servers != null) {
           servers.remove(s);
           loadToServers.put(load, servers);
         }
       }
-      serversToServerInfo.notifyAll();
-    }
-    if (storedInfo != null && !closed.get()) {
-      shutdownQueue.put(new PendingServerShutdown(storedInfo));
-    }
+      
+      HServerInfo storedInfo = serversToServerInfo.remove(s);
+      if (storedInfo != null && !closed.get()) {
+      // The startup message was from a known server with the same name.
+        // Timeout the old one right away.
+        HServerAddress root = rootRegionLocation.get();
+        if (root != null && root.equals(storedInfo.getServerAddress())) {
+          unassignRootRegion();
+        }
+        shutdownQueue.put(new ProcessServerShutdown(storedInfo));
+      }
 
-    // Either way, record the new server
-    synchronized (serversToServerInfo) {
-      HServerLoad load = new HServerLoad();
+      // record new server
+      
+      load = new HServerLoad();
       serverInfo.setLoad(load);
       serversToServerInfo.put(s, serverInfo);
       serversToLoad.put(s, load);
@@ -1274,6 +1268,7 @@
       }
       servers.add(s);
       loadToServers.put(load, servers);
+      serversToServerInfo.notifyAll();
     }
 
     if (!closed.get()) {
@@ -1332,8 +1327,10 @@
               onlineMetaRegions.remove(info.getStartKey());
             }
 
-            this.unassignedRegions.put(info.getRegionName(), info);
-            this.assignAttempts.put(info.getRegionName(), Long.valueOf(0L));
+            synchronized (serversToServerInfo) {
+              this.unassignedRegions.put(info.getRegionName(), info);
+              this.assignAttempts.put(info.getRegionName(), Long.valueOf(0L));
+            }
           }
         }
       }
@@ -1431,6 +1428,10 @@
     boolean leaseCancelled = false;
     synchronized (serversToServerInfo) {
       HServerInfo info = serversToServerInfo.remove(serverName);
+      if (rootRegionLocation.get() != null &&
+          info.getServerAddress().equals(rootRegionLocation.get())) {
+        unassignRootRegion();
+      }
       if (info != null) {
         // Only cancel lease and update load information once.
         // This method can be called a couple of times during shutdown.
@@ -1464,143 +1465,148 @@
     
     ArrayList<HMsg> returnMsgs = new ArrayList<HMsg>();
     String serverName = info.getServerAddress().toString();
-    HashMap<Text, HRegionInfo> regionsToKill = killList.remove(serverName);
+    HashMap<Text, HRegionInfo> regionsToKill = null;
+    synchronized (serversToServerInfo) {
+      regionsToKill = killList.remove(serverName);
+    }
 
     // Get reports on what the RegionServer did.
 
     for (int i = 0; i < incomingMsgs.length; i++) {
       HRegionInfo region = incomingMsgs[i].getRegionInfo();
 
-      switch (incomingMsgs[i].getMsg()) {
+      synchronized (serversToServerInfo) {
+        switch (incomingMsgs[i].getMsg()) {
 
-      case HMsg.MSG_REPORT_OPEN:
-        HRegionInfo regionInfo = unassignedRegions.get(region.getRegionName());
+        case HMsg.MSG_REPORT_OPEN:
+          HRegionInfo regionInfo = unassignedRegions.get(region.getRegionName());
 
-        if (regionInfo == null) {
+          if (regionInfo == null) {
 
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("region server " + info.getServerAddress().toString()
-                + " should not have opened region " + region.getRegionName());
-          }
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("region server " + info.getServerAddress().toString()
+                  + " should not have opened region " + region.getRegionName());
+            }
 
-          // This Region should not have been opened.
-          // Ask the server to shut it down, but don't report it as closed.  
-          // Otherwise the HMaster will think the Region was closed on purpose, 
-          // and then try to reopen it elsewhere; that's not what we want.
+            // This Region should not have been opened.
+            // Ask the server to shut it down, but don't report it as closed.  
+            // Otherwise the HMaster will think the Region was closed on purpose, 
+            // and then try to reopen it elsewhere; that's not what we want.
 
-          returnMsgs.add(new HMsg(HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT, region)); 
+            returnMsgs.add(new HMsg(HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT, region)); 
 
-        } else {
-          LOG.info(info.getServerAddress().toString() + " serving " +
-              region.getRegionName());
-          // Remove from unassigned list so we don't assign it to someone else
-          this.unassignedRegions.remove(region.getRegionName());
-          this.assignAttempts.remove(region.getRegionName());
-          if (region.getRegionName().compareTo(
-              HRegionInfo.rootRegionInfo.getRegionName()) == 0) {
-            // Store the Root Region location (in memory)
-            synchronized (rootRegionLocation) {
-              this.rootRegionLocation.
-                set(new HServerAddress(info.getServerAddress()));
-              this.rootRegionLocation.notifyAll();
+          } else {
+            LOG.info(info.getServerAddress().toString() + " serving " +
+                region.getRegionName());
+            // Remove from unassigned list so we don't assign it to someone else
+            this.unassignedRegions.remove(region.getRegionName());
+            this.assignAttempts.remove(region.getRegionName());
+            if (region.getRegionName().compareTo(
+                HRegionInfo.rootRegionInfo.getRegionName()) == 0) {
+              // Store the Root Region location (in memory)
+              synchronized (rootRegionLocation) {
+                this.rootRegionLocation.set(
+                    new HServerAddress(info.getServerAddress()));
+                this.rootRegionLocation.notifyAll();
+              }
+              break;
             }
-            break;
-          }
 
-          // Note that the table has been assigned and is waiting for the meta
-          // table to be updated.
+            // Note that the table has been assigned and is waiting for the meta
+            // table to be updated.
 
-          pendingRegions.add(region.getRegionName());
+            pendingRegions.add(region.getRegionName());
 
-          // Queue up an update to note the region location.
+            // Queue up an update to note the region location.
 
-          try {
-            msgQueue.put(new PendingOpenReport(info, region));
-          } catch (InterruptedException e) {
-            throw new RuntimeException("Putting into msgQueue was interrupted.", e);
+            try {
+              msgQueue.put(new ProcessRegionOpen(info, region));
+            } catch (InterruptedException e) {
+              throw new RuntimeException("Putting into msgQueue was interrupted.", e);
+            }
           }
-        }
-        break;
+          break;
 
-      case HMsg.MSG_REPORT_CLOSE:
-        LOG.info(info.getServerAddress().toString() + " no longer serving " +
-            region.getRegionName());
+        case HMsg.MSG_REPORT_CLOSE:
+          LOG.info(info.getServerAddress().toString() + " no longer serving " +
+              region.getRegionName());
 
-        if (region.getRegionName().compareTo(
-            HRegionInfo.rootRegionInfo.getRegionName()) == 0) {
-          
-          // Root region
-          
-          rootRegionLocation.set(null);
-          unassignedRegions.put(region.getRegionName(), region);
-          assignAttempts.put(region.getRegionName(), Long.valueOf(0L));
+          if (region.getRegionName().compareTo(
+              HRegionInfo.rootRegionInfo.getRegionName()) == 0) {
 
-        } else {
-          boolean reassignRegion = true;
-          boolean deleteRegion = false;
+            // Root region
 
-          if (killedRegions.remove(region.getRegionName())) {
-            reassignRegion = false;
-          }
+            unassignRootRegion();
 
-          if (regionsToDelete.remove(region.getRegionName())) {
-            reassignRegion = false;
-            deleteRegion = true;
-          }
+          } else {
+            boolean reassignRegion = true;
+            boolean deleteRegion = false;
 
-          // NOTE: we cannot put the region into unassignedRegions as that
-          //       could create a race with the pending close if it gets 
-          //       reassigned before the close is processed.
+            if (killedRegions.remove(region.getRegionName())) {
+              reassignRegion = false;
+            }
 
-          unassignedRegions.remove(region.getRegionName());
-          assignAttempts.remove(region.getRegionName());
+            if (regionsToDelete.remove(region.getRegionName())) {
+              reassignRegion = false;
+              deleteRegion = true;
+            }
 
-          try {
-            msgQueue.put(new PendingCloseReport(region, reassignRegion,
-                deleteRegion));
-            
-          } catch (InterruptedException e) {
-            throw new RuntimeException("Putting into msgQueue was interrupted.", e);
+            // NOTE: we cannot put the region into unassignedRegions as that
+            //       could create a race with the pending close if it gets 
+            //       reassigned before the close is processed.
+
+            unassignedRegions.remove(region.getRegionName());
+            assignAttempts.remove(region.getRegionName());
+
+            try {
+              msgQueue.put(new ProcessRegionClose(region, reassignRegion,
+                  deleteRegion));
+
+            } catch (InterruptedException e) {
+              throw new RuntimeException("Putting into msgQueue was interrupted.", e);
+            }
           }
-        }
-        break;
+          break;
 
-      case HMsg.MSG_REPORT_SPLIT:
-        // A region has split.
-        
-        HRegionInfo newRegionA = incomingMsgs[++i].getRegionInfo();
-        unassignedRegions.put(newRegionA.getRegionName(), newRegionA);
-        assignAttempts.put(newRegionA.getRegionName(), Long.valueOf(0L));
-
-        HRegionInfo newRegionB = incomingMsgs[++i].getRegionInfo();
-        unassignedRegions.put(newRegionB.getRegionName(), newRegionB);
-        assignAttempts.put(newRegionB.getRegionName(), Long.valueOf(0L));
-
-        LOG.info("region " + region.getRegionName() +
-            " split. New regions are: " + newRegionA.getRegionName() + ", " +
-            newRegionB.getRegionName());
+        case HMsg.MSG_REPORT_SPLIT:
+          // A region has split.
 
-        if (region.getTableDesc().getName().equals(META_TABLE_NAME)) {
-          // A meta region has split.
+          HRegionInfo newRegionA = incomingMsgs[++i].getRegionInfo();
+          unassignedRegions.put(newRegionA.getRegionName(), newRegionA);
+          assignAttempts.put(newRegionA.getRegionName(), Long.valueOf(0L));
+
+          HRegionInfo newRegionB = incomingMsgs[++i].getRegionInfo();
+          unassignedRegions.put(newRegionB.getRegionName(), newRegionB);
+          assignAttempts.put(newRegionB.getRegionName(), Long.valueOf(0L));
+
+          LOG.info("region " + region.getRegionName() +
+              " split. New regions are: " + newRegionA.getRegionName() + ", " +
+              newRegionB.getRegionName());
 
-          onlineMetaRegions.remove(region.getStartKey());
-          numberOfMetaRegions.incrementAndGet();
-        }
-        break;
+          if (region.getTableDesc().getName().equals(META_TABLE_NAME)) {
+            // A meta region has split.
 
-      default:
-        throw new IOException(
-            "Impossible state during msg processing.  Instruction: " +
-            incomingMsgs[i].getMsg());
+            onlineMetaRegions.remove(region.getStartKey());
+            numberOfMetaRegions.incrementAndGet();
+          }
+          break;
+
+        default:
+          throw new IOException(
+              "Impossible state during msg processing.  Instruction: " +
+              incomingMsgs[i].getMsg());
+        }
       }
     }
 
     // Process the kill list
 
-    if (regionsToKill != null) {
-      for (HRegionInfo i: regionsToKill.values()) {
-        returnMsgs.add(new HMsg(HMsg.MSG_REGION_CLOSE, i));
-        killedRegions.add(i.getRegionName());
+    synchronized (serversToServerInfo) {
+      if (regionsToKill != null) {
+        for (HRegionInfo i: regionsToKill.values()) {
+          returnMsgs.add(new HMsg(HMsg.MSG_REGION_CLOSE, i));
+          killedRegions.add(i.getRegionName());
+        }
       }
     }
 
@@ -1617,26 +1623,33 @@
    * @param serverName
    * @param returnMsgs
    */
-  private synchronized void assignRegions(HServerInfo info, String serverName,
+  private void assignRegions(HServerInfo info, String serverName,
       ArrayList<HMsg> returnMsgs) {
     
-    TreeSet<Text> regionsToAssign = getRegionsToAssign();
-    int nRegionsToAssign = regionsToAssign.size();
-    if (nRegionsToAssign <= 0) {
-      // No regions to assign.  Return.
-      return;
-    }
-    
-    if (this.serversToServerInfo.size() == 1) {
-      assignRegionsToOneServer(regionsToAssign, serverName, returnMsgs);
-      // Finished.  Return.
-      return;
-    }
+    long now = System.currentTimeMillis();
+    SortedSet<Text> regionsToAssign = new TreeSet<Text>();
+    synchronized (serversToServerInfo) {
+      for (Map.Entry<Text, Long> e: this.assignAttempts.entrySet()) {
+        long diff = now - e.getValue().longValue();
+        if (diff > this.maxRegionOpenTime) {
+          regionsToAssign.add(e.getKey());
+        }
+      }
+      int nRegionsToAssign = regionsToAssign.size();
+      if (nRegionsToAssign <= 0) {
+        // No regions to assign.  Return.
+        return;
+      }
 
-    // Multiple servers in play.
-    // We need to allocate regions only to most lightly loaded servers.
-    HServerLoad thisServersLoad = info.getLoad();
-    synchronized (this.serversToServerInfo) {
+      if (this.serversToServerInfo.size() == 1) {
+        assignRegionsToOneServer(regionsToAssign, serverName, returnMsgs);
+        // Finished.  Return.
+        return;
+      }
+
+      // Multiple servers in play.
+      // We need to allocate regions only to most lightly loaded servers.
+      HServerLoad thisServersLoad = info.getLoad();
       int nregions = regionsPerServer(nRegionsToAssign, thisServersLoad);
       nRegionsToAssign -= nregions;
       if (nRegionsToAssign > 0) {
@@ -1667,11 +1680,11 @@
           // There is a more heavily loaded server
           for (HServerLoad load =
             new HServerLoad(thisServersLoad.getNumberOfRequests(),
-              thisServersLoad.getNumberOfRegions());
-            load.compareTo(heavierLoad) <= 0 &&
-              nregions < nRegionsToAssign;
-            load.setNumberOfRegions(load.getNumberOfRegions() + 1),
-              nregions++) {
+                thisServersLoad.getNumberOfRegions());
+          load.compareTo(heavierLoad) <= 0 &&
+          nregions < nRegionsToAssign;
+          load.setNumberOfRegions(load.getNumberOfRegions() + 1),
+          nregions++) {
             // continue;
           }
         }
@@ -1695,11 +1708,11 @@
           nregions = nRegionsToAssign;
         }
 
-        long now = System.currentTimeMillis();
+        now = System.currentTimeMillis();
         for (Text regionName: regionsToAssign) {
           HRegionInfo regionInfo = this.unassignedRegions.get(regionName);
           LOG.info("assigning region " + regionName + " to server " +
-            serverName);
+              serverName);
           this.assignAttempts.put(regionName, Long.valueOf(now));
           returnMsgs.add(new HMsg(HMsg.MSG_REGION_OPEN, regionInfo));
           if (--nregions <= 0) {
@@ -1714,6 +1727,9 @@
    * @param nRegionsToAssign
    * @param thisServersLoad
    * @return How many regions we can assign to more lightly loaded servers
+   * 
+   * Note: this method MUST be called from inside a synchronized block on
+   * serversToServerInfo
    */
   private int regionsPerServer(final int nRegionsToAssign,
       final HServerLoad thisServersLoad) {
@@ -1744,39 +1760,26 @@
    * @param serverName
    * @param returnMsgs
    */
-  private void assignRegionsToOneServer(final TreeSet<Text> regionsToAssign,
+  private void assignRegionsToOneServer(final SortedSet<Text> regionsToAssign,
       final String serverName, final ArrayList<HMsg> returnMsgs) {
     long now = System.currentTimeMillis();
     for (Text regionName: regionsToAssign) {
-      HRegionInfo regionInfo = this.unassignedRegions.get(regionName);
-      LOG.info("assigning region " + regionName + " to the only server " +
-        serverName);
-      this.assignAttempts.put(regionName, Long.valueOf(now));
-      returnMsgs.add(new HMsg(HMsg.MSG_REGION_OPEN, regionInfo));
-    }
-  }
-  
-  /*
-   * @return List of regions to assign.
-   */
-  private TreeSet<Text> getRegionsToAssign() {
-    long now = System.currentTimeMillis();
-    TreeSet<Text> regionsToAssign = new TreeSet<Text>();
-    for (Map.Entry<Text, Long> e: this.assignAttempts.entrySet()) {
-      long diff = now - e.getValue().longValue();
-      if (diff > this.maxRegionOpenTime) {
-        regionsToAssign.add(e.getKey());
+      synchronized (serversToServerInfo) {
+        HRegionInfo regionInfo = this.unassignedRegions.get(regionName);
+        LOG.info("assigning region " + regionName + " to the only server " +
+            serverName);
+        this.assignAttempts.put(regionName, Long.valueOf(now));
+        returnMsgs.add(new HMsg(HMsg.MSG_REGION_OPEN, regionInfo));
       }
     }
-    return regionsToAssign;
   }
-
+  
   /*
-   * Some internal classes to manage msg-passing and client operations
+   * Some internal classes to manage msg-passing and region server operations
    */
 
-  private abstract class PendingOperation {
-    PendingOperation() {
+  private abstract class RegionServerOperation {
+    RegionServerOperation() {
       super();
     }
 
@@ -1788,9 +1791,9 @@
    * The region server's log file needs to be split up for each region it was
    * serving, and the regions need to get reassigned.
    */
-  private class PendingServerShutdown extends PendingOperation
+  private class ProcessServerShutdown extends RegionServerOperation
   implements Delayed {
-    private final long expire;
+    private long expire;
     private HServerAddress deadServer;
     private String deadServerName;
     private Path oldLogDir;
@@ -1812,7 +1815,7 @@
       }
     }
 
-    PendingServerShutdown(HServerInfo serverInfo) {
+    ProcessServerShutdown(HServerInfo serverInfo) {
       super();
       this.deadServer = serverInfo.getServerAddress();
       this.deadServerName = this.deadServer.toString();
@@ -1846,7 +1849,7 @@
     /** {@inheritDoc} */
     @Override
     public String toString() {
-      return "PendingServerShutdown of " + this.deadServer.toString();
+      return "ProcessServerShutdown of " + this.deadServer.toString();
     }
 
     /** Finds regions that the dead region server was serving */
@@ -1936,32 +1939,34 @@
           ToDoEntry todo = new ToDoEntry(row, info);
           toDoList.add(todo);
 
-          if (killList.containsKey(deadServerName)) {
-            HashMap<Text, HRegionInfo> regionsToKill =
-              killList.get(deadServerName);
-
-            if (regionsToKill.containsKey(info.getRegionName())) {
-              regionsToKill.remove(info.getRegionName());
-              killList.put(deadServerName, regionsToKill);
-              unassignedRegions.remove(info.getRegionName());
-              assignAttempts.remove(info.getRegionName());
-              if (regionsToDelete.contains(info.getRegionName())) {
-                // Delete this region
-                regionsToDelete.remove(info.getRegionName());
-                todo.deleteRegion = true;
-              } else {
-                // Mark region offline
-                todo.regionOffline = true;
+          synchronized (serversToServerInfo) {
+            if (killList.containsKey(deadServerName)) {
+              HashMap<Text, HRegionInfo> regionsToKill =
+                killList.get(deadServerName);
+
+              if (regionsToKill.containsKey(info.getRegionName())) {
+                regionsToKill.remove(info.getRegionName());
+                killList.put(deadServerName, regionsToKill);
+                unassignedRegions.remove(info.getRegionName());
+                assignAttempts.remove(info.getRegionName());
+                if (regionsToDelete.contains(info.getRegionName())) {
+                  // Delete this region
+                  regionsToDelete.remove(info.getRegionName());
+                  todo.deleteRegion = true;
+                } else {
+                  // Mark region offline
+                  todo.regionOffline = true;
+                }
               }
+
+            } else {
+              // Get region reassigned
+              regions.put(info.getRegionName(), info);
+
+              // If it was pending, remove.
+              // Otherwise will obstruct its getting reassigned.
+              pendingRegions.remove(info.getRegionName());
             }
-            
-          } else {
-            // Get region reassigned
-            regions.put(info.getRegionName(), info);
-           
-            // If it was pending, remove.
-            // Otherwise will obstruct its getting reassigned.
-            pendingRegions.remove(info.getRegionName());
           }
         }
       } finally {
@@ -1994,8 +1999,10 @@
       for (Map.Entry<Text, HRegionInfo> e: regions.entrySet()) {
         Text region = e.getKey();
         HRegionInfo regionInfo = e.getValue();
-        unassignedRegions.put(region, regionInfo);
-        assignAttempts.put(region, Long.valueOf(0L));
+        synchronized (serversToServerInfo) {
+          unassignedRegions.put(region, regionInfo);
+          assignAttempts.put(region, Long.valueOf(0L));
+        }
       }
     }
 
@@ -2023,15 +2030,30 @@
       }
 
       if (!rootChecked) {
-        if (rootRegionLocation.get() != null &&
-            deadServer.equals(rootRegionLocation.get())) {
-
-          rootRegionLocation.set(null);
-          unassignedRegions.put(HRegionInfo.rootRegionInfo.getRegionName(),
-              HRegionInfo.rootRegionInfo);
-
-          assignAttempts.put(HRegionInfo.rootRegionInfo.getRegionName(),
-              Long.valueOf(0L));
+        boolean rootRegionUnavailable = false;
+        if (rootRegionLocation.get() == null) {
+          rootRegionUnavailable = true;
+
+        } else if (deadServer.equals(rootRegionLocation.get())) {
+          // We should never get here because whenever an object of this type
+          // is created, a check is made to see if it is the root server.
+          // and unassignRootRegion() is called then. However, in the
+          // unlikely event that we do end up here, let's do the right thing.
+          synchronized (serversToServerInfo) {
+            unassignRootRegion();
+          }
+          rootRegionUnavailable = true;
+        }
+        if (rootRegionUnavailable) {
+          // We can't do anything until the root region is on-line, put
+          // us back on the delay queue. Reset the future time at which
+          // we expect to be released from the DelayQueue we're inserted
+          // in on lease expiration.
+          this.expire = System.currentTimeMillis() + leaseTimeout / 2;
+          shutdownQueue.put(this);
+          
+          // Return true so run() does not put us back on the msgQueue
+          return true;
         }
         rootChecked = true;
       }
@@ -2106,31 +2128,35 @@
           if (closed.get()) {
             return true;
           }
-          for (MetaRegion r: onlineMetaRegions.values()) {
+          synchronized (onlineMetaRegions) {
+            for (MetaRegion r: onlineMetaRegions.values()) {
 
-            HRegionInterface server = null;
-            long scannerId = -1L;
+              HRegionInterface server = null;
+              long scannerId = -1L;
 
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("process server shutdown scanning " +
-                  r.getRegionName() + " on " + r.getServer() + " " +
-                  Thread.currentThread().getName());
-            }
-            server = connection.getHRegionConnection(r.getServer());
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("process server shutdown scanning " +
+                    r.getRegionName() + " on " + r.getServer() + " " +
+                    Thread.currentThread().getName());
+              }
+              server = connection.getHRegionConnection(r.getServer());
 
-            scannerId =
-              server.openScanner(r.getRegionName(), COLUMN_FAMILY_ARRAY,
-                  EMPTY_START_ROW, System.currentTimeMillis(), null);
-            
-            scanMetaRegion(server, scannerId, r.getRegionName());
-            
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("process server shutdown finished scanning " +
-                  r.getRegionName() + " on " + r.getServer() + " " +
-                  Thread.currentThread().getName());
+              scannerId =
+                server.openScanner(r.getRegionName(), COLUMN_FAMILY_ARRAY,
+                    EMPTY_START_ROW, System.currentTimeMillis(), null);
+
+              scanMetaRegion(server, scannerId, r.getRegionName());
+
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("process server shutdown finished scanning " +
+                    r.getRegionName() + " on " + r.getServer() + " " +
+                    Thread.currentThread().getName());
+              }
             }
           }
-          deadServers.remove(deadServerName);
+          synchronized (serversToServerInfo) {
+            deadServers.remove(deadServerName);
+          }
           break;
 
         } catch (IOException e) {
@@ -2144,16 +2170,16 @@
   }
 
   /**
-   * PendingCloseReport is instantiated when a region server reports that it
+   * ProcessRegionClose is instantiated when a region server reports that it
    * has closed a region.
    */
-  private class PendingCloseReport extends PendingOperation {
+  private class ProcessRegionClose extends RegionServerOperation {
     private HRegionInfo regionInfo;
     private boolean reassignRegion;
     private boolean deleteRegion;
     private boolean rootRegion;
 
-    PendingCloseReport(HRegionInfo regionInfo, boolean reassignRegion,
+    ProcessRegionClose(HRegionInfo regionInfo, boolean reassignRegion,
         boolean deleteRegion) {
 
       super();
@@ -2176,7 +2202,7 @@
     /** {@inheritDoc} */
     @Override
     public String toString() {
-      return "PendingCloseReport of " + this.regionInfo.getRegionName();
+      return "ProcessRegionClose of " + this.regionInfo.getRegionName();
     }
 
     @Override
@@ -2220,12 +2246,14 @@
           }
 
           MetaRegion r = null;
-          if (onlineMetaRegions.containsKey(regionInfo.getRegionName())) {
-            r = onlineMetaRegions.get(regionInfo.getRegionName());
+          synchronized (onlineMetaRegions) {
+            if (onlineMetaRegions.containsKey(regionInfo.getRegionName())) {
+              r = onlineMetaRegions.get(regionInfo.getRegionName());
 
-          } else {
-            r = onlineMetaRegions.get(onlineMetaRegions.headMap(
-                regionInfo.getRegionName()).lastKey());
+            } else {
+              r = onlineMetaRegions.get(onlineMetaRegions.headMap(
+                  regionInfo.getRegionName()).lastKey());
+            }
           }
           metaRegionName = r.getRegionName();
           server = connection.getHRegionConnection(r.getServer());
@@ -2259,8 +2287,10 @@
       if (reassignRegion) {
         LOG.info("reassign region: " + regionInfo.getRegionName());
 
-        unassignedRegions.put(regionInfo.getRegionName(), regionInfo);
-        assignAttempts.put(regionInfo.getRegionName(), Long.valueOf(0L));
+        synchronized (serversToServerInfo) {
+          unassignedRegions.put(regionInfo.getRegionName(), regionInfo);
+          assignAttempts.put(regionInfo.getRegionName(), Long.valueOf(0L));
+        }
 
       } else if (deleteRegion) {
         try {
@@ -2277,17 +2307,17 @@
   }
 
   /** 
-   * PendingOpenReport is instantiated when a region server reports that it is
+   * ProcessRegionOpen is instantiated when a region server reports that it is
    * serving a region. This applies to all meta and user regions except the 
    * root region which is handled specially.
    */
-  private class PendingOpenReport extends PendingOperation {
+  private class ProcessRegionOpen extends RegionServerOperation {
     private final boolean rootRegion;
     private final HRegionInfo region;
     private final HServerAddress serverAddress;
     private final byte [] startCode;
 
-    PendingOpenReport(HServerInfo info, HRegionInfo region)
+    ProcessRegionOpen(HServerInfo info, HRegionInfo region)
     throws IOException {
       // If true, the region which just came on-line is a META region.
       // We need to look in the ROOT region for its information.  Otherwise,
@@ -2345,10 +2375,10 @@
             return false;
           }
 
-          MetaRegion r = onlineMetaRegions.containsKey(region.getRegionName())?
-            onlineMetaRegions.get(region.getRegionName()):
-            onlineMetaRegions.get(onlineMetaRegions.
-              headMap(region.getRegionName()).lastKey());
+          MetaRegion r = onlineMetaRegions.containsKey(region.getRegionName()) ?
+            onlineMetaRegions.get(region.getRegionName()) :
+            onlineMetaRegions.get(onlineMetaRegions.headMap(
+                region.getRegionName()).lastKey());
           metaRegionName = r.getRegionName();
           server = connection.getHRegionConnection(r.getServer());
         }
@@ -2384,7 +2414,9 @@
             }
           }
           // If updated successfully, remove from pending list.
-          pendingRegions.remove(region.getRegionName());
+          synchronized (serversToServerInfo) {
+            pendingRegions.remove(region.getRegionName());
+          }
           break;
         } catch (IOException e) {
           if (tries == numRetries - 1) {
@@ -2413,6 +2445,7 @@
         closed.set(true);
         synchronized(msgQueue) {
           msgQueue.clear();                         // Empty the queue
+          shutdownQueue.clear();                    // Empty shut down queue
           msgQueue.notifyAll();                     // Wake main thread
         }
       }
@@ -2467,12 +2500,15 @@
       // for the table we want to create already exists, then table already
       // created. Throw already-exists exception.
       
-      MetaRegion m = (onlineMetaRegions.size() == 1 ?
-          onlineMetaRegions.get(onlineMetaRegions.firstKey()) : 
-            (onlineMetaRegions.containsKey(newRegion.getRegionName()) ?
-                onlineMetaRegions.get(newRegion.getRegionName()) :
-                  onlineMetaRegions.get(onlineMetaRegions.headMap(
-                      newRegion.getTableDesc().getName()).lastKey())));
+      MetaRegion m = null;
+      synchronized (onlineMetaRegions) {
+        m = (onlineMetaRegions.size() == 1 ?
+            onlineMetaRegions.get(onlineMetaRegions.firstKey()) : 
+              (onlineMetaRegions.containsKey(newRegion.getRegionName()) ?
+                  onlineMetaRegions.get(newRegion.getRegionName()) :
+                    onlineMetaRegions.get(onlineMetaRegions.headMap(
+                        newRegion.getTableDesc().getName()).lastKey())));
+      }
           
       Text metaRegionName = m.getRegionName();
       HRegionInterface server = connection.getHRegionConnection(m.getServer());
@@ -2521,9 +2557,11 @@
       region.getLog().closeAndDelete();
 
       // 5. Get it assigned to a server
-      
-      this.unassignedRegions.put(regionName, info);
-      this.assignAttempts.put(regionName, Long.valueOf(0L));
+
+      synchronized (serversToServerInfo) {
+        this.unassignedRegions.put(regionName, info);
+        this.assignAttempts.put(regionName, Long.valueOf(0L));
+      }
 
     } finally {
       synchronized (tableInCreation) {
@@ -2601,7 +2639,10 @@
         firstMetaRegion = onlineMetaRegions.headMap(tableName).lastKey();
       }
 
-      this.metaRegions.addAll(onlineMetaRegions.tailMap(firstMetaRegion).values());
+      synchronized (onlineMetaRegions) {
+        this.metaRegions.addAll(onlineMetaRegions.tailMap(
+            firstMetaRegion).values());
+      }
     }
 
     void process() throws IOException {
@@ -2799,15 +2840,17 @@
           LOG.debug("updated columns in row: " + i.getRegionName());
         }
 
-        if (online) {                           // Bring offline regions on-line
-          if (!unassignedRegions.containsKey(i.getRegionName())) {
-            unassignedRegions.put(i.getRegionName(), i);
-            assignAttempts.put(i.getRegionName(), Long.valueOf(0L));
-          }
+        synchronized (serversToServerInfo) {
+          if (online) {                         // Bring offline regions on-line
+            if (!unassignedRegions.containsKey(i.getRegionName())) {
+              unassignedRegions.put(i.getRegionName(), i);
+              assignAttempts.put(i.getRegionName(), Long.valueOf(0L));
+            }
 
-        } else {                                // Prevent region from getting assigned.
-          unassignedRegions.remove(i.getRegionName());
-          assignAttempts.remove(i.getRegionName());
+          } else {                              // Prevent region from getting assigned.
+            unassignedRegions.remove(i.getRegionName());
+            assignAttempts.remove(i.getRegionName());
+          }
         }
       }
 
@@ -2825,7 +2868,10 @@
 
         // Cause regions being served to be taken off-line and disabled
 
-        HashMap<Text, HRegionInfo> localKillList = killList.get(serverName);
+        HashMap<Text, HRegionInfo> localKillList = null;
+        synchronized (serversToServerInfo) {
+          localKillList = killList.get(serverName);
+        }
         if (localKillList == null) {
           localKillList = new HashMap<Text, HRegionInfo>();
         }
@@ -2841,7 +2887,9 @@
             LOG.debug("inserted local kill list into kill list for server " +
                 serverName);
           }
-          killList.put(serverName, localKillList);
+          synchronized (serversToServerInfo) {
+            killList.put(serverName, localKillList);
+          }
         }
       }
       servedRegions.clear();
@@ -2874,7 +2922,9 @@
       
       for (HashSet<HRegionInfo> s: servedRegions.values()) {
         for (HRegionInfo i: s) {
-          regionsToDelete.add(i.getRegionName());
+          synchronized (serversToServerInfo) {
+            regionsToDelete.add(i.getRegionName());
+          }
         }
       }
 
@@ -3005,6 +3055,10 @@
       synchronized (serversToServerInfo) {
         info = serversToServerInfo.remove(server);
         if (info != null) {
+          HServerAddress root = rootRegionLocation.get();
+          if (root != null && root.equals(info.getServerAddress())) {
+            unassignRootRegion();
+          }
           String serverName = info.getServerAddress().toString();
           HServerLoad load = serversToLoad.remove(serverName);
           if (load != null) {
@@ -3021,9 +3075,9 @@
 
       // NOTE: If the server was serving the root region, we cannot reassign it
       // here because the new server will start serving the root region before
-      // the PendingServerShutdown operation has a chance to split the log file.
+      // the ProcessServerShutdown operation has a chance to split the log file.
       if (info != null) {
-        shutdownQueue.put(new PendingServerShutdown(info));
+        shutdownQueue.put(new ProcessServerShutdown(info));
       }
     }
   }

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java?rev=597959&r1=597958&r2=597959&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java Sat Nov 24 23:17:38 2007
@@ -108,7 +108,8 @@
       this.dir = new Path(conf.get(HBASE_DIR, DEFAULT_HBASE_DIR));
       this.basedir = new Path(dir, "merge_" + System.currentTimeMillis());
       fs.mkdirs(basedir);
-      this.hlog = new HLog(fs, new Path(basedir, HREGION_LOGDIR_NAME), conf);
+      this.hlog =
+        new HLog(fs, new Path(basedir, HREGION_LOGDIR_NAME), conf, null);
     }
     
     void process() throws IOException {
@@ -150,11 +151,11 @@
       for(int i = 0; i < regions.length - 1; i++) {
         if(currentRegion == null) {
           currentRegion =
-            new HRegion(dir, hlog, fs, conf, regions[i], null);
+            new HRegion(dir, hlog, fs, conf, regions[i], null, null);
           currentSize = currentRegion.largestHStore(midKey).getAggregate();
         }
         nextRegion =
-          new HRegion(dir, hlog, fs, conf, regions[i + 1], null);
+          new HRegion(dir, hlog, fs, conf, regions[i + 1], null, null);
 
         nextSize = nextRegion.largestHStore(midKey).getAggregate();
 
@@ -327,7 +328,7 @@
       // Scan root region to find all the meta regions
       
       HRegion root =
-        new HRegion(dir, hlog,fs, conf, HRegionInfo.rootRegionInfo, null);
+        new HRegion(dir, hlog,fs, conf, HRegionInfo.rootRegionInfo, null, null);
 
       HInternalScannerInterface rootScanner =
         root.getScanner(META_COLS, new Text(), System.currentTimeMillis(), null);
@@ -363,7 +364,7 @@
         HRegion newRegion) throws IOException {
       
       HRegion root =
-        new HRegion(dir, hlog, fs, conf, HRegionInfo.rootRegionInfo, null);
+        new HRegion(dir, hlog, fs, conf, HRegionInfo.rootRegionInfo, null, null);
 
       Text[] regionsToDelete = {
           oldRegion1,

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java?rev=597959&r1=597958&r2=597959&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java Sat Nov 24 23:17:38 2007
@@ -90,7 +90,6 @@
   static final Random rand = new Random();
   static final Log LOG = LogFactory.getLog(HRegion.class);
   final AtomicBoolean closed = new AtomicBoolean(false);
-  private volatile long noFlushCount = 0;
 
   /**
    * Merge two HRegions.  They must be available on the current
@@ -159,7 +158,7 @@
     // Done
     // Construction moves the merge files into place under region.
     HRegion dstRegion = new HRegion(rootDir, log, fs, conf, newRegionInfo,
-        newRegionDir);
+        newRegionDir, null);
 
     // Get rid of merges directory
 
@@ -221,9 +220,10 @@
   volatile WriteState writestate = new WriteState();
 
   final int memcacheFlushSize;
+  private volatile long lastFlushTime;
+  final CacheFlushListener flushListener;
   final int blockingMemcacheSize;
   protected final long threadWakeFrequency;
-  protected final int optionalFlushCount;
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
   private final Integer updateLock = new Integer(0);
   private final long desiredMaxFileSize;
@@ -251,11 +251,13 @@
    * @param regionInfo - HRegionInfo that describes the region
    * @param initialFiles If there are initial files (implying that the HRegion
    * is new), then read them from the supplied path.
+   * @param listener an object that implements CacheFlushListener or null
    * 
    * @throws IOException
    */
   public HRegion(Path rootDir, HLog log, FileSystem fs, HBaseConfiguration conf, 
-      HRegionInfo regionInfo, Path initialFiles) throws IOException {
+      HRegionInfo regionInfo, Path initialFiles, CacheFlushListener listener)
+    throws IOException {
     
     this.rootDir = rootDir;
     this.log = log;
@@ -265,8 +267,6 @@
     this.encodedRegionName =
       HRegionInfo.encodeRegionName(this.regionInfo.getRegionName());
     this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
-    this.optionalFlushCount =
-      conf.getInt("hbase.hregion.memcache.optionalflushcount", 10);
 
     // Declare the regionName.  This is a unique string for the region, used to 
     // build a unique filename.
@@ -314,6 +314,7 @@
     // By default, we flush the cache when 16M.
     this.memcacheFlushSize = conf.getInt("hbase.hregion.memcache.flush.size",
         1024*1024*16);
+    this.flushListener = listener;
     this.blockingMemcacheSize = this.memcacheFlushSize *
       conf.getInt("hbase.hregion.memcache.block.multiplier", 2);
 
@@ -323,6 +324,7 @@
 
     // HRegion is ready to go!
     this.writestate.compacting = false;
+    this.lastFlushTime = System.currentTimeMillis();
     LOG.info("region " + this.regionInfo.getRegionName() + " available");
   }
   
@@ -485,6 +487,11 @@
     return this.fs;
   }
 
+  /** @return the last time the region was flushed */
+  public long getLastFlushTime() {
+    return this.lastFlushTime;
+  }
+  
   //////////////////////////////////////////////////////////////////////////////
   // HRegion maintenance.  
   //
@@ -598,8 +605,10 @@
     // Done!
     // Opening the region copies the splits files from the splits directory
     // under each region.
-    HRegion regionA = new HRegion(rootDir, log, fs, conf, regionAInfo, dirA);
-    HRegion regionB = new HRegion(rootDir, log, fs, conf, regionBInfo, dirB);
+    HRegion regionA =
+      new HRegion(rootDir, log, fs, conf, regionAInfo, dirA, null);
+    HRegion regionB =
+      new HRegion(rootDir, log, fs, conf, regionBInfo, dirB, null);
 
     // Cleanup
     boolean deleted = fs.delete(splits);    // Get rid of splits directory
@@ -751,54 +760,30 @@
   }
 
   /**
-   * Flush the cache if necessary. This is called periodically to minimize the
-   * amount of log processing needed upon startup.
+   * Flush the cache.
    * 
-   * <p>The returned Vector is a list of all the files used by the component
-   * HStores. It is a list of HStoreFile objects.  If the returned value is
-   * NULL, then the flush could not be executed, because the HRegion is busy
-   * doing something else storage-intensive.  The caller should check back
-   * later.
+   * When this method is called the cache will be flushed unless:
+   * <ol>
+   *   <li>the cache is empty</li>
+   *   <li>the region is closed.</li>
+   *   <li>a flush is already in progress</li>
+   *   <li>writes are disabled</li>
+   * </ol>
    *
    * <p>This method may block for some time, so it should not be called from a 
    * time-sensitive thread.
    * 
-   * @param disableFutureWrites indicates that the caller intends to 
-   * close() the HRegion shortly, so the HRegion should not take on any new and 
-   * potentially long-lasting disk operations. This flush() should be the final
-   * pre-close() disk operation.
+   * @return true if cache was flushed
+   * 
    * @throws IOException
    * @throws DroppedSnapshotException Thrown when replay of hlog is required
    * because a Snapshot was not properly persisted.
    */
-  void flushcache() throws IOException {
+  boolean flushcache() throws IOException {
     lock.readLock().lock();                      // Prevent splits and closes
     try {
       if (this.closed.get()) {
-        return;
-      }
-      boolean needFlush = false;
-      long memcacheSize = this.memcacheSize.get();
-      if(memcacheSize > this.memcacheFlushSize) {
-        needFlush = true;
-      } else if (memcacheSize > 0) {
-        if (this.noFlushCount >= this.optionalFlushCount) {
-          LOG.info("Optional flush called " + this.noFlushCount +
-          " times when data present without flushing.  Forcing one.");
-          needFlush = true;
-        } else {
-          // Only increment if something in the cache.
-          // Gets zero'd when a flushcache is called.
-          this.noFlushCount++;
-        }
-      }
-      if (!needFlush) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Cache flush not needed for region " +
-              regionInfo.getRegionName() + ". Cache size=" + memcacheSize +
-              ", cache flush threshold=" + this.memcacheFlushSize);
-        }
-        return;
+        return false;
       }
       synchronized (writestate) {
         if ((!writestate.flushing) && writestate.writesEnabled) {
@@ -811,16 +796,15 @@
                 writestate.flushing + ", writesEnabled=" +
                 writestate.writesEnabled);
           }
-          return;  
+          return false;  
         }
       }
-      this.noFlushCount = 0;
       long startTime = -1;
       synchronized (updateLock) {// Stop updates while we snapshot the memcaches
         startTime = snapshotMemcaches();
       }
       try {
-        internalFlushcache(startTime);
+        return internalFlushcache(startTime);
       } finally {
         synchronized (writestate) {
           writestate.flushing = false;
@@ -835,7 +819,7 @@
   /*
    * It is assumed that updates are blocked for the duration of this method
    */
-  long snapshotMemcaches() {
+  private long snapshotMemcaches() {
     if (this.memcacheSize.get() == 0) {
       return -1;
     }
@@ -883,17 +867,24 @@
    * routes.
    * 
    * <p> This method may block for some time.
+   * 
+   * @param startTime the time the cache was snapshotted or -1 if a flush is
+   * not needed
+   * 
+   * @return true if the cache was flushed
+   * 
    * @throws IOException
    * @throws DroppedSnapshotException Thrown when replay of hlog is required
    * because a Snapshot was not properly persisted.
    */
-  void internalFlushcache(long startTime) throws IOException {
+  private boolean internalFlushcache(long startTime) throws IOException {
     if (startTime == -1) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Not flushing cache: snapshotMemcaches() determined that " +
-            "there was nothing to do");
+        LOG.debug("Not flushing cache for region " +
+            regionInfo.getRegionName() +
+            ": snapshotMemcaches() determined that there was nothing to do");
       }
-      return;
+      return false;
     }
 
     // We pass the log to the HMemcache, so we can lock down both
@@ -914,7 +905,6 @@
     // Otherwise, the snapshot content while backed up in the hlog, it will not
     // be part of the current running servers state.
 
-    long logCacheFlushId = sequenceId;
     try {
       // A.  Flush memcache to all the HStores.
       // Keep running vector of all store files that includes both old and the
@@ -938,7 +928,7 @@
     //     and that all updates to the log for this regionName that have lower 
     //     log-sequence-ids can be safely ignored.
     this.log.completeCacheFlush(this.regionInfo.getRegionName(),
-        regionInfo.getTableDesc().getName(), logCacheFlushId);
+        regionInfo.getTableDesc().getName(), sequenceId);
 
     // D. Finally notify anyone waiting on memcache to clear:
     // e.g. checkResources().
@@ -948,8 +938,10 @@
     if (LOG.isDebugEnabled()) {
       LOG.debug("Finished memcache flush for region " +
           this.regionInfo.getRegionName() + " in " +
-          (System.currentTimeMillis() - startTime) + "ms");
+          (System.currentTimeMillis() - startTime) + "ms, sequenceid=" +
+          sequenceId);
     }
+    return true;
   }
   
   //////////////////////////////////////////////////////////////////////////////
@@ -1309,13 +1301,18 @@
       this.log.append(regionInfo.getRegionName(),
           regionInfo.getTableDesc().getName(), updatesByColumn);
 
+      long memcacheSize = 0;
       for (Map.Entry<HStoreKey, byte[]> e: updatesByColumn.entrySet()) {
         HStoreKey key = e.getKey();
         byte[] val = e.getValue();
-        this.memcacheSize.addAndGet(key.getSize() +
+        memcacheSize = this.memcacheSize.addAndGet(key.getSize() +
             (val == null ? 0 : val.length));
         stores.get(HStoreKey.extractFamily(key.getColumn())).add(key, val);
       }
+      if (this.flushListener != null && memcacheSize > this.memcacheFlushSize) {
+        // Request a cache flush
+        this.flushListener.flushRequested(this);
+      }
     }
   }
 
@@ -1582,8 +1579,8 @@
     FileSystem fs = FileSystem.get(conf);
     fs.mkdirs(regionDir);
     return new HRegion(rootDir,
-      new HLog(fs, new Path(regionDir, HREGION_LOGDIR_NAME), conf),
-      fs, conf, info, initialFiles);
+      new HLog(fs, new Path(regionDir, HREGION_LOGDIR_NAME), conf, null),
+      fs, conf, info, initialFiles, null);
   }
   
   /**

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java?rev=597959&r1=597958&r2=597959&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java Sat Nov 24 23:17:38 2007
@@ -78,7 +78,18 @@
   private boolean split;
   private Text startKey;
   private HTableDescriptor tableDesc;
-
+  private int hashCode;
+  
+  private void setHashCode() {
+    int result = this.regionName.hashCode();
+    result ^= Long.valueOf(this.regionId).hashCode();
+    result ^= this.startKey.hashCode();
+    result ^= this.endKey.hashCode();
+    result ^= Boolean.valueOf(this.offLine).hashCode();
+    result ^= this.tableDesc.hashCode();
+    this.hashCode = result;
+  }
+  
   /** Used to construct the HRegionInfo for the root and first meta regions */
   private HRegionInfo(long regionId, HTableDescriptor tableDesc) {
     this.regionId = regionId;
@@ -89,6 +100,7 @@
         DELIMITER + regionId);
     this.split = false;
     this.startKey = new Text();
+    setHashCode();
   }
 
   /** Default constructor - creates empty object */
@@ -100,6 +112,7 @@
     this.split = false;
     this.startKey = new Text();
     this.tableDesc = new HTableDescriptor();
+    this.hashCode = 0;
   }
   
   /**
@@ -152,6 +165,7 @@
     }
     
     this.tableDesc = tableDesc;
+    setHashCode();
   }
   
   /** @return the endKey */
@@ -232,13 +246,7 @@
    */
   @Override
   public int hashCode() {
-    int result = this.regionName.hashCode();
-    result ^= Long.valueOf(this.regionId).hashCode();
-    result ^= this.startKey.hashCode();
-    result ^= this.endKey.hashCode();
-    result ^= Boolean.valueOf(this.offLine).hashCode();
-    result ^= this.tableDesc.hashCode();
-    return result;
+    return this.hashCode;
   }
 
   //
@@ -256,6 +264,7 @@
     out.writeBoolean(split);
     startKey.write(out);
     tableDesc.write(out);
+    out.writeInt(hashCode);
   }
   
   /**
@@ -269,6 +278,7 @@
     this.split = in.readBoolean();
     this.startKey.readFields(in);
     this.tableDesc.readFields(in);
+    this.hashCode = in.readInt();
   }
   
   //



Mime
View raw message