hbase-commits mailing list archives

From nspiegelb...@apache.org
Subject svn commit: r1176177 [6/13] - in /hbase/branches/0.89: ./ bin/ bin/replication/ docs/ src/ src/assembly/ src/docs/ src/main/java/org/apache/hadoop/hbase/ src/main/java/org/apache/hadoop/hbase/avro/ src/main/java/org/apache/hadoop/hbase/avro/generated/ ...
Date Tue, 27 Sep 2011 02:42:01 GMT
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1176177&r1=1176176&r2=1176177&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Sep 27 02:41:56 2011
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.HTableDes
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
@@ -49,22 +50,28 @@ import org.apache.hadoop.hbase.io.Refere
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.StringUtils;
 
+import com.google.common.collect.Lists;
+
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.lang.reflect.Constructor;
 import java.util.AbstractList;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -115,7 +122,7 @@ import java.util.concurrent.locks.Reentr
  * regionName is a unique identifier for this HRegion. (startKey, endKey]
  * defines the keyspace for this HRegion.
  */
-public class HRegion implements HConstants, HeapSize { // , Writable{
+public class HRegion implements HeapSize { // , Writable{
   public static final Log LOG = LogFactory.getLog(HRegion.class);
   static final String SPLITDIR = "splits";
   static final String MERGEDIR = "merges";
@@ -149,14 +156,17 @@ public class HRegion implements HConstan
 
   final AtomicLong memstoreSize = new AtomicLong(0);
 
-  // This is the table subdirectory.
-  final Path basedir;
+  /**
+   * The directory for the table this region is part of.
+   * This directory contains the directory for this region.
+   */
+  final Path tableDir;
+
   final HLog log;
   final FileSystem fs;
   final Configuration conf;
   final HRegionInfo regionInfo;
   final Path regiondir;
-  private final Path regionCompactionDir;
   KeyValue.KVComparator comparator;
 
   /*
@@ -217,7 +227,6 @@ public class HRegion implements HConstan
   private final ReentrantReadWriteLock updatesLock =
     new ReentrantReadWriteLock();
   private final Object splitLock = new Object();
-  private long minSequenceId;
   private boolean splitRequest;
 
   private final ReadWriteConsistencyControl rwcc =
@@ -232,14 +241,13 @@ public class HRegion implements HConstan
    * Should only be used for testing purposes
    */
   public HRegion(){
-    this.basedir = null;
+    this.tableDir = null;
     this.blockingMemStoreSize = 0L;
     this.conf = null;
     this.flushListener = null;
     this.fs = null;
     this.memstoreFlushSize = 0L;
     this.log = null;
-    this.regionCompactionDir = null;
     this.regiondir = null;
     this.regionInfo = null;
     this.threadWakeFrequency = 0L;
@@ -251,7 +259,7 @@ public class HRegion implements HConstan
    * {@link HRegion#newHRegion(Path, HLog, FileSystem, Configuration, org.apache.hadoop.hbase.HRegionInfo, FlushRequester)} method.
    *
    *
-   * @param basedir qualified path of directory where region should be located,
+   * @param tableDir qualified path of directory where region should be located,
    * usually the table directory.
    * @param log The HLog is the outbound log for any updates to the HRegion
    * (There's a single HLog for all the HRegions on a single HRegionServer.)
@@ -271,24 +279,23 @@ public class HRegion implements HConstan
    * @see HRegion#newHRegion(Path, HLog, FileSystem, Configuration, org.apache.hadoop.hbase.HRegionInfo, FlushRequester)
 
    */
-  public HRegion(Path basedir, HLog log, FileSystem fs, Configuration conf,
+  public HRegion(Path tableDir, HLog log, FileSystem fs, Configuration conf,
       HRegionInfo regionInfo, FlushRequester flushListener) {
-    this.basedir = basedir;
+    this.tableDir = tableDir;
     this.comparator = regionInfo.getComparator();
     this.log = log;
     this.fs = fs;
     this.conf = conf;
     this.regionInfo = regionInfo;
     this.flushListener = flushListener;
-    this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
+    this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY,
+        10 * 1000);
     String encodedNameStr = this.regionInfo.getEncodedName();
-    this.regiondir = new Path(basedir, encodedNameStr);
+    this.regiondir = new Path(tableDir, encodedNameStr);
     if (LOG.isDebugEnabled()) {
       // Write out region name as string and its encoded name.
       LOG.debug("Creating region " + this);
     }
-    this.regionCompactionDir =
-      new Path(getCompactionDir(basedir), encodedNameStr);
     long flushSize = regionInfo.getTableDesc().getMemStoreFlushSize();
     if (flushSize == HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE) {
       flushSize = conf.getLong("hbase.hregion.memstore.flush.size",
@@ -300,51 +307,45 @@ public class HRegion implements HConstan
   }
 
   /**
-   * Initialize this region and get it ready to roll.
-   * Called after construction.
+   * Initialize this region.
+   * @return What the next sequence (edit) id should be.
+   * @throws IOException e
+   */
+  public long initialize() throws IOException {
+    return initialize(null);
+  }
+
+  /**
+   * Initialize this region.
    *
-   * @param initialFiles path
-   * @param reporter progressable
+   * @param reporter Tickle every so often if initialize is taking a while.
+   * @return What the next sequence (edit) id should be.
    * @throws IOException e
    */
-  public void initialize(Path initialFiles, final Progressable reporter)
+  public long initialize(final Progressable reporter)
   throws IOException {
-    Path oldLogFile = new Path(regiondir, HREGION_OLDLOGFILE_NAME);
-
-    moveInitialFilesIntoPlace(this.fs, initialFiles, this.regiondir);
-
     // Write HRI to a file in case we need to recover .META.
     checkRegioninfoOnFilesystem();
 
-    // Load in all the HStores.
-    long maxSeqId = -1;
-    long minSeqIdToRecover = Integer.MAX_VALUE;
+    // Remove temporary data left over from old regions
+    cleanupTmpDir();
 
+    // Load in all the HStores.  Get min and max seqids across all families.
+    long maxSeqId = -1;
+    long minSeqId = Integer.MAX_VALUE;
     for (HColumnDescriptor c : this.regionInfo.getTableDesc().getFamilies()) {
-      Store store = instantiateHStore(this.basedir, c, oldLogFile, reporter);
+      Store store = instantiateHStore(this.tableDir, c);
       this.stores.put(c.getName(), store);
       long storeSeqId = store.getMaxSequenceId();
       if (storeSeqId > maxSeqId) {
         maxSeqId = storeSeqId;
       }
-
-      long storeSeqIdBeforeRecovery = store.getMaxSeqIdBeforeLogRecovery();
-      if (storeSeqIdBeforeRecovery < minSeqIdToRecover) {
-        minSeqIdToRecover = storeSeqIdBeforeRecovery;
-      }
-    }
-
-    // Play log if one.  Delete when done.
-    doReconstructionLog(oldLogFile, minSeqIdToRecover, maxSeqId, reporter);
-    if (fs.exists(oldLogFile)) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Deleting old log file: " + oldLogFile);
+      if (minSeqId > storeSeqId) {
+        minSeqId = storeSeqId;
       }
-      fs.delete(oldLogFile, false);
     }
-
-    // Add one to the current maximum sequence id so new edits are beyond.
-    this.minSequenceId = maxSeqId + 1;
+    // Recover any edits if available.
+    long seqid = replayRecoveredEditsIfAny(this.regiondir, minSeqId, reporter);
 
     // Get rid of any splits or merges that were lost in-progress.  Clean out
     // these directories here on open.  We may be opening a region that was
@@ -357,11 +358,13 @@ public class HRegion implements HConstan
       this.writestate.setReadOnly(true);
     }
 
-    // HRegion is ready to go!
     this.writestate.compacting = false;
     this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
-    LOG.info("region " + this +
-             " available; sequence id is " + this.minSequenceId);
+    // Use maximum of log sequenceid or that which was found in stores
+    // (particularly if no recovered edits, seqid will be -1).
+    long nextSeqid = Math.max(seqid, maxSeqId) + 1;
+    LOG.info("Onlined " + this.toString() + "; next sequenceid=" + nextSeqid);
+    return nextSeqid;
   }
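
initialize() now returns the next sequence id rather than stashing it in a field; a minimal sketch of the new contract, mirroring the openHRegion() change later in this patch (tableDir, log, fs, conf and info are assumed to be in scope):

    HRegion r = HRegion.newHRegion(tableDir, log, fs, conf, info, null);
    // Replays any recovered edits and returns the next sequence (edit) id.
    long seqid = r.initialize();
    if (log != null) {
      // New edits must get ids beyond anything replayed or found in stores.
      log.setSequenceNumber(seqid);
    }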
 
   /*
@@ -417,14 +420,6 @@ public class HRegion implements HConstan
     }
   }
 
-  /**
-   * @return Updates to this region need to have a sequence id that is >= to
-   * the this number.
-   */
-  long getMinSequenceId() {
-    return this.minSequenceId;
-  }
-
   /** @return a HRegionInfo object for this region */
   public HRegionInfo getRegionInfo() {
     return this.regionInfo;
@@ -607,7 +602,6 @@ public class HRegion implements HConstan
   }
 
   /** @return the last time the region was flushed */
-  @SuppressWarnings({"UnusedDeclaration"})
   public long getLastFlushTime() {
     return this.lastFlushTime;
   }
@@ -703,10 +697,10 @@ public class HRegion implements HConstan
       // Create a region instance and then move the splits into place under
       // regionA and regionB.
       HRegion regionA =
-        HRegion.newHRegion(basedir, log, fs, conf, regionAInfo, null);
+        HRegion.newHRegion(tableDir, log, fs, conf, regionAInfo, null);
       moveInitialFilesIntoPlace(this.fs, dirA, regionA.getRegionDir());
       HRegion regionB =
-        HRegion.newHRegion(basedir, log, fs, conf, regionBInfo, null);
+        HRegion.newHRegion(tableDir, log, fs, conf, regionBInfo, null);
       moveInitialFilesIntoPlace(this.fs, dirB, regionB.getRegionDir());
 
       return new HRegion [] {regionA, regionB};
@@ -739,28 +733,25 @@ public class HRegion implements HConstan
   }
 
   /*
-   * @param dir
-   * @return compaction directory for the passed in <code>dir</code>
-   */
-  static Path getCompactionDir(final Path dir) {
-   return new Path(dir, HREGION_COMPACTIONDIR_NAME);
-  }
-
-  /*
    * Do preparation for pending compaction.
-   * Clean out any vestiges of previous failed compactions.
    * @throws IOException
    */
   private void doRegionCompactionPrep() throws IOException {
-    doRegionCompactionCleanup();
   }
 
   /*
-   * Removes the compaction directory for this Store.
-   * @throws IOException
+   * Removes the temporary directory for this Store.
    */
-  private void doRegionCompactionCleanup() throws IOException {
-    FSUtils.deleteDirectory(this.fs, this.regionCompactionDir);
+  private void cleanupTmpDir() throws IOException {
+    FSUtils.deleteDirectory(this.fs, getTmpDir());
+  }
+
+  /**
+   * Get the temporary directory for this region. This directory
+   * will have its contents removed when the region is reopened.
+   */
+  Path getTmpDir() {
+    return new Path(getRegionDir(), ".tmp");
   }
 
   void setForceMajorCompaction(final boolean b) {
@@ -841,7 +832,6 @@ public class HRegion implements HConstan
             splitRow = ss.getSplitRow();
           }
         }
-        doRegionCompactionCleanup();
         String timeTaken = StringUtils.formatTimeDiff(EnvironmentEdgeManager.currentTimeMillis(),
             startTime);
         LOG.info("compaction completed on region " + this + " in " + timeTaken);
@@ -1074,7 +1064,7 @@ public class HRegion implements HConstan
 
     if (LOG.isDebugEnabled()) {
       long now = EnvironmentEdgeManager.currentTimeMillis();
-      LOG.debug("Finished memstore flush of ~" +
+      LOG.info("Finished memstore flush of ~" +
         StringUtils.humanReadableInt(currentMemStoreSize) + " for region " +
         this + " in " + (now - startTime) + "ms, sequence id=" + sequenceId +
         ", compaction requested=" + compactionRequested);
@@ -1239,7 +1229,7 @@ public class HRegion implements HConstan
     try {
       byte [] row = delete.getRow();
       // If we did not pass an existing row lock, obtain a new one
-      lid = getLock(lockid, row);
+      lid = getLock(lockid, row, true);
 
       // All edits for the given row (across all column families) must happen atomically.
       prepareDelete(delete);
@@ -1264,7 +1254,6 @@ public class HRegion implements HConstan
     boolean flush = false;
 
     updatesLock.readLock().lock();
-    ReadWriteConsistencyControl.WriteEntry w = null;
 
     try {
 
@@ -1274,7 +1263,6 @@ public class HRegion implements HConstan
         List<KeyValue> kvs = e.getValue();
         Map<byte[], Integer> kvCount = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
 
-        Store store = getStore(family);
         for (KeyValue kv: kvs) {
           //  Check if time is LATEST, change to time of most recent addition if so
           //  This is expensive.
@@ -1314,50 +1302,24 @@ public class HRegion implements HConstan
       }
 
       if (writeToWAL) {
-        //
         // write/sync to WAL should happen before we touch memstore.
         //
         // If order is reversed, i.e. we write to memstore first, and
         // for some reason fail to write/sync to commit log, the memstore
         // will contain uncommitted transactions.
         //
-
         // bunch up all edits across all column families into a
         // single WALEdit.
         WALEdit walEdit = new WALEdit();
-        for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
-          List<KeyValue> kvs = e.getValue();
-          for (KeyValue kv : kvs) {
-            walEdit.add(kv);
-          }
-        }
-        // append the edit to WAL. The append also does the sync.
-        if (!walEdit.isEmpty()) {
-          this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
+        addFamilyMapToWALEdit(familyMap, walEdit);
+        this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
             walEdit, now);
-        }
       }
 
       // Now make changes to the memstore.
-
-      long size = 0;
-      w = rwcc.beginMemstoreInsert();
-
-      for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
-
-        byte[] family = e.getKey();
-        List<KeyValue> kvs = e.getValue();
-
-        Store store = getStore(family);
-        for (KeyValue kv: kvs) {
-          kv.setMemstoreTS(w.getWriteNumber());
-          size = this.memstoreSize.addAndGet(store.delete(kv));
-        }
-      }
-      flush = isFlushSize(size);
+      long addedSize = applyFamilyMapToMemstore(familyMap);
+      flush = isFlushSize(memstoreSize.addAndGet(addedSize));
     } finally {
-      if (w != null) rwcc.completeMemstoreInsert(w);
-
       this.updatesLock.readLock().unlock();
     }
 
@@ -1418,7 +1380,7 @@ public class HRegion implements HConstan
       // invokes a HRegion#abort.
       byte [] row = put.getRow();
       // If we did not pass an existing row lock, obtain a new one
-      Integer lid = getLock(lockid, row);
+      Integer lid = getLock(lockid, row, true);
 
       try {
         // All edits for the given row (across all column families) must happen atomically.
@@ -1431,6 +1393,171 @@ public class HRegion implements HConstan
     }
   }
 
+  /**
+   * Struct-like class that tracks the progress of a batch operation,
+   * accumulating status codes and tracking the index at which processing
+   * is proceeding.
+   */
+  private static class BatchOperationInProgress<T> {
+    T[] operations;
+    OperationStatusCode[] retCodes;
+    int nextIndexToProcess = 0;
+
+    public BatchOperationInProgress(T[] operations) {
+      this.operations = operations;
+      retCodes = new OperationStatusCode[operations.length];
+      Arrays.fill(retCodes, OperationStatusCode.NOT_RUN);
+    }
+
+    public boolean isDone() {
+      return nextIndexToProcess == operations.length;
+    }
+  }
+
+  /**
+   * Perform a batch put with no pre-specified locks
+   * @see HRegion#put(Pair[])
+   */
+  public OperationStatusCode[] put(Put[] puts) throws IOException {
+    @SuppressWarnings("unchecked")
+    Pair<Put, Integer> putsAndLocks[] = new Pair[puts.length];
+
+    for (int i = 0; i < puts.length; i++) {
+      putsAndLocks[i] = new Pair<Put, Integer>(puts[i], null);
+    }
+    return put(putsAndLocks);
+  }
+
+  /**
+   * Perform a batch of puts.
+   * @param putsAndLocks the list of puts paired with their requested lock IDs.
+   * @throws IOException
+   */
+  public OperationStatusCode[] put(Pair<Put, Integer>[] putsAndLocks) throws IOException {
+    BatchOperationInProgress<Pair<Put, Integer>> batchOp =
+      new BatchOperationInProgress<Pair<Put,Integer>>(putsAndLocks);
+
+    while (!batchOp.isDone()) {
+      checkReadOnly();
+      checkResources();
+
+      long newSize;
+      splitsAndClosesLock.readLock().lock();
+      try {
+        long addedSize = doMiniBatchPut(batchOp);
+        newSize = memstoreSize.addAndGet(addedSize);
+      } finally {
+        splitsAndClosesLock.readLock().unlock();
+      }
+      if (isFlushSize(newSize)) {
+        requestFlush();
+      }
+    }
+    return batchOp.retCodes;
+  }
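
A caller-side sketch of the new batch API (region, put1 and put2 are placeholders; a null lock id means a fresh row lock is acquired for that row):

    OperationStatusCode[] codes = region.put(new Put[] { put1, put2 });
    for (OperationStatusCode code : codes) {
      if (code != OperationStatusCode.SUCCESS) {
        // e.g. BAD_FAMILY for an unknown column family; decide whether to retry.
      }
    }
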
+
+  private long doMiniBatchPut(BatchOperationInProgress<Pair<Put, Integer>> batchOp) throws IOException {
+    long now = EnvironmentEdgeManager.currentTimeMillis();
+    byte[] byteNow = Bytes.toBytes(now);
+
+    /** Keep track of the locks we hold so we can release them in the finally clause */
+    List<Integer> acquiredLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
+    // We try to set up a batch in the range [firstIndex,lastIndexExclusive)
+    int firstIndex = batchOp.nextIndexToProcess;
+    int lastIndexExclusive = firstIndex;
+    boolean success = false;
+    try {
+      // ------------------------------------
+      // STEP 1. Try to acquire as many locks as we can, and ensure
+      // we acquire at least one.
+      // ----------------------------------
+      int numReadyToWrite = 0;
+      while (lastIndexExclusive < batchOp.operations.length) {
+        Pair<Put, Integer> nextPair = batchOp.operations[lastIndexExclusive];
+        Put put = nextPair.getFirst();
+        Integer providedLockId = nextPair.getSecond();
+
+        // Check the families in the put. If bad, skip this one.
+        try {
+          checkFamilies(put.getFamilyMap().keySet());
+        } catch (NoSuchColumnFamilyException nscf) {
+          LOG.warn("No such column family in batch put", nscf);
+          batchOp.retCodes[lastIndexExclusive] = OperationStatusCode.BAD_FAMILY;
+          lastIndexExclusive++;
+          continue;
+        }
+
+        // If we haven't got any rows in our batch, we should block to
+        // get the next one.
+        boolean shouldBlock = numReadyToWrite == 0;
+        Integer acquiredLockId = getLock(providedLockId, put.getRow(), shouldBlock);
+        if (acquiredLockId == null) {
+          // We failed to grab another lock
+          assert !shouldBlock : "Should never fail to get lock when blocking";
+          break; // stop acquiring more rows for this batch
+        }
+        if (providedLockId == null) {
+          acquiredLocks.add(acquiredLockId);
+        }
+        lastIndexExclusive++;
+        numReadyToWrite++;
+      }
+      // We've now grabbed as many puts off the list as we can
+      assert numReadyToWrite > 0;
+
+      // ------------------------------------
+      // STEP 2. Update any LATEST_TIMESTAMP timestamps
+      // ----------------------------------
+      for (int i = firstIndex; i < lastIndexExclusive; i++) {
+        updateKVTimestamps(
+            batchOp.operations[i].getFirst().getFamilyMap().values(),
+            byteNow);
+      }
+
+      // ------------------------------------
+      // STEP 3. Write to WAL
+      // ----------------------------------
+      WALEdit walEdit = new WALEdit();
+      for (int i = firstIndex; i < lastIndexExclusive; i++) {
+        // Skip puts that were determined to be invalid during preprocessing
+        if (batchOp.retCodes[i] != OperationStatusCode.NOT_RUN) continue;
+
+        Put p = batchOp.operations[i].getFirst();
+        if (!p.getWriteToWAL()) continue;
+        addFamilyMapToWALEdit(p.getFamilyMap(), walEdit);
+      }
+
+      // Append the edit to WAL
+      this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
+          walEdit, now);
+
+      // ------------------------------------
+      // STEP 4. Write back to memstore
+      // ----------------------------------
+      long addedSize = 0;
+      for (int i = firstIndex; i < lastIndexExclusive; i++) {
+        if (batchOp.retCodes[i] != OperationStatusCode.NOT_RUN) continue;
+
+        Put p = batchOp.operations[i].getFirst();
+        addedSize += applyFamilyMapToMemstore(p.getFamilyMap());
+        batchOp.retCodes[i] = OperationStatusCode.SUCCESS;
+      }
+      success = true;
+      return addedSize;
+    } finally {
+      for (Integer toRelease : acquiredLocks) {
+        releaseRowLock(toRelease);
+      }
+      if (!success) {
+        for (int i = firstIndex; i < lastIndexExclusive; i++) {
+          if (batchOp.retCodes[i] == OperationStatusCode.NOT_RUN) {
+            batchOp.retCodes[i] = OperationStatusCode.FAILURE;
+          }
+        }
+      }
+      batchOp.nextIndexToProcess = lastIndexExclusive;
+    }
+  }
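
Note the locking strategy above: only the first row lock of a mini-batch is acquired blocking; the rest are tried non-blocking, so a contended row simply ends the current mini-batch. The outer loop in put(Pair[]) then resumes from nextIndexToProcess until every operation has a status code.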
 
   //TODO, Think that gets/puts and deletes should be refactored a bit so that
   //the getting of the lock happens before, so that you would just pass it into
@@ -1466,7 +1593,7 @@ public class HRegion implements HConstan
       get.addColumn(family, qualifier);
 
       // Lock row
-      Integer lid = getLock(lockId, get.getRow());
+      Integer lid = getLock(lockId, get.getRow(), true);
       List<KeyValue> result = new ArrayList<KeyValue>();
       try {
         result = get(get);
@@ -1502,21 +1629,17 @@ public class HRegion implements HConstan
 
 
   /**
-   * Checks if any stamps is Long.MAX_VALUE.  If so, sets them to now.
-   * <p>
-   * This acts to replace LATEST_TIMESTAMP with now.
-   * @param keys
-   * @param now
-   * @return <code>true</code> when updating the time stamp completed.
+   * Replaces any KV timestamps set to {@link HConstants#LATEST_TIMESTAMP}
+   * with the provided current timestamp.
    */
-  private boolean updateKeys(List<KeyValue> keys, byte [] now) {
-    if (keys == null || keys.isEmpty()) {
-      return false;
-    }
-    for (KeyValue key : keys) {
-      key.updateLatestStamp(now);
+  private void updateKVTimestamps(
+      final Iterable<List<KeyValue>> keyLists, final byte[] now) {
+    for (List<KeyValue> keys: keyLists) {
+      if (keys == null) continue;
+      for (KeyValue key : keys) {
+        key.updateLatestStamp(now);
+      }
     }
-    return true;
   }
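
For context, a Put assembled without an explicit timestamp carries HConstants.LATEST_TIMESTAMP on each KeyValue, and updateKVTimestamps() pins those to the server clock captured at write time. A sketch (row, family, qualifier and value are placeholders):

    Put p = new Put(row);
    p.add(family, qualifier, value);  // timestamp defaults to LATEST_TIMESTAMP
    // Inside HRegion, before the WAL append, the region calls
    //   updateKVTimestamps(p.getFamilyMap().values(), Bytes.toBytes(now));
    // so the KeyValue is persisted with the server-side write time.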
 
 //  /*
@@ -1618,51 +1741,47 @@ public class HRegion implements HConstan
     byte[] byteNow = Bytes.toBytes(now);
     boolean flush = false;
     this.updatesLock.readLock().lock();
-    ReadWriteConsistencyControl.WriteEntry w = null;
     try {
-      WALEdit walEdit = new WALEdit();
-
-      // check if column families are valid;
-      // check if any timestampupdates are needed;
-      // and if writeToWAL is set, then also collapse edits into a single list.
-      for (Map.Entry<byte[], List<KeyValue>> e: familyMap.entrySet()) {
-        List<KeyValue> edits = e.getValue();
-        byte[] family = e.getKey();
-
-        // is this a valid column family?
-        checkFamily(family);
-
-        // update timestamp on keys if required.
-        if (updateKeys(edits, byteNow)) {
-          if (writeToWAL) {
-            // bunch up all edits across all column families into a
-            // single WALEdit.
-            for (KeyValue kv : edits) {
-              walEdit.add(kv);
-            }
-          }
-        }
-      }
-
-      // append to and sync WAL
-      if (!walEdit.isEmpty()) {
-        //
-        // write/sync to WAL should happen before we touch memstore.
-        //
-        // If order is reversed, i.e. we write to memstore first, and
-        // for some reason fail to write/sync to commit log, the memstore
-        // will contain uncommitted transactions.
-        //
-
+      checkFamilies(familyMap.keySet());
+      updateKVTimestamps(familyMap.values(), byteNow);
+      // write/sync to WAL should happen before we touch memstore.
+      //
+      // If order is reversed, i.e. we write to memstore first, and
+      // for some reason fail to write/sync to commit log, the memstore
+      // will contain uncommitted transactions.
+      if (writeToWAL) {
+        WALEdit walEdit = new WALEdit();
+        addFamilyMapToWALEdit(familyMap, walEdit);
         this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
            walEdit, now);
       }
 
-      long size = 0;
+      long addedSize = applyFamilyMapToMemstore(familyMap);
+      flush = isFlushSize(memstoreSize.addAndGet(addedSize));
+    } finally {
+      this.updatesLock.readLock().unlock();
+    }
+    if (flush) {
+      // Request a cache flush.  Do it outside update lock.
+      requestFlush();
+    }
+  }
 
+  /**
+   * Atomically apply the given map of family->edits to the memstore.
+   * This handles the consistency control on its own, but the caller
+   * should already have locked updatesLock.readLock(). This also does
+   * <b>not</b> check the families for validity.
+   *
+   * @return the additional memory usage of the memstore caused by the
+   * new entries.
+   */
+  private long applyFamilyMapToMemstore(Map<byte[], List<KeyValue>> familyMap) {
+    ReadWriteConsistencyControl.WriteEntry w = null;
+    long size = 0;
+    try {
       w = rwcc.beginMemstoreInsert();
 
-      // now make changes to the memstore
       for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
         byte[] family = e.getKey();
         List<KeyValue> edits = e.getValue();
@@ -1670,18 +1789,38 @@ public class HRegion implements HConstan
         Store store = getStore(family);
         for (KeyValue kv: edits) {
           kv.setMemstoreTS(w.getWriteNumber());
-          size = this.memstoreSize.addAndGet(store.add(kv));
+          size += store.add(kv);
         }
       }
-      flush = isFlushSize(size);
     } finally {
-      if (w != null) rwcc.completeMemstoreInsert(w);
+      rwcc.completeMemstoreInsert(w);
+    }
+    return size;
+  }
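
The method above restates the read/write consistency bracketing used throughout this class; a minimal sketch of the discipline:

    ReadWriteConsistencyControl.WriteEntry w = null;
    try {
      w = rwcc.beginMemstoreInsert();
      // ... tag each KeyValue with w.getWriteNumber() and add it to its Store ...
    } finally {
      // Publish the writes so readers can see them.
      rwcc.completeMemstoreInsert(w);
    }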
 
-      this.updatesLock.readLock().unlock();
+  /**
+   * Check the collection of families for validity.
+   * @throws NoSuchColumnFamilyException if a family does not exist.
+   */
+  private void checkFamilies(Collection<byte[]> families)
+  throws NoSuchColumnFamilyException {
+    for (byte[] family : families) {
+      checkFamily(family);
     }
-    if (flush) {
-      // Request a cache flush.  Do it outside update lock.
-      requestFlush();
+  }
+
+  /**
+   * Append the given map of family->edits to a WALEdit data structure.
+   * This does not write to the HLog itself.
+   * @param familyMap map of family->edits
+   * @param walEdit the destination entry to append into
+   */
+  private void addFamilyMapToWALEdit(Map<byte[], List<KeyValue>> familyMap,
+      WALEdit walEdit) {
+    for (List<KeyValue> edits : familyMap.values()) {
+      for (KeyValue kv : edits) {
+        walEdit.add(kv);
+      }
     }
   }
 
@@ -1710,19 +1849,199 @@ public class HRegion implements HConstan
     return size > this.memstoreFlushSize;
   }
 
-  // Do any reconstruction needed from the log
-  protected void doReconstructionLog(Path oldLogFile, long minSeqId, long maxSeqId,
-    Progressable reporter)
+  /**
+   * Read the edits log written under this region by the WAL-splitting
+   * process and replay the recovered edits into this region.
+   *
+   * Any log entry with a sequence id less than or equal to minSeqId can be
+   * skipped, because such edits are already reflected in the HFiles.
+   * @param regiondir
+   * @param minSeqId Minimum sequenceid found in a store file.  Edits in log
+   * must be larger than this to be replayed.
+   * @param reporter
+   * @return the sequence id of the last edit added to this region out of the
+   * recovered edits log, or -1 if no log recovered
+   * @throws UnsupportedEncodingException
+   * @throws IOException
+   */
+  protected long replayRecoveredEditsIfAny(final Path regiondir,
+      final long minSeqId, final Progressable reporter)
   throws UnsupportedEncodingException, IOException {
-    // Nothing to do (Replaying is done in HStores)
-    // Used by subclasses; e.g. THBase.
+    Path edits = new Path(regiondir, HLog.RECOVERED_EDITS);
+    if (edits == null || !this.fs.exists(edits)) return -1;
+    if (isZeroLengthThenDelete(this.fs, edits)) return -1;
+    long maxSeqIdInLog = -1;
+    try {
+      maxSeqIdInLog = replayRecoveredEdits(edits, minSeqId, reporter);
+      LOG.debug("Deleting recovered edits file: " + edits);
+      if (!this.fs.delete(edits, false)) {
+        LOG.error("Failed delete of " + edits);
+      }
+    } catch (IOException e) {
+      boolean skipErrors = conf.getBoolean("hbase.skip.errors", false);
+      if (skipErrors) {
+        Path moveAsideName = new Path(edits.getParent(), edits.getName() + "." +
+          System.currentTimeMillis());
+        LOG.error("hbase.skip.errors=true so continuing. Renamed " + edits +
+          " as " + moveAsideName, e);
+        if (!this.fs.rename(edits, moveAsideName)) {
+          LOG.error("hbase.skip.errors=true so continuing. Rename failed");
+        }
+      } else {
+        throw e;
+      }
+    }
+    return maxSeqIdInLog;
+  }
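
The hbase.skip.errors flag referenced above trades safety for availability at region open; a sketch of how a deployment might opt in:

    Configuration conf = HBaseConfiguration.create();
    // If a recovered-edits file turns out to be corrupt, rename it aside
    // (with a timestamp suffix) and keep opening the region instead of failing.
    conf.setBoolean("hbase.skip.errors", true);
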
+
+  /*
+   * @param edits File of recovered edits.
+   * @param minSeqId Minimum sequenceid found in a store file.  Edits in log
+   * must be larger than this to be replayed.
+   * @param reporter
+   * @return the sequence id of the last edit added to this region out of the
+   * recovered edits log, or -1 if no log recovered
+   * @throws IOException
+   */
+  private long replayRecoveredEdits(final Path edits,
+      final long minSeqId, final Progressable reporter)
+    throws IOException {
+    HLog.Reader reader = HLog.getReader(this.fs, edits, conf);
+    try {
+      return replayRecoveredEdits(reader, minSeqId, reporter);
+    } finally {
+      reader.close();
+    }
+  }
+
+  /*
+   * @param reader Reader against file of recovered edits.
+   * @param minSeqId Minimum sequenceid found in a store file.  Edits in log
+   * must be larger than this to be replayed.
+   * @param reporter
+   * @return the sequence id of the last edit added to this region out of the
+   * recovered edits log, or -1 if no log recovered
+   * @throws IOException
+   */
+  private long replayRecoveredEdits(final HLog.Reader reader,
+    final long minSeqId, final Progressable reporter)
+  throws IOException {
+    long currentEditSeqId = -1;
+    long firstSeqIdInLog = -1;
+    long skippedEdits = 0;
+    long editsCount = 0;
+    HLog.Entry entry;
+    Store store = null;
+    // Get map of family name to maximum sequence id.  Do it here up front
+    // because the sequence id can change as we progress: a flush bumps the
+    // seqid for the Store, and the new seqid could cause us to skip edits.
+    Map<byte [], Long> familyToOriginalMaxSeqId = familyToMaxSeqId(this.stores);
+    // How many edits to apply before we send a progress report.
+    int interval = this.conf.getInt("hbase.hstore.report.interval.edits", 2000);
+    while ((entry = reader.next()) != null) {
+      HLogKey key = entry.getKey();
+      WALEdit val = entry.getEdit();
+      if (firstSeqIdInLog == -1) {
+        firstSeqIdInLog = key.getLogSeqNum();
+      }
+      // Now, figure if we should skip this edit.
+      currentEditSeqId = Math.max(currentEditSeqId, key.getLogSeqNum());
+      if (key.getLogSeqNum() <= minSeqId) {
+        skippedEdits++;
+        continue;
+      }
+      for (KeyValue kv : val.getKeyValues()) {
+        // Check this edit is for me. Also, guard against writing the special
+        // METACOLUMN info such as HBASE::CACHEFLUSH entries
+        if (kv.matchingFamily(HLog.METAFAMILY) ||
+            !Bytes.equals(key.getRegionName(), this.regionInfo.getRegionName())) {
+          skippedEdits++;
+          continue;
+        }
+        // Figure which store the edit is meant for.
+        if (store == null || !kv.matchingFamily(store.getFamily().getName())) {
+          store = this.stores.get(kv.getFamily());
+        }
+        if (store == null) {
+          // This should never happen.  Perhaps schema was changed between
+          // crash and redeploy?
+          LOG.warn("No family for " + kv);
+          skippedEdits++;
+          continue;
+        }
+        // The edit's seqid has to exceed the original max seqid of the
+        // targeted store.
+        long storeMaxSeqId = familyToOriginalMaxSeqId.get(store.getFamily().getName());
+        if (currentEditSeqId < storeMaxSeqId) {
+          skippedEdits++;
+          continue;
+        }
+        restoreEdit(kv);
+        editsCount++;
+      }
+
+      // Every 'interval' edits, tell the reporter we're making progress.
+      // Have seen 60k edits taking 3minutes to complete.
+      if (reporter != null && (editsCount % interval) == 0) {
+        reporter.progress();
+      }
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Applied " + editsCount + ", skipped " + skippedEdits +
+        ", firstSeqIdInLog=" + firstSeqIdInLog +
+        ", maxSeqIdInLog=" + currentEditSeqId);
+    }
+    return currentEditSeqId;
+  }
+
+  /*
+   * @param stores
+   * @return Map of family name to maximum sequenceid.
+   */
+  private Map<byte [], Long> familyToMaxSeqId(final Map<byte [], Store> stores) {
+    Map<byte [], Long> map = new TreeMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
+    for (Map.Entry<byte [], Store> e: stores.entrySet()) {
+      map.put(e.getKey(), e.getValue().getMaxSequenceId());
+    }
+    return map;
+  }
+
+  /*
+   * @param kv Apply this value to this region.
+   * @throws IOException
+   */
+  // This method is protected so it can be called from tests.
+  protected void restoreEdit(final KeyValue kv) throws IOException {
+    // This is really expensive to do per edit.  Loads of object creation.
+    // TODO: Optimization.  Apply edits batched by family.
+    Map<byte [], List<KeyValue>> map =
+      new TreeMap<byte [], List<KeyValue>>(Bytes.BYTES_COMPARATOR);
+    map.put(kv.getFamily(), Collections.singletonList(kv));
+    if (kv.isDelete()) {
+      delete(map, true);
+    } else {
+      put(map, true);
+    }
   }
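
As the TODO above notes, each replayed KeyValue currently goes through the full put/delete path wrapped in a singleton family map; batching replayed edits by family would amortize the per-edit object creation.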
 
-  protected Store instantiateHStore(Path baseDir,
-    HColumnDescriptor c, Path oldLogFile, Progressable reporter)
+  /*
+   * @param fs
+   * @param p File to check.
+   * @return True if file was zero-length (and if so, we'll delete it in here).
+   * @throws IOException
+   */
+  private static boolean isZeroLengthThenDelete(final FileSystem fs, final Path p)
   throws IOException {
-    return new Store(baseDir, this, c, this.fs, oldLogFile,
-      this.conf, reporter);
+    FileStatus stat = fs.getFileStatus(p);
+    if (stat.getLen() > 0) return false;
+    LOG.warn("File " + p + " is zero-length, deleting.");
+    fs.delete(p, false);
+    return true;
+  }
+
+  protected Store instantiateHStore(Path tableDir, HColumnDescriptor c)
+  throws IOException {
+    return new Store(tableDir, this, c, this.fs, this.conf);
   }
 
   /**
@@ -1775,6 +2094,27 @@ public class HRegion implements HConstan
    * @return The id of the held lock.
    */
   public Integer obtainRowLock(final byte [] row) throws IOException {
+    return internalObtainRowLock(row, true);
+  }
+
+  /**
+   * Tries to obtain a row lock on the given row, but does not block if the
+   * row lock is not available. If the lock is not available, returns null.
+   * Otherwise behaves the same as {@link #obtainRowLock(byte[])}.
+   * @see HRegion#obtainRowLock(byte[])
+   */
+  public Integer tryObtainRowLock(final byte[] row) throws IOException {
+    return internalObtainRowLock(row, false);
+  }
+
+  /**
+   * Obtains or tries to obtain the given row lock.
+   * @param waitForLock if true, will block until the lock is available.
+   *        Otherwise, just tries to obtain the lock and returns
+   *        null if unavailable.
+   */
+  private Integer internalObtainRowLock(final byte[] row, boolean waitForLock)
+  throws IOException {
     checkRow(row);
     splitsAndClosesLock.readLock().lock();
     try {
@@ -1783,6 +2123,9 @@ public class HRegion implements HConstan
       }
       synchronized (lockedRows) {
         while (lockedRows.contains(row)) {
+          if (!waitForLock) {
+            return null;
+          }
           try {
             lockedRows.wait();
           } catch (InterruptedException ie) {
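
A usage sketch for the new non-blocking variant (row and region are placeholders):

    Integer lockid = region.tryObtainRowLock(row);  // null if the row is already locked
    if (lockid != null) {
      try {
        // ... mutate the row while holding the lock ...
      } finally {
        region.releaseRowLock(lockid);
      }
    }
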
@@ -1843,7 +2186,7 @@ public class HRegion implements HConstan
    * @param lockid
    * @return boolean
    */
-  private boolean isRowLocked(final Integer lockid) {
+  boolean isRowLocked(final Integer lockid) {
     synchronized (lockedRows) {
       if (lockIds.get(lockid) != null) {
         return true;
@@ -1855,14 +2198,17 @@ public class HRegion implements HConstan
   /**
    * Returns existing row lock if found, otherwise
    * obtains a new row lock and returns it.
-   * @param lockid
-   * @return lockid
+   * @param lockid the lock id requested by the caller, or null if no lock is already held
+   * @param row the row to lock
+   * @param waitForLock if true, will block until the lock is available, otherwise will
+   * simply return null if it could not acquire the lock.
+   * @return lockid or null if waitForLock is false and the lock was unavailable.
    */
-  private Integer getLock(Integer lockid, byte [] row)
+  private Integer getLock(Integer lockid, byte [] row, boolean waitForLock)
   throws IOException {
     Integer lid = null;
     if (lockid == null) {
-      lid = obtainRowLock(row);
+      lid = internalObtainRowLock(row, waitForLock);
     } else {
       if (!isRowLocked(lockid)) {
         throw new IOException("Invalid row lock");
@@ -1923,8 +2269,8 @@ public class HRegion implements HConstan
   }
 
   /** @return Path of region base directory */
-  public Path getBaseDir() {
-    return this.basedir;
+  public Path getTableDir() {
+    return this.tableDir;
   }
 
   /**
@@ -2117,9 +2463,8 @@ public class HRegion implements HConstan
   // Utility methods
   /**
    * A utility method to create new instances of HRegion based on the
-   * {@link org.apache.hadoop.hbase.HConstants#REGION_IMPL} configuration
-   * property.
-   * @param basedir qualified path of directory where region should be located,
+   * {@link HConstants#REGION_IMPL} configuration property.
+   * @param tableDir qualified path of directory where region should be located,
    * usually the table directory.
    * @param log The HLog is the outbound log for any updates to the HRegion
    * (There's a single HLog for all the HRegions on a single HRegionServer.)
@@ -2137,7 +2482,7 @@ public class HRegion implements HConstan
    * failed.  Can be null.
    * @return the new instance
    */
-  public static HRegion newHRegion(Path basedir, HLog log, FileSystem fs, Configuration conf,
+  public static HRegion newHRegion(Path tableDir, HLog log, FileSystem fs, Configuration conf,
                                    HRegionInfo regionInfo, FlushRequester flushListener) {
     try {
       @SuppressWarnings("unchecked")
@@ -2148,7 +2493,7 @@ public class HRegion implements HConstan
           regionClass.getConstructor(Path.class, HLog.class, FileSystem.class,
               Configuration.class, HRegionInfo.class, FlushRequester.class);
 
-      return c.newInstance(basedir, log, fs, conf, regionInfo, flushListener);
+      return c.newInstance(tableDir, log, fs, conf, regionInfo, flushListener);
     } catch (Throwable e) {
       // todo: what should I throw here?
       throw new IllegalStateException("Could not instantiate a region instance.", e);
@@ -2177,10 +2522,10 @@ public class HRegion implements HConstan
     FileSystem fs = FileSystem.get(conf);
     fs.mkdirs(regionDir);
     HRegion region = HRegion.newHRegion(tableDir,
-      new HLog(fs, new Path(regionDir, HREGION_LOGDIR_NAME),
-          new Path(regionDir, HREGION_OLDLOGDIR_NAME), conf, null),
+      new HLog(fs, new Path(regionDir, HConstants.HREGION_LOGDIR_NAME),
+          new Path(regionDir, HConstants.HREGION_OLDLOGDIR_NAME), conf, null),
       fs, conf, info, null);
-    region.initialize(null, null);
+    region.initialize();
     return region;
   }
 
@@ -2209,9 +2554,9 @@ public class HRegion implements HConstan
     HRegion r = HRegion.newHRegion(
         HTableDescriptor.getTableDir(rootDir, info.getTableDesc().getName()),
         log, FileSystem.get(conf), conf, info, null);
-    r.initialize(null, null);
+    long seqid = r.initialize();
     if (log != null) {
-      log.setSequenceNumber(r.getMinSequenceId());
+      log.setSequenceNumber(seqid);
     }
     return r;
   }
@@ -2230,12 +2575,14 @@ public class HRegion implements HConstan
   throws IOException {
     meta.checkResources();
     // The row key is the region name
-    byte [] row = r.getRegionName();
+    byte[] row = r.getRegionName();
     Integer lid = meta.obtainRowLock(row);
     try {
-      List<KeyValue> edits = new ArrayList<KeyValue>();
-      edits.add(new KeyValue(row, CATALOG_FAMILY, REGIONINFO_QUALIFIER,
-          EnvironmentEdgeManager.currentTimeMillis(), Writables.getBytes(r.getRegionInfo())));
+      final List<KeyValue> edits = new ArrayList<KeyValue>(1);
+      edits.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
+          HConstants.REGIONINFO_QUALIFIER,
+          EnvironmentEdgeManager.currentTimeMillis(),
+          Writables.getBytes(r.getRegionInfo())));
       meta.put(HConstants.CATALOG_FAMILY, edits);
     } finally {
       meta.releaseRowLock(lid);
@@ -2275,7 +2622,8 @@ public class HRegion implements HConstan
     byte [] row = info.getRegionName();
     Put put = new Put(row);
     info.setOffline(true);
-    put.add(CATALOG_FAMILY, REGIONINFO_QUALIFIER, Writables.getBytes(info));
+    put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
+        Writables.getBytes(info));
     srvr.put(metaRegionName, put);
     cleanRegionInMETA(srvr, metaRegionName, info);
   }
@@ -2292,8 +2640,9 @@ public class HRegion implements HConstan
     final byte [] metaRegionName, final HRegionInfo info)
   throws IOException {
     Delete del = new Delete(info.getRegionName());
-    del.deleteColumns(CATALOG_FAMILY, SERVER_QUALIFIER);
-    del.deleteColumns(CATALOG_FAMILY, STARTCODE_QUALIFIER);
+    del.deleteColumns(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
+    del.deleteColumns(HConstants.CATALOG_FAMILY,
+        HConstants.STARTCODE_QUALIFIER);
     srvr.delete(metaRegionName, del);
   }
 
@@ -2446,31 +2795,36 @@ public class HRegion implements HConstan
     Configuration conf = a.getConf();
     HTableDescriptor tabledesc = a.getTableDesc();
     HLog log = a.getLog();
-    Path basedir = a.getBaseDir();
+    Path tableDir = a.getTableDir();
     // Presume both are of same region type -- i.e. both user or catalog
     // table regions.  This way can use comparator.
-    final byte [] startKey = a.comparator.matchingRows(a.getStartKey(), 0,
-          a.getStartKey().length,
-        EMPTY_BYTE_ARRAY, 0, EMPTY_BYTE_ARRAY.length) ||
-      b.comparator.matchingRows(b.getStartKey(), 0, b.getStartKey().length,
-        EMPTY_BYTE_ARRAY, 0, EMPTY_BYTE_ARRAY.length)?
-        EMPTY_BYTE_ARRAY:
-          a.comparator.compareRows(a.getStartKey(), 0, a.getStartKey().length,
-          b.getStartKey(), 0, b.getStartKey().length) <= 0?
-        a.getStartKey(): b.getStartKey();
-    final byte [] endKey = a.comparator.matchingRows(a.getEndKey(), 0,
-        a.getEndKey().length, EMPTY_BYTE_ARRAY, 0, EMPTY_BYTE_ARRAY.length) ||
-      a.comparator.matchingRows(b.getEndKey(), 0, b.getEndKey().length,
-        EMPTY_BYTE_ARRAY, 0, EMPTY_BYTE_ARRAY.length)?
-        EMPTY_BYTE_ARRAY:
-        a.comparator.compareRows(a.getEndKey(), 0, a.getEndKey().length,
-            b.getEndKey(), 0, b.getEndKey().length) <= 0?
-                b.getEndKey(): a.getEndKey();
+    final byte[] startKey =
+      (a.comparator.matchingRows(a.getStartKey(), 0, a.getStartKey().length,
+           HConstants.EMPTY_BYTE_ARRAY, 0, HConstants.EMPTY_BYTE_ARRAY.length)
+       || b.comparator.matchingRows(b.getStartKey(), 0,
+              b.getStartKey().length, HConstants.EMPTY_BYTE_ARRAY, 0,
+              HConstants.EMPTY_BYTE_ARRAY.length))
+      ? HConstants.EMPTY_BYTE_ARRAY
+      : (a.comparator.compareRows(a.getStartKey(), 0, a.getStartKey().length,
+             b.getStartKey(), 0, b.getStartKey().length) <= 0
+         ? a.getStartKey()
+         : b.getStartKey());
+    final byte[] endKey =
+      (a.comparator.matchingRows(a.getEndKey(), 0, a.getEndKey().length,
+           HConstants.EMPTY_BYTE_ARRAY, 0, HConstants.EMPTY_BYTE_ARRAY.length)
+       || a.comparator.matchingRows(b.getEndKey(), 0, b.getEndKey().length,
+              HConstants.EMPTY_BYTE_ARRAY, 0,
+              HConstants.EMPTY_BYTE_ARRAY.length))
+      ? HConstants.EMPTY_BYTE_ARRAY
+      : (a.comparator.compareRows(a.getEndKey(), 0, a.getEndKey().length,
+             b.getEndKey(), 0, b.getEndKey().length) <= 0
+         ? b.getEndKey()
+         : a.getEndKey());
 
     HRegionInfo newRegionInfo = new HRegionInfo(tabledesc, startKey, endKey);
     LOG.info("Creating new region " + newRegionInfo.toString());
     String encodedName = newRegionInfo.getEncodedName();
-    Path newRegionDir = HRegion.getRegionDir(a.getBaseDir(), encodedName);
+    Path newRegionDir = HRegion.getRegionDir(a.getTableDir(), encodedName);
     if(fs.exists(newRegionDir)) {
       throw new IOException("Cannot merge; target file collision at " +
           newRegionDir);
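
In the reformatted key computation above, the merged region spans from the lesser of the two start keys to the greater of the two end keys; HConstants.EMPTY_BYTE_ARRAY denotes an unbounded key and therefore always wins the comparison.
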
@@ -2489,7 +2843,7 @@ public class HRegion implements HConstan
     byFamily = filesByFamily(byFamily, b.close());
     for (Map.Entry<byte [], List<StoreFile>> es : byFamily.entrySet()) {
       byte [] colFamily = es.getKey();
-      makeColumnFamilyDirs(fs, basedir, newRegionInfo, colFamily);
+      makeColumnFamilyDirs(fs, tableDir, newRegionInfo, colFamily);
       // Because we compacted the source regions we should have no more than two
       // HStoreFiles per family and there will be no reference store
       List<StoreFile> srcFiles = es.getValue();
@@ -2505,7 +2859,7 @@ public class HRegion implements HConstan
       }
       for (StoreFile hsf: srcFiles) {
         StoreFile.rename(fs, hsf.getPath(),
-          StoreFile.getUniqueFile(fs, Store.getStoreHomedir(basedir,
+          StoreFile.getUniqueFile(fs, Store.getStoreHomedir(tableDir,
             newRegionInfo.getEncodedName(), colFamily)));
       }
     }
@@ -2513,8 +2867,8 @@ public class HRegion implements HConstan
       LOG.debug("Files for new region");
       listPaths(fs, newRegionDir);
     }
-    HRegion dstRegion = HRegion.newHRegion(basedir, log, fs, conf, newRegionInfo, null);
-    dstRegion.initialize(null, null);
+    HRegion dstRegion = HRegion.newHRegion(tableDir, log, fs, conf, newRegionInfo, null);
+    dstRegion.initialize();
     dstRegion.compactStores();
     if (LOG.isDebugEnabled()) {
       LOG.debug("Files for new region");
@@ -2717,8 +3071,8 @@ public class HRegion implements HConstan
   }
 
   public static final long FIXED_OVERHEAD = ClassSize.align(
-      (5 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN +
-      (21 * ClassSize.REFERENCE) + ClassSize.OBJECT + Bytes.SIZEOF_INT);
+      (4 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN +
+      (20 * ClassSize.REFERENCE) + ClassSize.OBJECT + Bytes.SIZEOF_INT);
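
The overhead constants drop one long and one reference, matching the fields removed earlier in this patch (minSequenceId and regionCompactionDir).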
 
   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
       ClassSize.OBJECT + (2 * ClassSize.ATOMIC_BOOLEAN) +
@@ -2784,7 +3138,7 @@ public class HRegion implements HConstan
       throw new IOException("Not a known catalog table: " + p.toString());
     }
     try {
-      region.initialize(null, null);
+      region.initialize();
       if (majorCompact) {
         region.compactStores(true);
       } else {
@@ -2883,13 +3237,15 @@ public class HRegion implements HConstan
       }
       majorCompact = true;
     }
-    Path tableDir  = new Path(args[0]);
-    Configuration c = HBaseConfiguration.create();
-    FileSystem fs = FileSystem.get(c);
-    Path logdir = new Path(c.get("hbase.tmp.dir"),
-      "hlog" + tableDir.getName() + EnvironmentEdgeManager.currentTimeMillis());
-    Path oldLogDir = new Path(c.get("hbase.tmp.dir"), HREGION_OLDLOGDIR_NAME);
-    HLog log = new HLog(fs, logdir, oldLogDir, c, null);
+    final Path tableDir = new Path(args[0]);
+    final Configuration c = HBaseConfiguration.create();
+    final FileSystem fs = FileSystem.get(c);
+    final Path logdir = new Path(c.get("hbase.tmp.dir"),
+        "hlog" + tableDir.getName()
+        + EnvironmentEdgeManager.currentTimeMillis());
+    final Path oldLogDir = new Path(c.get("hbase.tmp.dir"),
+        HConstants.HREGION_OLDLOGDIR_NAME);
+    final HLog log = new HLog(fs, logdir, oldLogDir, c, null);
     try {
       processTable(fs, tableDir, log, c, majorCompact);
      } finally {

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1176177&r1=1176176&r2=1176177&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Sep 27 02:41:56 2011
@@ -19,6 +19,37 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryUsage;
+import java.lang.management.RuntimeMXBean;
+import java.lang.reflect.Constructor;
+import java.net.BindException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -28,7 +59,6 @@ import org.apache.hadoop.hbase.Chore;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HMsg;
-import org.apache.hadoop.hbase.HMsg.Type;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HServerAddress;
@@ -38,12 +68,15 @@ import org.apache.hadoop.hbase.HTableDes
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.LeaseListener;
 import org.apache.hadoop.hbase.Leases;
-import org.apache.hadoop.hbase.Leases.LeaseStillHeldException;
 import org.apache.hadoop.hbase.LocalHBaseCluster;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.UnknownRowLockException;
 import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.YouAreDeadException;
+import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
+import org.apache.hadoop.hbase.HMsg.Type;
+import org.apache.hadoop.hbase.Leases.LeaseStillHeldException;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.MultiPut;
@@ -62,9 +95,11 @@ import org.apache.hadoop.hbase.ipc.HMast
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.regionserver.metrics.RegionServerMetrics;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.InfoServer;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Sleeper;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
@@ -78,43 +113,12 @@ import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 
-import java.io.IOException;
-import java.lang.Thread.UncaughtExceptionHandler;
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryUsage;
-import java.lang.management.RuntimeMXBean;
-import java.lang.reflect.Constructor;
-import java.net.BindException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.SortedSet;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 /**
  * HRegionServer makes a set of HRegions available to clients.  It checks in with
  * the HMaster. There are many HRegionServers in a single HBase deployment.
  */
-public class HRegionServer implements HConstants, HRegionInterface,
-    HBaseRPCErrorHandler, Runnable, Watcher {
+public class HRegionServer implements HRegionInterface,
+    HBaseRPCErrorHandler, Runnable, Watcher, Stoppable {
   public static final Log LOG = LogFactory.getLog(HRegionServer.class);
   private static final HMsg REPORT_EXITING = new HMsg(Type.MSG_REPORT_EXITING);
   private static final HMsg REPORT_QUIESCED = new HMsg(Type.MSG_REPORT_QUIESCED);
@@ -233,6 +237,10 @@ public class HRegionServer implements HC
 
   private final String machineName;
 
+  // Replication-related attributes
+  private Replication replicationHandler;
+  // End of replication
+
   /**
    * Starts a HRegionServer at the default location
    * @param conf
@@ -243,7 +251,8 @@ public class HRegionServer implements HC
         conf.get("hbase.regionserver.dns.interface","default"),
         conf.get("hbase.regionserver.dns.nameserver","default"));
     String addressStr = machineName + ":" +
-      conf.get(REGIONSERVER_PORT, Integer.toString(DEFAULT_REGIONSERVER_PORT));
+      conf.get(HConstants.REGIONSERVER_PORT,
+          Integer.toString(HConstants.DEFAULT_REGIONSERVER_PORT));
     // This is not necessarily the address we will run with.  The address we
     // use will be in #serverInfo data member.  For example, we may have been
     // passed a port of 0 which means we should pick some ephemeral port to bind
@@ -260,7 +269,8 @@ public class HRegionServer implements HC
 
     // Config'ed params
     this.numRetries =  conf.getInt("hbase.client.retries.number", 2);
-    this.threadWakeFrequency = conf.getInt(THREAD_WAKE_FREQUENCY, 10 * 1000);
+    this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
+        10 * 1000);
     this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 1 * 1000);
 
     sleeper = new Sleeper(this.msgInterval, this.stopRequested);
@@ -275,7 +285,9 @@ public class HRegionServer implements HC
     this.numRegionsToReport =
       conf.getInt("hbase.regionserver.numregionstoreport", 10);
 
-    this.rpcTimeout = conf.getLong(HBASE_REGIONSERVER_LEASE_PERIOD_KEY, DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD);
+    this.rpcTimeout =
+      conf.getLong(HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
+          HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD);
 
     reinitialize();
   }
@@ -309,12 +321,14 @@ public class HRegionServer implements HC
     reinitializeZooKeeper();
     int nbBlocks = conf.getInt("hbase.regionserver.nbreservationblocks", 4);
     for(int i = 0; i < nbBlocks; i++)  {
-      reservedSpace.add(new byte[DEFAULT_SIZE_RESERVATION_BLOCK]);
+      reservedSpace.add(new byte[HConstants.DEFAULT_SIZE_RESERVATION_BLOCK]);
     }
   }
 
   private void reinitializeZooKeeper() throws IOException {
-    zooKeeperWrapper = new ZooKeeperWrapper(conf, this);
+    zooKeeperWrapper =
+        ZooKeeperWrapper.createInstance(conf, serverInfo.getServerName());
+    zooKeeperWrapper.registerListener(this);
     watchMasterAddress();
   }
 
@@ -332,13 +346,14 @@ public class HRegionServer implements HC
 
     // Background thread to check for major compactions; needed if region
     // has not gotten updates in a while.  Make it run at a lesser frequency.
-    int multiplier = this.conf.getInt(THREAD_WAKE_FREQUENCY +
+    int multiplier = this.conf.getInt(HConstants.THREAD_WAKE_FREQUENCY +
         ".multiplier", 1000);
     this.majorCompactionChecker = new MajorCompactionChecker(this,
       this.threadWakeFrequency * multiplier,  this.stopRequested);
 
     this.leases = new Leases(
-        (int) conf.getLong(HBASE_REGIONSERVER_LEASE_PERIOD_KEY, DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD),
+        (int) conf.getLong(HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
+            HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD),
         this.threadWakeFrequency);
   }
 
@@ -368,7 +383,7 @@ public class HRegionServer implements HC
       if (restart) {
         restart();
       } else {
-        abort();
+        abort("ZooKeeper session expired");
       }
     } else if (type == EventType.NodeDeleted) {
       watchMasterAddress();
@@ -388,8 +403,7 @@ public class HRegionServer implements HC
   }
 
   private void restart() {
-    LOG.info("Restarting Region Server");
-    abort();
+    abort("Restarting region server");
     Threads.shutdown(regionServerThread);
     boolean done = false;
     while (!done) {
@@ -524,9 +538,15 @@ public class HRegionServer implements HC
               continue;
             }
           } catch (Exception e) { // FindBugs REC_CATCH_EXCEPTION
+            // Two special exceptions can surface here:
+            // PleaseHoldException and YouAreDeadException.
             if (e instanceof IOException) {
               e = RemoteExceptionHandler.checkIOException((IOException) e);
             }
+            if (e instanceof YouAreDeadException) {
+              // This will be caught and handled as a fatal error below
+              throw e;
+            }
             tries++;
             if (tries > 0 && (tries % this.numRetries) == 0) {
               // Check filesystem every so often.
@@ -553,8 +573,7 @@ public class HRegionServer implements HC
       } // for
     } catch (Throwable t) {
       if (!checkOOME(t)) {
-        LOG.fatal("Unhandled exception. Aborting...", t);
-        abort();
+        abort("Unhandled exception", t);
       }
     }
     this.leases.closeAfterLeasesExpire();
@@ -711,7 +730,7 @@ public class HRegionServer implements HC
       // accessors will be going against wrong filesystem (unless all is set
       // to defaults).
       this.conf.set("fs.defaultFS", this.conf.get("hbase.rootdir"));
-      this.conf.setBoolean("fs.automatic.close", false);
+      // Get fs instance used by this RS
       this.fs = FileSystem.get(this.conf);
       this.rootDir = new Path(this.conf.get(HConstants.HBASE_DIR));
       this.hlog = setupHLog();
@@ -821,8 +840,7 @@ public class HRegionServer implements HC
       (e.getCause() != null && e.getCause() instanceof OutOfMemoryError) ||
       (e.getMessage() != null &&
         e.getMessage().contains("java.lang.OutOfMemoryError"))) {
-      LOG.fatal("OutOfMemoryError, aborting.", e);
-      abort();
+      abort("OutOfMemoryError, aborting", e);
       stop = true;
     }
     return stop;
@@ -840,8 +858,7 @@ public class HRegionServer implements HC
       try {
         FSUtils.checkFileSystemAvailable(this.fs);
       } catch (IOException e) {
-        LOG.fatal("Shutting down HRegionServer: file system not available", e);
-        abort();
+        abort("File System not available", e);
         this.fsOk = false;
       }
     }
@@ -857,7 +874,7 @@ public class HRegionServer implements HC
 
     MajorCompactionChecker(final HRegionServer h,
         final int sleepTime, final AtomicBoolean stopper) {
-      super(sleepTime, stopper);
+      super("MajorCompactionChecker", sleepTime, stopper);
       this.instance = h;
       LOG.info("Runs every " + sleepTime + "ms");
     }
@@ -891,7 +908,7 @@ public class HRegionServer implements HC
   }
 
   private HLog setupHLog() throws IOException {
-    Path oldLogDir = new Path(rootDir, HREGION_OLDLOGDIR_NAME);
+    final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
     Path logdir = new Path(rootDir, HLog.getHLogDirectoryName(this.serverInfo));
     if (LOG.isDebugEnabled()) {
       LOG.debug("Log dir " + logdir);
@@ -901,15 +918,18 @@ public class HRegionServer implements HC
         "running at " + this.serverInfo.getServerName() +
         " because logdir " + logdir.toString() + " exists");
     }
-    HLog newlog = instantiateHLog(logdir, oldLogDir);
-    return newlog;
+    this.replicationHandler = new Replication(this.conf, this.serverInfo,
+        this.fs, oldLogDir, stopRequested);
+    HLog log = instantiateHLog(logdir, oldLogDir);
+    this.replicationHandler.addLogEntryVisitor(log);
+    return log;
   }
 
   // instantiate
   protected HLog instantiateHLog(Path logdir, Path oldLogDir) throws IOException {
-    HLog newlog = new HLog(fs, logdir, oldLogDir, conf, hlogRoller, null,
-        serverInfo.getServerAddress().toString());
-    return newlog;
+    return new HLog(this.fs, logdir, oldLogDir, this.conf, this.hlogRoller,
+      this.replicationHandler.getReplicationManager(),
+        this.serverInfo.getServerAddress().toString());
   }
 
 
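For reference, the wiring order above matters: the Replication handler is
constructed first, the HLog is then built with the handler's replication
manager, and finally the handler is attached to the finished log as an entry
visitor. A minimal sketch using only the constructors and calls visible in
this diff (the local variable names are illustrative):

    Replication replication =
        new Replication(conf, serverInfo, fs, oldLogDir, stopRequested);
    HLog wal = new HLog(fs, logdir, oldLogDir, conf, hlogRoller,
        replication.getReplicationManager(),
        serverInfo.getServerAddress().toString());
    replication.addLogEntryVisitor(wal);
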
@@ -993,8 +1013,7 @@ public class HRegionServer implements HC
     String n = Thread.currentThread().getName();
     UncaughtExceptionHandler handler = new UncaughtExceptionHandler() {
       public void uncaughtException(Thread t, Throwable e) {
-        abort();
-        LOG.fatal("Set stop flag in " + t.getName(), e);
+        abort("Uncaught exception in service thread " + t.getName(), e);
       }
     };
     Threads.setDaemonThreadRunning(this.hlogRoller, n + ".logRoller",
@@ -1035,12 +1054,14 @@ public class HRegionServer implements HC
           port++;
           // update HRS server info port.
           this.serverInfo = new HServerInfo(this.serverInfo.getServerAddress(),
-            this.serverInfo.getStartCode(),  port,
+            this.serverInfo.getStartCode(), port,
             this.serverInfo.getHostname());
         }
       }
     }
 
+    this.replicationHandler.startReplicationServices();
+
     // Start Server.  This service is like leases in that it internally runs
     // a thread.
     this.server.start();
@@ -1117,16 +1138,30 @@ public class HRegionServer implements HC
    * log it is using and without notifying the master.
   * Used for unit testing and on catastrophic events, such as HDFS being
   * yanked out from under hbase or an OOME.
+   * @param reason the reason we are aborting
+   * @param cause the exception that caused the abort, or null
    */
-  public void abort() {
+  public void abort(String reason, Throwable cause) {
+    if (cause != null) {
+      LOG.fatal("Aborting region server " + this + ": " + reason, cause);
+    } else {
+      LOG.fatal("Aborting region server " + this + ": " + reason);
+    }
     this.abortRequested = true;
     this.reservedSpace.clear();
     if (this.metrics != null) {
-      LOG.info("Dump of metrics: " + this.metrics.toString());
+      LOG.info("Dump of metrics: " + this.metrics);
     }
     stop();
   }
 
+  /**
+   * @see HRegionServer#abort(String, Throwable)
+   */
+  public void abort(String reason) {
+    abort(reason, null);
+  }
+
   /*
    * Simulate a kill -9 of this server.
   * Exits w/o closing regions or cleaning up logs but it does close socket in
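The two abort overloads above replace the scattered LOG.fatal(...); abort();
pairs elsewhere in this patch. A sketch of the new calling convention (the
server and oome variables are hypothetical):

    // With a cause: logs FATAL with the stack trace, then stops the server.
    server.abort("OutOfMemoryError, aborting", oome);
    // Without a cause: delegates to abort(reason, null).
    server.abort("ZooKeeper session expired");
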
@@ -1134,7 +1169,7 @@ public class HRegionServer implements HC
    */
   protected void kill() {
     this.killed = true;
-    abort();
+    abort("Simulated kill");
   }
 
   /**
@@ -1147,6 +1182,7 @@ public class HRegionServer implements HC
     Threads.shutdown(this.cacheFlusher);
     Threads.shutdown(this.compactSplitThread);
     Threads.shutdown(this.hlogRoller);
+    this.replicationHandler.join();
   }
 
   private boolean getMaster() {
@@ -1205,14 +1241,7 @@ public class HRegionServer implements HC
         if (LOG.isDebugEnabled())
           LOG.debug("sending initial server load: " + hsl);
         lastMsg = System.currentTimeMillis();
-        boolean startCodeOk = false;
-        while(!startCodeOk) {
-          this.serverInfo = createServerInfoWithNewStartCode(this.serverInfo);
-          startCodeOk = zooKeeperWrapper.writeRSLocation(this.serverInfo);
-          if(!startCodeOk) {
-           LOG.debug("Start code already taken, trying another one");
-          }
-        }
+        zooKeeperWrapper.writeRSLocation(this.serverInfo);
         result = this.hbaseMaster.regionServerStartup(this.serverInfo);
         break;
       } catch (IOException e) {
@@ -1223,26 +1252,6 @@ public class HRegionServer implements HC
     return result;
   }
 
-  private HServerInfo createServerInfoWithNewStartCode(final HServerInfo hsi) {
-    return new HServerInfo(hsi.getServerAddress(), hsi.getInfoPort(),
-      hsi.getHostname());
-  }
-
-  /* Add to the outbound message buffer */
-  private void reportOpen(HRegionInfo region) {
-    this.outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, region));
-  }
-
-  /* Add to the outbound message buffer */
-  private void reportClose(HRegionInfo region) {
-    reportClose(region, null);
-  }
-
-  /* Add to the outbound message buffer */
-  private void reportClose(final HRegionInfo region, final byte[] message) {
-    this.outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_CLOSE, region, message));
-  }
-
   /**
    * Add to the outbound message buffer
    *
@@ -1407,9 +1416,13 @@ public class HRegionServer implements HC
   void openRegion(final HRegionInfo regionInfo) {
     Integer mapKey = Bytes.mapKey(regionInfo.getRegionName());
     HRegion region = this.onlineRegions.get(mapKey);
+    RSZookeeperUpdater zkUpdater =
+      new RSZookeeperUpdater(conf, serverInfo.getServerName(),
+          regionInfo.getEncodedName());
     if (region == null) {
       try {
-        region = instantiateRegion(regionInfo);
+        zkUpdater.startRegionOpenEvent(null, true);
+        region = instantiateRegion(regionInfo, this.hlog);
         // Startup a compaction early if one is needed, if region has references
         // or if a store has too many store files
         if (region.hasReferences() || region.hasTooManyStoreFiles()) {
@@ -1423,30 +1436,54 @@ public class HRegionServer implements HC
         // TODO: add an extra field in HRegionInfo to indicate that there is
         // an error. We can't do that now because that would be an incompatible
         // change that would require a migration
-        reportClose(regionInfo, StringUtils.stringifyException(t).getBytes());
+        try {
+          HMsg hmsg = new HMsg(HMsg.Type.MSG_REPORT_CLOSE,
+                               regionInfo,
+                               StringUtils.stringifyException(t).getBytes());
+          zkUpdater.abortOpenRegion(hmsg);
+        } catch (IOException e1) {
+          // TODO: Can we recover? Should we throw an RTE?
+          LOG.error("Failed to abort open region " + regionInfo.getRegionNameAsString(), e1);
+        }
         return;
       }
       this.lock.writeLock().lock();
       try {
-        this.hlog.setSequenceNumber(region.getMinSequenceId());
         this.onlineRegions.put(mapKey, region);
       } finally {
         this.lock.writeLock().unlock();
       }
     }
-    reportOpen(regionInfo);
+    try {
+      HMsg hmsg = new HMsg(HMsg.Type.MSG_REPORT_OPEN, regionInfo);
+      zkUpdater.finishRegionOpenEvent(hmsg);
+    } catch (IOException e) {
+      LOG.error("Failed to mark region " + regionInfo.getRegionNameAsString() + " as opened", e);
+    }
   }
 
-  protected HRegion instantiateRegion(final HRegionInfo regionInfo)
-      throws IOException {
-    HRegion r = HRegion.newHRegion(HTableDescriptor.getTableDir(rootDir, regionInfo
-        .getTableDesc().getName()), this.hlog, this.fs, conf, regionInfo,
-        this.cacheFlusher);
-    r.initialize(null,  new Progressable() {
+  /*
+   * @param regionInfo RegionInfo for the Region we're to instantiate and
+   * initialize.
+   * @param wal WAL whose sequence number is advanced to this region's
+   * seqid when the region's is higher.
+   * @return the instantiated and initialized HRegion
+   * @throws IOException
+   */
+  protected HRegion instantiateRegion(final HRegionInfo regionInfo, final HLog wal)
+  throws IOException {
+    Path dir =
+      HTableDescriptor.getTableDir(rootDir, regionInfo.getTableDesc().getName());
+    HRegion r = HRegion.newHRegion(dir, this.hlog, this.fs, conf, regionInfo,
+      this.cacheFlusher);
+    long seqid = r.initialize(new Progressable() {
       public void progress() {
         addProcessingMessage(regionInfo);
       }
     });
+    // If the wal's seqid is < the new region's, advance it to the region's seqid.
+    if (wal != null) {
+      if (seqid > wal.getSequenceNumber()) wal.setSequenceNumber(seqid);
+    }
     return r;
   }
 
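Putting the two hunks above together: the open path now brackets region
instantiation with ZooKeeper state transitions, and the WAL sequence-id sync
moved from openRegion into instantiateRegion. A condensed sketch of the flow,
using only names that appear in this diff:

    zkUpdater.startRegionOpenEvent(null, true);
    try {
      HRegion r = instantiateRegion(regionInfo, this.hlog); // may advance wal seqid
      zkUpdater.finishRegionOpenEvent(
          new HMsg(HMsg.Type.MSG_REPORT_OPEN, regionInfo));
    } catch (Throwable t) {
      zkUpdater.abortOpenRegion(new HMsg(HMsg.Type.MSG_REPORT_CLOSE, regionInfo,
          StringUtils.stringifyException(t).getBytes()));
    }
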
@@ -1463,11 +1500,20 @@ public class HRegionServer implements HC
 
   protected void closeRegion(final HRegionInfo hri, final boolean reportWhenCompleted)
   throws IOException {
+    RSZookeeperUpdater zkUpdater = null;
+    if(reportWhenCompleted) {
+      zkUpdater = new RSZookeeperUpdater(conf,
+          serverInfo.getServerName(), hri.getEncodedName());
+      zkUpdater.startRegionCloseEvent(null, false);
+    }
     HRegion region = this.removeFromOnlineRegions(hri);
     if (region != null) {
       region.close();
       if(reportWhenCompleted) {
-        reportClose(hri);
+        if(zkUpdater != null) {
+          HMsg hmsg = new HMsg(HMsg.Type.MSG_REPORT_CLOSE, hri, null);
+          zkUpdater.finishRegionCloseEvent(hmsg);
+        }
       }
     }
   }
@@ -1646,36 +1692,35 @@ public class HRegionServer implements HC
     }
   }
 
-  public int put(final byte[] regionName, final Put [] puts)
+  public int put(final byte[] regionName, final List<Put> puts)
   throws IOException {
-    // Count of Puts processed.
-    int i = 0;
     checkOpen();
     HRegion region = null;
-    boolean writeToWAL = true;
     try {
       region = getRegion(regionName);
       if (!region.getRegionInfo().isMetaTable()) {
         this.cacheFlusher.reclaimMemStoreMemory();
       }
-      for (i = 0; i < puts.length; i++) {
-        this.requestCount.incrementAndGet();
-        Integer lock = getLockFromId(puts[i].getLockId());
-        writeToWAL &= puts[i].getWriteToWAL();
-        region.put(puts[i], lock);
-      }
 
-    } catch (WrongRegionException ex) {
-      LOG.debug("Batch puts: " + i, ex);
-      return i;
-    } catch (NotServingRegionException ex) {
-      LOG.debug("Batch puts interrupted at index=" + i + " because:" +
-        ex.getMessage());
-      return i;
+      @SuppressWarnings("unchecked")
+      Pair<Put, Integer>[] putsWithLocks = new Pair[puts.size()];
+
+      int i = 0;
+      for (Put p : puts) {
+        Integer lock = getLockFromId(p.getLockId());
+        putsWithLocks[i++] = new Pair<Put, Integer>(p, lock);
+      }
+
+      this.requestCount.addAndGet(puts.size());
+      OperationStatusCode[] codes = region.put(putsWithLocks);
+      for (i = 0; i < codes.length; i++) {
+        if (codes[i] != OperationStatusCode.SUCCESS)
+          return i;
+      }
+      return -1;
     } catch (Throwable t) {
       throw convertThrowableToIOE(cleanup(t));
     }
-    return -1;
   }
 
   private boolean checkAndMutate(final byte[] regionName, final byte [] row,
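The rewritten put() also changes the return contract: instead of applying
Puts one at a time and counting, it applies the whole batch and returns -1
when every Put succeeded, or the index of the first Put whose
OperationStatusCode was not SUCCESS. A hypothetical caller (regionServer
stands for any HRegionInterface reference; row1, row2, and regionName are
placeholders):

    List<Put> puts = Arrays.asList(new Put(row1), new Put(row2));
    int firstFailed = regionServer.put(regionName, puts);
    if (firstFailed != -1) {
      // Everything before firstFailed was applied; resubmit only the tail.
      List<Put> retry = puts.subList(firstFailed, puts.size());
    }
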
@@ -1885,7 +1930,7 @@ public class HRegionServer implements HC
     }
   }
 
-  public int delete(final byte[] regionName, final Delete [] deletes)
+  public int delete(final byte[] regionName, final List<Delete> deletes)
   throws IOException {
     // Count of Deletes processed.
     int i = 0;
@@ -1897,11 +1942,13 @@ public class HRegionServer implements HC
       if (!region.getRegionInfo().isMetaTable()) {
         this.cacheFlusher.reclaimMemStoreMemory();
       }
-      Integer[] locks = new Integer[deletes.length];
-      for (i = 0; i < deletes.length; i++) {
+      int size = deletes.size();
+      Integer[] locks = new Integer[size];
+      for (Delete delete: deletes) {
         this.requestCount.incrementAndGet();
-        locks[i] = getLockFromId(deletes[i].getLockId());
-        region.delete(deletes[i], locks[i], writeToWAL);
+        locks[i] = getLockFromId(delete.getLockId());
+        region.delete(delete, locks[i], writeToWAL);
+        i++;
       }
     } catch (WrongRegionException ex) {
       LOG.debug("Batch deletes: " + i, ex);
@@ -2322,8 +2369,8 @@ public class HRegionServer implements HC
     MultiPutResponse resp = new MultiPutResponse();
 
     // do each region as it's own.
-    for( Map.Entry<byte[],List<Put>> e: puts.puts.entrySet()) {
-      int result = put(e.getKey(), e.getValue().toArray(new Put[]{}));
+    for (Map.Entry<byte[], List<Put>> e : puts.puts.entrySet()) {
+      int result = put(e.getKey(), e.getValue());
       resp.addResult(e.getKey(), result);
 
       e.getValue().clear(); // clear some RAM
@@ -2351,8 +2398,10 @@ public class HRegionServer implements HC
   /**
    * @param hrs
    * @return Thread the RegionServer is running in correctly named.
+   * @throws IOException
    */
-  public static Thread startRegionServer(final HRegionServer hrs) {
+  public static Thread startRegionServer(final HRegionServer hrs)
+  throws IOException {
     return startRegionServer(hrs,
       "regionserver" + hrs.getServerInfo().getServerAddress().getPort());
   }
@@ -2361,12 +2410,18 @@ public class HRegionServer implements HC
    * @param hrs
    * @param name
    * @return Thread the RegionServer is running in correctly named.
+   * @throws IOException
    */
   public static Thread startRegionServer(final HRegionServer hrs,
-      final String name) {
+      final String name)
+  throws IOException {
     Thread t = new Thread(hrs);
     t.setName(name);
     t.start();
+    // Install a shutdown hook that will catch signals and run an orderly
+    // shutdown of the hrs.
+    ShutdownHook.install(hrs.getConfiguration(),
+      FileSystem.get(hrs.getConfiguration()), hrs, t);
     return t;
   }
 
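ShutdownHook.install is what makes Ctrl-C and kill(1) safe here. In plain-JDK
terms it arranges roughly the following (a sketch of the idea only; the real
class also takes the FileSystem so HDFS clients are not closed out from under
the still-running server):

    Runtime.getRuntime().addShutdownHook(new Thread() {
      public void run() {
        hrs.stop();                 // request an orderly stop
        try {
          t.join();                 // wait for the region server thread
        } catch (InterruptedException e) {
          // ignore; the JVM is exiting anyway
        }
      }
    });
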
@@ -2400,6 +2455,11 @@ public class HRegionServer implements HC
     }
   }
 
+  @Override
+  public void replicateLogEntries(HLog.Entry[] entries) throws IOException {
+    this.replicationHandler.replicateLogEntries(entries);
+  }
+
   /**
    * Do class main.
    * @param args
@@ -2459,5 +2519,4 @@ public class HRegionServer implements HC
         HRegionServer.class);
     doMain(args, regionServerClass);
   }
-
-}
+}
\ No newline at end of file

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java?rev=1176177&r1=1176176&r2=1176177&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java Tue Sep 27 02:41:56 2011
@@ -41,7 +41,7 @@ import java.util.PriorityQueue;
  * as an InternalScanner at the Store level, you will get runtime exceptions.
  */
 public class KeyValueHeap implements KeyValueScanner, InternalScanner {
-  private PriorityQueue<KeyValueScanner> heap;
+  private PriorityQueue<KeyValueScanner> heap = null;
   private KeyValueScanner current = null;
   private KVScannerComparator comparator;
 
@@ -51,22 +51,25 @@ public class KeyValueHeap implements Key
    * @param scanners
    * @param comparator
    */
-  public KeyValueHeap(List<? extends KeyValueScanner> scanners, KVComparator comparator) {
+  public KeyValueHeap(List<? extends KeyValueScanner> scanners,
+      KVComparator comparator) {
     this.comparator = new KVScannerComparator(comparator);
-    this.heap = new PriorityQueue<KeyValueScanner>(scanners.size(),
-        this.comparator);
-    for (KeyValueScanner scanner : scanners) {
-      if (scanner.peek() != null) {
-        this.heap.add(scanner);
-      } else {
-        scanner.close();
+    if (!scanners.isEmpty()) {
+      this.heap = new PriorityQueue<KeyValueScanner>(scanners.size(),
+          this.comparator);
+      for (KeyValueScanner scanner : scanners) {
+        if (scanner.peek() != null) {
+          this.heap.add(scanner);
+        } else {
+          scanner.close();
+        }
       }
+      this.current = heap.poll();
     }
-    this.current = heap.poll();
   }
 
   public KeyValue peek() {
-    if(this.current == null) {
+    if (this.current == null) {
       return null;
     }
     return this.current.peek();
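With the guard above, constructing a KeyValueHeap over an empty scanner list
now leaves both heap and current null instead of polling an empty queue, so
callers see exhaustion rather than an NPE. A sketch (next(..) declares
IOException):

    List<KeyValueScanner> none = Collections.emptyList();
    KeyValueHeap kvh = new KeyValueHeap(none, KeyValue.COMPARATOR);
    KeyValue kv = kvh.peek();                        // null: no current scanner
    boolean more = kvh.next(new ArrayList<KeyValue>(), 1); // false, not an NPE
    kvh.close();                         // safe: close() null-checks the heap
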
@@ -78,12 +81,12 @@ public class KeyValueHeap implements Key
     }
     KeyValue kvReturn = this.current.next();
     KeyValue kvNext = this.current.peek();
-    if(kvNext == null) {
+    if (kvNext == null) {
       this.current.close();
       this.current = this.heap.poll();
     } else {
       KeyValueScanner topScanner = this.heap.peek();
-      if(topScanner == null ||
+      if (topScanner == null ||
           this.comparator.compare(kvNext, topScanner.peek()) > 0) {
         this.heap.add(this.current);
         this.current = this.heap.poll();
@@ -104,6 +107,9 @@ public class KeyValueHeap implements Key
    * @return true if there are more keys, false if all scanners are done
    */
   public boolean next(List<KeyValue> result, int limit) throws IOException {
+    if (this.current == null) {
+      return false;
+    }
     InternalScanner currentAsInternal = (InternalScanner)this.current;
     currentAsInternal.next(result, limit);
     KeyValue pee = this.current.peek();
@@ -160,12 +166,14 @@ public class KeyValueHeap implements Key
   }
 
   public void close() {
-    if(this.current != null) {
+    if (this.current != null) {
       this.current.close();
     }
-    KeyValueScanner scanner;
-    while((scanner = this.heap.poll()) != null) {
-      scanner.close();
+    if (this.heap != null) {
+      KeyValueScanner scanner;
+      while ((scanner = this.heap.poll()) != null) {
+        scanner.close();
+      }
     }
   }
 
@@ -181,7 +189,7 @@ public class KeyValueHeap implements Key
    * @throws IOException
    */
   public boolean seek(KeyValue seekKey) throws IOException {
-    if(this.current == null) {
+    if (this.current == null) {
       return false;
     }
     this.heap.add(this.current);

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java?rev=1176177&r1=1176176&r2=1176177&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java Tue Sep 27 02:41:56 2011
@@ -86,20 +86,21 @@ class LogRoller extends Thread implement
       } catch (FailedLogCloseException e) {
         LOG.fatal("Forcing server shutdown", e);
         server.checkFileSystem();
-        server.abort();
+        server.abort("Failed log close in log roller", e);
       } catch (java.net.ConnectException e) {
         LOG.fatal("Forcing server shutdown", e);
         server.checkFileSystem();
-        server.abort();
+        server.abort("Failed connect in log roller", e);
       } catch (IOException ex) {
         LOG.fatal("Log rolling failed with ioe: ",
           RemoteExceptionHandler.checkIOException(ex));
         server.checkFileSystem();
         // Abort if we get here.  We probably won't recover an IOE. HBASE-1132
-        server.abort();
+        server.abort("IOE in log roller", ex);
       } catch (Exception ex) {
         LOG.error("Log rolling failed", ex);
         server.checkFileSystem();
+        server.abort("Log rolling failed", ex);
       } finally {
         rollLog.set(false);
         rollLock.unlock();

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=1176177&r1=1176176&r2=1176177&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java Tue Sep 27 02:41:56 2011
@@ -39,6 +39,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.regionserver.DeleteCompare.DeleteCode;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -81,6 +82,9 @@ public class MemStore implements HeapSiz
   // Used to track own heapSize
   final AtomicLong size;
 
+  TimeRangeTracker timeRangeTracker;
+  TimeRangeTracker snapshotTimeRangeTracker;
+
   /**
    * Default constructor. Used for tests.
    */
@@ -99,6 +103,8 @@ public class MemStore implements HeapSiz
     this.comparatorIgnoreType = this.comparator.getComparatorIgnoringType();
     this.kvset = new KeyValueSkipListSet(c);
     this.snapshot = new KeyValueSkipListSet(c);
+    timeRangeTracker = new TimeRangeTracker();
+    snapshotTimeRangeTracker = new TimeRangeTracker();
     this.size = new AtomicLong(DEEP_OVERHEAD);
   }
 
@@ -128,6 +134,8 @@ public class MemStore implements HeapSiz
         if (!this.kvset.isEmpty()) {
           this.snapshot = this.kvset;
           this.kvset = new KeyValueSkipListSet(this.comparator);
+          this.snapshotTimeRangeTracker = this.timeRangeTracker;
+          this.timeRangeTracker = new TimeRangeTracker();
           // Reset heap to not include any keys
           this.size.set(DEEP_OVERHEAD);
         }
@@ -167,6 +175,7 @@ public class MemStore implements HeapSiz
       // create a new snapshot and let the old one go.
       if (!ss.isEmpty()) {
         this.snapshot = new KeyValueSkipListSet(this.comparator);
+        this.snapshotTimeRangeTracker = new TimeRangeTracker();
       }
     } finally {
       this.lock.writeLock().unlock();
@@ -183,6 +192,7 @@ public class MemStore implements HeapSiz
     this.lock.readLock().lock();
     try {
       s = heapSizeChange(kv, this.kvset.add(kv));
+      timeRangeTracker.includeTimestamp(kv);
       this.size.addAndGet(s);
     } finally {
       this.lock.readLock().unlock();
@@ -198,9 +208,9 @@ public class MemStore implements HeapSiz
   long delete(final KeyValue delete) {
     long s = 0;
     this.lock.readLock().lock();
-
     try {
       s += heapSizeChange(delete, this.kvset.add(delete));
+      timeRangeTracker.includeTimestamp(delete);
     } finally {
       this.lock.readLock().unlock();
     }
@@ -487,6 +497,19 @@ public class MemStore implements HeapSiz
     return false;
   }
 
+  /**
+   * Check if this memstore may contain keys in the scan's time range.
+   * @param scan the scan whose time range is checked
+   * @return false if the requested keys definitely do not exist in either
+   * this memstore or its snapshot
+   */
+  public boolean shouldSeek(Scan scan) {
+    return timeRangeTracker.includesTimeRange(scan.getTimeRange()) ||
+        snapshotTimeRangeTracker.includesTimeRange(scan.getTimeRange());
+  }
+
+  public TimeRangeTracker getSnapshotTimeRangeTracker() {
+    return this.snapshotTimeRangeTracker;
+  }
 
   /*
    * MemStoreScanner implements the KeyValueScanner.
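A likely use of the new shouldSeek is as a cheap pre-filter before wiring
memstore scanners into a read, skipping the memstore entirely when the scan's
time range cannot overlap either the live set or the snapshot. A hypothetical
caller (memstore is a placeholder reference):

    Scan scan = new Scan();
    scan.setTimeRange(0L, 1271111111111L);  // only versions older than this ts
    if (memstore.shouldSeek(scan)) {
      // include this memstore's scanners in the read
    }
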
@@ -520,7 +543,7 @@ public class MemStore implements HeapSiz
       StoreScanner level with coordination with MemStoreScanner.
 
     */
-    
+
     MemStoreScanner() {
       super();
 
@@ -531,7 +554,7 @@ public class MemStore implements HeapSiz
       KeyValue ret = null;
       long readPoint = ReadWriteConsistencyControl.getThreadReadPoint();
       //DebugPrint.println( " MS@" + hashCode() + ": threadpoint = " + readPoint);
-      
+
       while (ret == null && it.hasNext()) {
         KeyValue v = it.next();
         if (v.getMemstoreTS() <= readPoint) {
@@ -566,7 +589,7 @@ public class MemStore implements HeapSiz
       //DebugPrint.println( " MS@" + hashCode() + " snapshot seek: " + snapshotNextRow + " with size = " +
       //    snapshot.size() + " threadread = " + readPoint);
 
-      
+
       KeyValue lowest = getLowest();
 
       // has data := (lowest != null)
@@ -631,7 +654,7 @@ public class MemStore implements HeapSiz
 
   public final static long FIXED_OVERHEAD = ClassSize.align(
       ClassSize.OBJECT + (7 * ClassSize.REFERENCE));
-  
+
   public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
       ClassSize.REENTRANT_LOCK + ClassSize.ATOMIC_LONG +
       ClassSize.COPYONWRITE_ARRAYSET + ClassSize.COPYONWRITE_ARRAYLIST +


