hbase-commits mailing list archives

From: te...@apache.org
Subject: svn commit: r1207588 - in /hbase/branches/0.92: CHANGES.txt src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
Date: Mon, 28 Nov 2011 20:50:27 GMT
Author: tedyu
Date: Mon Nov 28 20:50:26 2011
New Revision: 1207588

URL: http://svn.apache.org/viewvc?rev=1207588&view=rev
Log:
HBASE-4869  Backport to 0.92: HBASE-4797 [availability] Skip recovered.edits
               files with edits older than what region currently has (Jimmy Xiang)
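
In short: each recovered.edits file is named with the first sequence id it
contains, so the name of the next file in sorted order bounds the largest
sequence id the current file can hold. If that bound is at or below the
sequence id the region already has, the whole file is stale and can be
skipped without opening it. A minimal, self-contained sketch of the rule
(illustrative only; the sorted set of longs stands in for the sorted
recovered.edits paths and is not the actual HBase API):

    import java.util.NavigableSet;
    import java.util.TreeSet;

    public class SkipRecoveredEditsSketch {
      public static void main(String[] args) {
        // File names are first-sequence-ids (zero-padded to 19 digits on
        // disk; plain longs here).
        NavigableSet<Long> fileSeqIds = new TreeSet<Long>();
        for (long i = 1000; i <= 1050; i += 10) {
          fileSeqIds.add(i);
        }
        long regionSeqId = 1029; // region already has edits up to this id

        boolean checkSafeToSkip = true;
        for (long current : fileSeqIds) {
          if (checkSafeToSkip) {
            // The next file's name bounds the largest id this file can hold.
            Long higher = fileSeqIds.higher(current);
            long maxPossibleSeqId = (higher != null) ? higher : Long.MAX_VALUE;
            if (maxPossibleSeqId <= regionSeqId) {
              System.out.println("skip   " + current); // every edit is stale
              continue;
            }
            checkSafeToSkip = false; // set is sorted; no need to keep checking
          }
          System.out.println("replay " + current);
        }
      }
    }

With the values above, files 1000 and 1010 are skipped outright; 1020 is
still opened because its upper bound (1030) exceeds 1029, and any stale
entries inside it are filtered by the existing per-edit sequence-id check.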

Modified:
    hbase/branches/0.92/CHANGES.txt
    hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java

Modified: hbase/branches/0.92/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/CHANGES.txt?rev=1207588&r1=1207587&r2=1207588&view=diff
==============================================================================
--- hbase/branches/0.92/CHANGES.txt (original)
+++ hbase/branches/0.92/CHANGES.txt Mon Nov 28 20:50:26 2011
@@ -744,6 +744,8 @@ Release 0.92.0 - Unreleased
    HBASE-4805  Allow better control of resource consumption in HTable (Lars H)
    HBASE-4851  hadoop maven dependency needs to be an optional one
                (Roman Shaposhnik)
+   HBASE-4869  Backport to 0.92: HBASE-4797 [availability] Skip recovered.edits
+               files with edits older than what region currently has (Jimmy Xiang)
 
   TASKS
    HBASE-3559  Move report of split to master OFF the heartbeat channel

Modified: hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1207588&r1=1207587&r2=1207588&view=diff
==============================================================================
--- hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Mon Nov 28 20:50:26 2011
@@ -165,7 +165,7 @@ public class HRegion implements HeapSize
   // Members
   //////////////////////////////////////////////////////////////////////////////
 
-  private final ConcurrentHashMap<HashedBytes, CountDownLatch> lockedRows = 
+  private final ConcurrentHashMap<HashedBytes, CountDownLatch> lockedRows =
     new ConcurrentHashMap<HashedBytes, CountDownLatch>();
   private final ConcurrentHashMap<Integer, HashedBytes> lockIds =
     new ConcurrentHashMap<Integer, HashedBytes>();
@@ -402,10 +402,10 @@ public class HRegion implements HeapSize
    */
   public long initialize(final CancelableProgressable reporter)
   throws IOException {
-  
+
     MonitoredTask status = TaskMonitor.get().createStatus(
         "Initializing region " + this);
-    
+
     if (coprocessorHost != null) {
       status.setStatus("Running coprocessor pre-open hook");
       coprocessorHost.preOpen();
@@ -473,17 +473,17 @@ public class HRegion implements HeapSize
     this.writestate.setReadOnly(this.htableDescriptor.isReadOnly());
 
     this.writestate.compacting = 0;
-    
+
     // Initialize split policy
     this.splitPolicy = RegionSplitPolicy.create(this, conf);
-    
+
     this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
     // Use maximum of log sequenceid or that which was found in stores
     // (particularly if no recovered edits, seqid will be -1).
     long nextSeqid = maxSeqId + 1;
     LOG.info("Onlined " + this.toString() + "; next sequenceid=" + nextSeqid);
 
-    
+
     if (coprocessorHost != null) {
       status.setStatus("Running coprocessor post-open hooks");
       coprocessorHost.postOpen();
@@ -520,7 +520,7 @@ public class HRegion implements HeapSize
     }
     return false;
   }
-  
+
   /**
    * This function will return the HDFS blocks distribution based on the data
    * captured when HFile is created
@@ -557,7 +557,7 @@ public class HRegion implements HeapSize
     Path tablePath = FSUtils.getTablePath(FSUtils.getRootDir(conf),
       tableDescriptor.getName());
     FileSystem fs = tablePath.getFileSystem(conf);
-         
+
     for (HColumnDescriptor family: tableDescriptor.getFamilies()) {
       Path storeHomeDir = Store.getStoreHomedir(tablePath, regionEncodedName,
       family.getName());
@@ -575,27 +575,27 @@ public class HRegion implements HeapSize
     }
     return hdfsBlocksDistribution;
   }
-  
+
   public AtomicLong getMemstoreSize() {
     return memstoreSize;
   }
-  
+
   /**
-   * Increase the size of mem store in this region and the size of global mem 
+   * Increase the size of mem store in this region and the size of global mem
    * store
    * @param memStoreSize
    * @return the size of memstore in this region
    */
   public long addAndGetGlobalMemstoreSize(long memStoreSize) {
     if (this.rsServices != null) {
-      RegionServerAccounting rsAccounting = 
+      RegionServerAccounting rsAccounting =
         this.rsServices.getRegionServerAccounting();
-      
+
       if (rsAccounting != null) {
         rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize);
       }
     }
-    return this.memstoreSize.getAndAdd(memStoreSize);  
+    return this.memstoreSize.getAndAdd(memStoreSize);
   }
 
   /*
@@ -710,7 +710,7 @@ public class HRegion implements HeapSize
     MonitoredTask status = TaskMonitor.get().createStatus(
         "Closing region " + this +
         (abort ? " due to abort" : ""));
-    
+
     status.setStatus("Waiting for close lock");
     try {
       synchronized (closeLock) {
@@ -1095,7 +1095,7 @@ public class HRegion implements HeapSize
           }
         }
         boolean result = internalFlushcache(status);
-        
+
         if (coprocessorHost != null) {
           status.setStatus("Running post-flush coprocessor hooks");
           coprocessorHost.postFlush();
@@ -1143,7 +1143,7 @@ public class HRegion implements HeapSize
    * routes.
    *
    * <p> This method may block for some time.
-   * @param status 
+   * @param status
    *
    * @return true if the region needs compacting
    *
@@ -1159,7 +1159,7 @@ public class HRegion implements HeapSize
    * @param wal Null if we're NOT to go via hlog/wal.
    * @param myseqid The seqid to use if <code>wal</code> is null writing out
    * flush file.
-   * @param status 
+   * @param status
    * @return true if the region needs compacting
    * @throws IOException
    * @see #internalFlushcache(MonitoredTask)
@@ -1694,7 +1694,7 @@ public class HRegion implements HeapSize
 
   /**
    * Perform a batch of puts.
-   * 
+   *
    * @param putsAndLocks
    *          the list of puts paired with their requested lock IDs.
    * @return an array of OperationStatus which internally contains the
@@ -2258,16 +2258,36 @@ public class HRegion implements HeapSize
   protected long replayRecoveredEditsIfAny(final Path regiondir,
       final long minSeqId, final CancelableProgressable reporter,
       final MonitoredTask status)
-  throws UnsupportedEncodingException, IOException {
+      throws UnsupportedEncodingException, IOException {
     long seqid = minSeqId;
     NavigableSet<Path> files = HLog.getSplitEditFilesSorted(this.fs, regiondir);
     if (files == null || files.isEmpty()) return seqid;
+    boolean checkSafeToSkip = true;
     for (Path edits: files) {
       if (edits == null || !this.fs.exists(edits)) {
         LOG.warn("Null or non-existent edits file: " + edits);
         continue;
       }
       if (isZeroLengthThenDelete(this.fs, edits)) continue;
+
+      if (checkSafeToSkip) {
+        Path higher = files.higher(edits);
+        long maxSeqId = Long.MAX_VALUE;
+        if (higher != null) {
+          // Edit file name pattern, HLog.EDITFILES_NAME_PATTERN: "-?[0-9]+"
+          String fileName = higher.getName();
+          maxSeqId = Math.abs(Long.parseLong(fileName));
+        }
+        if (maxSeqId <= minSeqId) {
+          String msg = "Maximum possible sequenceid for this log is " + maxSeqId
+              + ", skipped the whole file, path=" + edits;
+          LOG.debug(msg);
+          continue;
+        } else {
+          checkSafeToSkip = false;
+        }
+      }
+
       try {
         seqid = replayRecoveredEdits(edits, seqid, reporter);
       } catch (IOException e) {
@@ -2312,139 +2332,139 @@ public class HRegion implements HeapSize
       minSeqId + "; path=" + edits;
     LOG.info(msg);
     MonitoredTask status = TaskMonitor.get().createStatus(msg);
-    
+
     status.setStatus("Opening logs");
     HLog.Reader reader = HLog.getReader(this.fs, edits, conf);
     try {
-    long currentEditSeqId = minSeqId;
-    long firstSeqIdInLog = -1;
-    long skippedEdits = 0;
-    long editsCount = 0;
-    long intervalEdits = 0;
-    HLog.Entry entry;
-    Store store = null;
-    boolean reported_once = false;
+      long currentEditSeqId = minSeqId;
+      long firstSeqIdInLog = -1;
+      long skippedEdits = 0;
+      long editsCount = 0;
+      long intervalEdits = 0;
+      HLog.Entry entry;
+      Store store = null;
+      boolean reported_once = false;
 
-    try {
-      // How many edits seen before we check elapsed time
-      int interval = this.conf.getInt("hbase.hstore.report.interval.edits",
-          2000);
-      // How often to send a progress report (default 1/2 master timeout)
-      int period = this.conf.getInt("hbase.hstore.report.period",
-          this.conf.getInt("hbase.master.assignment.timeoutmonitor.timeout",
-              180000) / 2);
-      long lastReport = EnvironmentEdgeManager.currentTimeMillis();
-
-      while ((entry = reader.next()) != null) {
-        HLogKey key = entry.getKey();
-        WALEdit val = entry.getEdit();
-
-        if (reporter != null) {
-          intervalEdits += val.size();
-          if (intervalEdits >= interval) {
-            // Number of edits interval reached
-            intervalEdits = 0;
-            long cur = EnvironmentEdgeManager.currentTimeMillis();
-            if (lastReport + period <= cur) {
-              status.setStatus("Replaying edits..." +
-                  " skipped=" + skippedEdits +
-                  " edits=" + editsCount);
-              // Timeout reached
-              if(!reporter.progress()) {
-                msg = "Progressable reporter failed, stopping replay";
-                LOG.warn(msg);
-                status.abort(msg);
-                throw new IOException(msg);
+      try {
+        // How many edits seen before we check elapsed time
+        int interval = this.conf.getInt("hbase.hstore.report.interval.edits",
+            2000);
+        // How often to send a progress report (default 1/2 master timeout)
+        int period = this.conf.getInt("hbase.hstore.report.period",
+            this.conf.getInt("hbase.master.assignment.timeoutmonitor.timeout",
+                180000) / 2);
+        long lastReport = EnvironmentEdgeManager.currentTimeMillis();
+
+        while ((entry = reader.next()) != null) {
+          HLogKey key = entry.getKey();
+          WALEdit val = entry.getEdit();
+
+          if (reporter != null) {
+            intervalEdits += val.size();
+            if (intervalEdits >= interval) {
+              // Number of edits interval reached
+              intervalEdits = 0;
+              long cur = EnvironmentEdgeManager.currentTimeMillis();
+              if (lastReport + period <= cur) {
+                status.setStatus("Replaying edits..." +
+                    " skipped=" + skippedEdits +
+                    " edits=" + editsCount);
+                // Timeout reached
+                if(!reporter.progress()) {
+                  msg = "Progressable reporter failed, stopping replay";
+                  LOG.warn(msg);
+                  status.abort(msg);
+                  throw new IOException(msg);
+                }
+                reported_once = true;
+                lastReport = cur;
               }
-              reported_once = true;
-              lastReport = cur;
             }
           }
-        }
 
-        // Start coprocessor replay here. The coprocessor is for each WALEdit
-        // instead of a KeyValue.
-        if (coprocessorHost != null) {
-          status.setStatus("Running pre-WAL-restore hook in coprocessors");
-          if (coprocessorHost.preWALRestore(this.getRegionInfo(), key, val)) {
-            // if bypass this log entry, ignore it ...
-            continue;
+          // Start coprocessor replay here. The coprocessor is for each WALEdit
+          // instead of a KeyValue.
+          if (coprocessorHost != null) {
+            status.setStatus("Running pre-WAL-restore hook in coprocessors");
+            if (coprocessorHost.preWALRestore(this.getRegionInfo(), key, val)) {
+              // if bypass this log entry, ignore it ...
+              continue;
+            }
           }
-        }
 
-        if (firstSeqIdInLog == -1) {
-          firstSeqIdInLog = key.getLogSeqNum();
-        }
-        // Now, figure if we should skip this edit.
-        if (key.getLogSeqNum() <= currentEditSeqId) {
-          skippedEdits++;
-          continue;
-        }
-        currentEditSeqId = key.getLogSeqNum();
-        boolean flush = false;
-        for (KeyValue kv: val.getKeyValues()) {
-          // Check this edit is for me. Also, guard against writing the special
-          // METACOLUMN info such as HBASE::CACHEFLUSH entries
-          if (kv.matchingFamily(HLog.METAFAMILY) ||
-              !Bytes.equals(key.getEncodedRegionName(), this.regionInfo.getEncodedNameAsBytes())) {
-            skippedEdits++;
-            continue;
-              }
-          // Figure which store the edit is meant for.
-          if (store == null || !kv.matchingFamily(store.getFamily().getName())) {
-            store = this.stores.get(kv.getFamily());
+          if (firstSeqIdInLog == -1) {
+            firstSeqIdInLog = key.getLogSeqNum();
           }
-          if (store == null) {
-            // This should never happen.  Perhaps schema was changed between
-            // crash and redeploy?
-            LOG.warn("No family for " + kv);
+          // Now, figure if we should skip this edit.
+          if (key.getLogSeqNum() <= currentEditSeqId) {
             skippedEdits++;
             continue;
           }
-          // Once we are over the limit, restoreEdit will keep returning true to
-          // flush -- but don't flush until we've played all the kvs that make up
-          // the WALEdit.
-          flush = restoreEdit(store, kv);
-          editsCount++;
-        }
-        if (flush) internalFlushcache(null, currentEditSeqId, status);
+          currentEditSeqId = key.getLogSeqNum();
+          boolean flush = false;
+          for (KeyValue kv: val.getKeyValues()) {
+            // Check this edit is for me. Also, guard against writing the special
+            // METACOLUMN info such as HBASE::CACHEFLUSH entries
+            if (kv.matchingFamily(HLog.METAFAMILY) ||
+                !Bytes.equals(key.getEncodedRegionName(), this.regionInfo.getEncodedNameAsBytes())) {
+              skippedEdits++;
+              continue;
+                }
+            // Figure which store the edit is meant for.
+            if (store == null || !kv.matchingFamily(store.getFamily().getName())) {
+              store = this.stores.get(kv.getFamily());
+            }
+            if (store == null) {
+              // This should never happen.  Perhaps schema was changed between
+              // crash and redeploy?
+              LOG.warn("No family for " + kv);
+              skippedEdits++;
+              continue;
+            }
+            // Once we are over the limit, restoreEdit will keep returning true to
+            // flush -- but don't flush until we've played all the kvs that make up
+            // the WALEdit.
+            flush = restoreEdit(store, kv);
+            editsCount++;
+          }
+          if (flush) internalFlushcache(null, currentEditSeqId, status);
 
-        if (coprocessorHost != null) {
-          coprocessorHost.postWALRestore(this.getRegionInfo(), key, val);
+          if (coprocessorHost != null) {
+            coprocessorHost.postWALRestore(this.getRegionInfo(), key, val);
+          }
         }
-      }
-    } catch (EOFException eof) {
-      Path p = HLog.moveAsideBadEditsFile(fs, edits);
-      msg = "Encountered EOF. Most likely due to Master failure during " +
-          "log spliting, so we have this data in another edit.  " +
-          "Continuing, but renaming " + edits + " as " + p;
-      LOG.warn(msg, eof);
-      status.abort(msg);
-    } catch (IOException ioe) {
-      // If the IOE resulted from bad file format,
-      // then this problem is idempotent and retrying won't help
-      if (ioe.getCause() instanceof ParseException) {
+      } catch (EOFException eof) {
         Path p = HLog.moveAsideBadEditsFile(fs, edits);
-        msg = "File corruption encountered!  " +
+        msg = "Encountered EOF. Most likely due to Master failure during " +
+            "log spliting, so we have this data in another edit.  " +
             "Continuing, but renaming " + edits + " as " + p;
-        LOG.warn(msg, ioe);
-        status.setStatus(msg);
-      } else {
-        status.abort(StringUtils.stringifyException(ioe));
-        // other IO errors may be transient (bad network connection,
-        // checksum exception on one datanode, etc).  throw & retry
-        throw ioe;
+        LOG.warn(msg, eof);
+        status.abort(msg);
+      } catch (IOException ioe) {
+        // If the IOE resulted from bad file format,
+        // then this problem is idempotent and retrying won't help
+        if (ioe.getCause() instanceof ParseException) {
+          Path p = HLog.moveAsideBadEditsFile(fs, edits);
+          msg = "File corruption encountered!  " +
+              "Continuing, but renaming " + edits + " as " + p;
+          LOG.warn(msg, ioe);
+          status.setStatus(msg);
+        } else {
+          status.abort(StringUtils.stringifyException(ioe));
+          // other IO errors may be transient (bad network connection,
+          // checksum exception on one datanode, etc).  throw & retry
+          throw ioe;
+        }
       }
-    }
-    if (reporter != null && !reported_once) {
-      reporter.progress();
-    }
-    msg = "Applied " + editsCount + ", skipped " + skippedEdits +
-    ", firstSequenceidInLog=" + firstSeqIdInLog +
-    ", maxSequenceidInLog=" + currentEditSeqId + ", path=" + edits;
-    status.markComplete(msg);
-    LOG.debug(msg);
-    return currentEditSeqId;
+      if (reporter != null && !reported_once) {
+        reporter.progress();
+      }
+      msg = "Applied " + editsCount + ", skipped " + skippedEdits +
+        ", firstSequenceidInLog=" + firstSeqIdInLog +
+        ", maxSequenceidInLog=" + currentEditSeqId + ", path=" + edits;
+      status.markComplete(msg);
+      LOG.debug(msg);
+      return currentEditSeqId;
     } finally {
       reader.close();
       status.cleanup();
@@ -2468,7 +2488,7 @@ public class HRegion implements HeapSize
    * @throws IOException
    */
   private static boolean isZeroLengthThenDelete(final FileSystem fs, final Path p)
-  throws IOException {
+      throws IOException {
     FileStatus stat = fs.getFileStatus(p);
     if (stat.getLen() > 0) return false;
     LOG.warn("File " + p + " is zero-length, deleting.");
@@ -2477,7 +2497,7 @@ public class HRegion implements HeapSize
   }
 
   protected Store instantiateHStore(Path tableDir, HColumnDescriptor c)
-  throws IOException {
+      throws IOException {
     return new Store(tableDir, this, c, this.fs, this.conf);
   }
 
@@ -2557,7 +2577,7 @@ public class HRegion implements HeapSize
     try {
       HashedBytes rowKey = new HashedBytes(row);
       CountDownLatch rowLatch = new CountDownLatch(1);
-      
+
       // loop until we acquire the row lock (unless !waitForLock)
       while (true) {
         CountDownLatch existingLatch = lockedRows.putIfAbsent(rowKey, rowLatch);
@@ -2578,7 +2598,7 @@ public class HRegion implements HeapSize
           }
         }
       }
-       
+
       // loop until we generate an unused lock id
       while (true) {
         Integer lockId = lockIdGenerator.incrementAndGet();
@@ -2604,7 +2624,7 @@ public class HRegion implements HeapSize
     HashedBytes rowKey = lockIds.get(lockid);
     return rowKey == null ? null : rowKey.getBytes();
   }
-  
+
   /**
    * Release the row lock!
    * @param lockId  The lock ID to release.
@@ -2655,7 +2675,7 @@ public class HRegion implements HeapSize
     }
     return lid;
   }
-    
+
   /**
    * Determines whether multiple column families are present
    * Precondition: familyPaths is not null
@@ -2750,7 +2770,7 @@ public class HRegion implements HeapSize
         try {
           store.bulkLoadHFile(path);
         } catch (IOException ioe) {
-          // a failure here causes an atomicity violation that we currently 
+          // a failure here causes an atomicity violation that we currently
           // cannot recover from since it is likely a failed hdfs operation.
 
           // TODO Need a better story for reverting partial failures due to HDFS.
@@ -3059,9 +3079,9 @@ public class HRegion implements HeapSize
 
   /**
    * Convenience method creating new HRegions. Used by createTable.
-   * The {@link HLog} for the created region needs to be closed explicitly. 
+   * The {@link HLog} for the created region needs to be closed explicitly.
    * Use {@link HRegion#getLog()} to get access.
-   * 
+   *
    * @param info Info for region to create.
    * @param rootDir Root directory for HBase instance
    * @param conf
@@ -3088,14 +3108,14 @@ public class HRegion implements HeapSize
     HLog effectiveHLog = hlog;
     if (hlog == null) {
       effectiveHLog = new HLog(fs, new Path(regionDir, HConstants.HREGION_LOGDIR_NAME),
-          new Path(regionDir, HConstants.HREGION_OLDLOGDIR_NAME), conf);     
+          new Path(regionDir, HConstants.HREGION_OLDLOGDIR_NAME), conf);
     }
     HRegion region = HRegion.newHRegion(tableDir,
         effectiveHLog, fs, conf, info, hTableDescriptor, null);
     region.initialize();
     return region;
   }
-  
+
   /**
    * Open a Region.
    * @param info Info for region to be opened.
@@ -3895,7 +3915,7 @@ public class HRegion implements HeapSize
   public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
       ClassSize.OBJECT + // closeLock
       (2 * ClassSize.ATOMIC_BOOLEAN) + // closed, closing
-      ClassSize.ATOMIC_LONG + // memStoreSize 
+      ClassSize.ATOMIC_LONG + // memStoreSize
       ClassSize.ATOMIC_INTEGER + // lockIdGenerator
       (3 * ClassSize.CONCURRENT_HASHMAP) +  // lockedRows, lockIds, scannerReadPoints
       WriteState.HEAP_SIZE + // writestate
@@ -4120,13 +4140,13 @@ public class HRegion implements HeapSize
     if (this.explicitSplitPoint != null) {
       return this.explicitSplitPoint;
     }
-    
+
     if (!splitPolicy.shouldSplit()) {
       return null;
     }
-    
+
     byte[] ret = splitPolicy.getSplitPoint();
-    
+
     if (ret != null) {
       try {
         checkRow(ret, "calculated split");
@@ -4134,7 +4154,7 @@ public class HRegion implements HeapSize
         LOG.error("Ignoring invalid split", e);
         return null;
       }
-    }        
+    }
     return ret;
   }
 

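The file-level check above leans on the recovered.edits naming convention
that the new tests below exercise: each file is named with its first
sequence id, zero-padded to 19 digits, and parsed back with Long.parseLong
(the file name pattern "-?[0-9]+" admits a leading minus, hence the
Math.abs). A small round-trip sketch with assumed values, not HBase code:

    public class EditFileNameSketch {
      public static void main(String[] args) {
        long seqId = 1030;
        // Written as a zero-padded file name, e.g. "0000000000000001030"
        String fileName = String.format("%019d", seqId);
        // Parsed back the way replayRecoveredEditsIfAny reads it
        long parsed = Math.abs(Long.parseLong(fileName));
        System.out.println(fileName + " -> " + parsed); // prints ... -> 1030
      }
    }
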
Modified: hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=1207588&r1=1207587&r2=1207588&view=diff
==============================================================================
--- hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java Mon Nov 28 20:50:26 2011
@@ -34,6 +34,8 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -51,10 +53,10 @@ import org.apache.hadoop.hbase.Multithre
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
@@ -63,8 +65,12 @@ import org.apache.hadoop.hbase.filter.Fi
 import org.apache.hadoop.hbase.filter.NullComparator;
 import org.apache.hadoop.hbase.filter.PrefixFilter;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
@@ -1298,7 +1304,7 @@ public class TestHRegion extends HBaseTe
       LOG.info("" + addContent(region, fam3));
       region.flushcache();
       region.compactStores();
-      byte [] splitRow = region.checkSplit();      
+      byte [] splitRow = region.checkSplit();
       assertNotNull(splitRow);
       LOG.info("SplitRow: " + Bytes.toString(splitRow));
       HRegion [] subregions = splitRegion(region, splitRow);
@@ -2172,7 +2178,7 @@ public class TestHRegion extends HBaseTe
     } catch (Exception exception) {
         // Expected.
     }
-    
+
 
     assertICV(row1, fam1, qual1, row1Field1);
     assertICV(row1, fam1, qual2, row1Field2);
@@ -2304,7 +2310,7 @@ public class TestHRegion extends HBaseTe
       LOG.info("" + addContent(region, fam3));
       region.flushcache();
       region.compactStores();
-      byte [] splitRow = region.checkSplit();      
+      byte [] splitRow = region.checkSplit();
       assertNotNull(splitRow);
       LOG.info("SplitRow: " + Bytes.toString(splitRow));
       HRegion [] regions = splitRegion(region, splitRow);
@@ -2339,7 +2345,7 @@ public class TestHRegion extends HBaseTe
         // To make regions splitable force compaction.
         for (int i = 0; i < regions.length; i++) {
           regions[i].compactStores();
-          midkeys[i] = regions[i].checkSplit();          
+          midkeys[i] = regions[i].checkSplit();
         }
 
         TreeMap<String, HRegion> sortedMap = new TreeMap<String, HRegion>();
@@ -2811,6 +2817,115 @@ public class TestHRegion extends HBaseTe
     region.get(g, null);
   }
 
+  public void testSkipRecoveredEditsReplay() throws Exception {
+    String method = "testSkipRecoveredEditsReplay";
+    byte[] tableName = Bytes.toBytes(method);
+    byte[] family = Bytes.toBytes("family");
+    Configuration conf = HBaseConfiguration.create();
+    initHRegion(tableName, method, conf, family);
+    Path regiondir = region.getRegionDir();
+    FileSystem fs = region.getFilesystem();
+    byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
+
+    Path recoveredEditsDir = HLog.getRegionDirRecoveredEditsDir(regiondir);
+
+    long maxSeqId = 1050;
+    long minSeqId = 1000;
+
+    for (long i = minSeqId; i <= maxSeqId; i += 10) {
+      Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
+      HLog.Writer writer = HLog.createWriter(fs, recoveredEdits, conf);
+
+      long time = System.nanoTime();
+      WALEdit edit = new WALEdit();
+      edit.add(new KeyValue(row, family, Bytes.toBytes(i),
+          time, KeyValue.Type.Put, Bytes.toBytes(i)));
+      writer.append(new HLog.Entry(new HLogKey(regionName, tableName,
+          i, time, HConstants.DEFAULT_CLUSTER_ID), edit));
+
+      writer.close();
+    }
+    MonitoredTask status = TaskMonitor.get().createStatus(method);
+    long seqId = region.replayRecoveredEditsIfAny(regiondir, minSeqId-1, null, status);
+    assertEquals(maxSeqId, seqId);
+    Get get = new Get(row);
+    Result result = region.get(get, null);
+    for (long i = minSeqId; i <= maxSeqId; i += 10) {
+      List<KeyValue> kvs = result.getColumn(family, Bytes.toBytes(i));
+      assertEquals(1, kvs.size());
+      assertEquals(Bytes.toBytes(i), kvs.get(0).getValue());
+    }
+  }
+
+  public void testSkipRecoveredEditsReplaySomeIgnored() throws Exception {
+    String method = "testSkipRecoveredEditsReplaySomeIgnored";
+    byte[] tableName = Bytes.toBytes(method);
+    byte[] family = Bytes.toBytes("family");
+    initHRegion(tableName, method, HBaseConfiguration.create(), family);
+    Path regiondir = region.getRegionDir();
+    FileSystem fs = region.getFilesystem();
+    byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
+
+    Path recoveredEditsDir = HLog.getRegionDirRecoveredEditsDir(regiondir);
+
+    long maxSeqId = 1050;
+    long minSeqId = 1000;
+
+    for (long i = minSeqId; i <= maxSeqId; i += 10) {
+      Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
+      HLog.Writer writer = HLog.createWriter(fs, recoveredEdits, conf);
+
+      long time = System.nanoTime();
+      WALEdit edit = new WALEdit();
+      edit.add(new KeyValue(row, family, Bytes.toBytes(i),
+          time, KeyValue.Type.Put, Bytes.toBytes(i)));
+      writer.append(new HLog.Entry(new HLogKey(regionName, tableName,
+          i, time, HConstants.DEFAULT_CLUSTER_ID), edit));
+
+      writer.close();
+    }
+    long recoverSeqId = 1030;
+    MonitoredTask status = TaskMonitor.get().createStatus(method);
+    long seqId = region.replayRecoveredEditsIfAny(regiondir, recoverSeqId-1, null, status);
+    assertEquals(maxSeqId, seqId);
+    Get get = new Get(row);
+    Result result = region.get(get, null);
+    for (long i = minSeqId; i <= maxSeqId; i += 10) {
+      List<KeyValue> kvs = result.getColumn(family, Bytes.toBytes(i));
+      if (i < recoverSeqId) {
+        assertEquals(0, kvs.size());
+      } else {
+        assertEquals(1, kvs.size());
+        assertEquals(Bytes.toBytes(i), kvs.get(0).getValue());
+      }
+    }
+  }
+
+  public void testSkipRecoveredEditsReplayAllIgnored() throws Exception {
+    String method = "testSkipRecoveredEditsReplayAllIgnored";
+    byte[] tableName = Bytes.toBytes(method);
+    byte[] family = Bytes.toBytes("family");
+    initHRegion(tableName, method, HBaseConfiguration.create(), family);
+    Path regiondir = region.getRegionDir();
+    FileSystem fs = region.getFilesystem();
+
+    Path recoveredEditsDir = HLog.getRegionDirRecoveredEditsDir(regiondir);
+    for (int i = 1000; i < 1050; i += 10) {
+      Path recoveredEdits = new Path(
+          recoveredEditsDir, String.format("%019d", i));
+      FSDataOutputStream dos = fs.create(recoveredEdits);
+      dos.writeInt(i);
+      dos.close();
+    }
+    long minSeqId = 2000;
+    Path recoveredEdits = new Path(
+        recoveredEditsDir, String.format("%019d", minSeqId-1));
+    FSDataOutputStream dos = fs.create(recoveredEdits);
+    dos.close();
+    long seqId = region.replayRecoveredEditsIfAny(regiondir, minSeqId, null, null);
+    assertEquals(minSeqId, seqId);
+  }
+
   public void testIndexesScanWithOneDeletedRow() throws IOException {
     byte[] tableName = Bytes.toBytes("testIndexesScanWithOneDeletedRow");
     byte[] family = Bytes.toBytes("family");
@@ -2866,13 +2981,13 @@ public class TestHRegion extends HBaseTe
     HColumnDescriptor hcd = new HColumnDescriptor(fam1, Integer.MAX_VALUE,
         HColumnDescriptor.DEFAULT_COMPRESSION, false, true,
         HColumnDescriptor.DEFAULT_TTL, "rowcol");
-    
+
     HTableDescriptor htd = new HTableDescriptor(tableName);
     htd.addFamily(hcd);
     HRegionInfo info = new HRegionInfo(htd.getName(), null, null, false);
     Path path = new Path(DIR + "testBloomFilterSize");
     region = HRegion.createHRegion(info, path, conf, htd);
-    
+
     int num_unique_rows = 10;
     int duplicate_multiplier =2;
     int num_storefiles = 4;
@@ -2889,7 +3004,7 @@ public class TestHRegion extends HBaseTe
       }
       region.flushcache();
     }
-    //before compaction 
+    //before compaction
     Store store = region.getStore(fam1);
     List<StoreFile> storeFiles = store.getStorefiles();
     for (StoreFile storefile : storeFiles) {
@@ -2899,10 +3014,10 @@ public class TestHRegion extends HBaseTe
       assertEquals(num_unique_rows*duplicate_multiplier, reader.getEntries());
       assertEquals(num_unique_rows, reader.getFilterEntries());
     }
-    
-    region.compactStores(true); 
-    
-    //after compaction 
+
+    region.compactStores(true);
+
+    //after compaction
     storeFiles = store.getStorefiles();
     for (StoreFile storefile : storeFiles) {
       StoreFile.Reader reader = storefile.getReader();
@@ -2911,9 +3026,9 @@ public class TestHRegion extends HBaseTe
       assertEquals(num_unique_rows*duplicate_multiplier*num_storefiles,
           reader.getEntries());
       assertEquals(num_unique_rows, reader.getFilterEntries());
-    }  
+    }
   }
-  
+
   public void testAllColumnsWithBloomFilter() throws IOException {
     byte [] TABLE = Bytes.toBytes("testAllColumnsWithBloomFilter");
     byte [] FAMILY = Bytes.toBytes("family");
@@ -3004,13 +3119,13 @@ public class TestHRegion extends HBaseTe
     final int DEFAULT_BLOCK_SIZE = 1024;
     htu.getConfiguration().setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
     htu.getConfiguration().setInt("dfs.replication", 2);
-    
-    
+
+
     // set up a cluster with 3 nodes
     MiniHBaseCluster cluster;
     String dataNodeHosts[] = new String[] { "host1", "host2", "host3" };
     int regionServersCount = 3;
-	    
+
     try {
       cluster = htu.startMiniCluster(1, regionServersCount, dataNodeHosts);
       byte [][] families = {fam1, fam2};
@@ -3020,31 +3135,31 @@ public class TestHRegion extends HBaseTe
       byte row[] = Bytes.toBytes("row1");
       byte col[] = Bytes.toBytes("col1");
 
-      Put put = new Put(row);	        
+      Put put = new Put(row);
       put.add(fam1, col, 1, Bytes.toBytes("test1"));
       put.add(fam2, col, 1, Bytes.toBytes("test2"));
       ht.put(put);
-      
+
       HRegion firstRegion = htu.getHBaseCluster().
         getRegions(Bytes.toBytes(this.getName())).get(0);
       firstRegion.flushcache();
       HDFSBlocksDistribution blocksDistribution1 =
         firstRegion.getHDFSBlocksDistribution();
-      
+
       // given the default replication factor is 2 and we have 2 HFiles,
       // we will have total of 4 replica of blocks on 3 datanodes; thus there
       // must be at least one host that have replica for 2 HFiles. That host's
       // weight will be equal to the unique block weight.
       long uniqueBlocksWeight1 =
         blocksDistribution1.getUniqueBlocksTotalWeight();
-      
+
       String topHost = blocksDistribution1.getTopHosts().get(0);
       long topHostWeight = blocksDistribution1.getWeight(topHost);
       assertTrue(uniqueBlocksWeight1 == topHostWeight);
-      
+
       // use the static method to compute the value, it should be the same.
       // static method is used by load balancer or other components
-      HDFSBlocksDistribution blocksDistribution2 = 
+      HDFSBlocksDistribution blocksDistribution2 =
         HRegion.computeHDFSBlocksDistribution(htu.getConfiguration(),
         firstRegion.getTableDesc(),
         firstRegion.getRegionInfo().getEncodedName());
@@ -3056,7 +3171,7 @@ public class TestHRegion extends HBaseTe
         htu.shutdownMiniCluster();
       }
   }
-  
+
   private void putData(int startRow, int numRows, byte [] qf,
       byte [] ...families)
   throws IOException {


