hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jmhs...@apache.org
Subject svn commit: r1446147 [16/35] - in /hbase/branches/hbase-7290v2: ./ bin/ conf/ dev-support/ hbase-client/ hbase-common/ hbase-common/src/main/java/org/apache/hadoop/hbase/ hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/ hbase-common/src/...
Date Thu, 14 Feb 2013 12:58:21 GMT
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java Thu Feb 14 12:58:12 2013
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -43,6 +44,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CompoundConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -66,6 +68,7 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -128,8 +131,8 @@ public class HStore implements Store, St
   final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
   private final boolean verifyBulkLoads;
 
-  // not private for testing
-  /* package */ScanInfo scanInfo;
+  private ScanInfo scanInfo;
+
   /*
    * List of store files inside this store. This is an immutable list that
    * is atomically replaced when its contents change.
@@ -152,7 +155,11 @@ public class HStore implements Store, St
   // Comparing KeyValues
   final KeyValue.KVComparator comparator;
 
-  private final Compactor compactor;
+  private Compactor compactor;
+
+  private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
+  private static int flush_retries_number;
+  private static int pauseTime;
 
   /**
    * Constructor
@@ -178,9 +185,13 @@ public class HStore implements Store, St
     this.region = region;
     this.family = family;
     // 'conf' renamed to 'confParam' b/c we use this.conf in the constructor
+    // CompoundConfiguration will look for keys in reverse order of addition, so we'd
+    // add global config first, then table and cf overrides, then cf metadata.
     this.conf = new CompoundConfiguration()
       .add(confParam)
-      .add(family.getValues());
+      .addStringMap(region.getTableDesc().getConfiguration())
+      .addStringMap(family.getConfiguration())
+      .addWritableMap(family.getValues());
     this.blocksize = family.getBlocksize();
 
     this.dataBlockEncoder =
@@ -217,10 +228,21 @@ public class HStore implements Store, St
     this.checksumType = getChecksumType(conf);
     // initilize bytes per checksum
     this.bytesPerChecksum = getBytesPerChecksum(conf);
-    // Create a compaction tool instance
-    this.compactor = new Compactor(conf);
     // Create a compaction manager.
-    this.compactionPolicy = new CompactionPolicy(conf, this);
+    if (HStore.flush_retries_number == 0) {
+      HStore.flush_retries_number = conf.getInt(
+          "hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER);
+      HStore.pauseTime = conf.getInt(HConstants.HBASE_SERVER_PAUSE,
+          HConstants.DEFAULT_HBASE_SERVER_PAUSE);
+      if (HStore.flush_retries_number <= 0) {
+        throw new IllegalArgumentException(
+            "hbase.hstore.flush.retries.number must be > 0, not "
+                + HStore.flush_retries_number);
+      }
+    }
+    this.compactionPolicy = CompactionPolicy.create(this, conf);
+    // Get the compaction tool instance for this policy
+    this.compactor = compactionPolicy.getCompactor();
   }
 
   /**
@@ -267,7 +289,7 @@ public class HStore implements Store, St
     return homedir;
   }
 
-  FileSystem getFileSystem() {
+  public FileSystem getFileSystem() {
     return this.fs;
   }
 
@@ -311,6 +333,13 @@ public class HStore implements Store, St
     }
   }
 
+  /**
+   * @return how many bytes to write between status checks
+   */
+  public static int getCloseCheckInterval() {
+    return closeCheckInterval;
+  }
+
   public HColumnDescriptor getFamily() {
     return this.family;
   }
@@ -429,26 +458,38 @@ public class HStore implements Store, St
       totalValidStoreFile++;
     }
 
+    IOException ioe = null;
     try {
       for (int i = 0; i < totalValidStoreFile; i++) {
-        Future<StoreFile> future = completionService.take();
-        StoreFile storeFile = future.get();
-        long length = storeFile.getReader().length();
-        this.storeSize += length;
-        this.totalUncompressedBytes +=
-          storeFile.getReader().getTotalUncompressedBytes();
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("loaded " + storeFile.toStringDetailed());
-        }
-        results.add(storeFile);
-      }
-    } catch (InterruptedException e) {
-      throw new IOException(e);
-    } catch (ExecutionException e) {
-      throw new IOException(e.getCause());
+        try {
+          Future<StoreFile> future = completionService.take();
+          StoreFile storeFile = future.get();
+          long length = storeFile.getReader().length();
+          this.storeSize += length;
+          this.totalUncompressedBytes +=
+              storeFile.getReader().getTotalUncompressedBytes();
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("loaded " + storeFile.toStringDetailed());
+          }
+          results.add(storeFile);
+        } catch (InterruptedException e) {
+          if (ioe == null) ioe = new InterruptedIOException(e.getMessage());
+        } catch (ExecutionException e) {
+          if (ioe == null) ioe = new IOException(e.getCause());
+        } 
+      } 
     } finally {
       storeFileOpenerThreadPool.shutdownNow();
     }
+    if (ioe != null) {
+      // close StoreFile readers
+      try {
+        for (StoreFile file : results) {
+          if (file != null) file.closeReader(true);
+        }
+      } catch (IOException e) { }
+      throw ioe;
+    }
 
     return results;
   }
@@ -564,7 +605,11 @@ public class HStore implements Store, St
     // Copy the file if it's on another filesystem
     FileSystem srcFs = srcPath.getFileSystem(conf);
     FileSystem desFs = fs instanceof HFileSystem ? ((HFileSystem)fs).getBackingFs() : fs;
-    if (!srcFs.equals(desFs)) {
+    //We can't compare FileSystem instances as
+    //equals() includes UGI instance as part of the comparison
+    //and won't work when doing SecureBulkLoad
+    //TODO deal with viewFS
+    if (!srcFs.getUri().equals(desFs.getUri())) {
       LOG.info("Bulk-load file " + srcPath + " is on different filesystem than " +
           "the destination store. Copying file over to destination filesystem.");
       Path tmpPath = getTmpPath();
@@ -645,18 +690,25 @@ public class HStore implements Store, St
           });
         }
 
+        IOException ioe = null;
         try {
           for (int i = 0; i < result.size(); i++) {
-            Future<Void> future = completionService.take();
-            future.get();
+            try {
+              Future<Void> future = completionService.take();
+              future.get();
+            } catch (InterruptedException e) {
+              if (ioe == null) {
+                ioe = new InterruptedIOException();
+                ioe.initCause(e);
+              }
+            } catch (ExecutionException e) {
+              if (ioe == null) ioe = new IOException(e.getCause());
+            }
           }
-        } catch (InterruptedException e) {
-          throw new IOException(e);
-        } catch (ExecutionException e) {
-          throw new IOException(e.getCause());
         } finally {
           storeFileCloserThreadPool.shutdownNow();
         }
+        if (ioe != null) throw ioe;
       }
       LOG.info("Closed " + this);
       return result;
@@ -693,8 +745,43 @@ public class HStore implements Store, St
     // If an exception happens flushing, we let it out without clearing
     // the memstore snapshot.  The old snapshot will be returned when we say
     // 'snapshot', the next time flush comes around.
-    return internalFlushCache(
-        snapshot, logCacheFlushId, snapshotTimeRangeTracker, flushedSize, status);
+    // Retry after catching exception when flushing, otherwise server will abort
+    // itself
+    IOException lastException = null;
+    for (int i = 0; i < HStore.flush_retries_number; i++) {
+      try {
+        Path pathName = internalFlushCache(snapshot, logCacheFlushId,
+            snapshotTimeRangeTracker, flushedSize, status);
+        try {
+          // Path name is null if there is no entry to flush
+          if (pathName != null) {
+            validateStoreFile(pathName);
+          }
+          return pathName;
+        } catch (Exception e) {
+          LOG.warn("Failed validating store file " + pathName
+              + ", retring num=" + i, e);
+          if (e instanceof IOException) {
+            lastException = (IOException) e;
+          } else {
+            lastException = new IOException(e);
+          }
+        }
+      } catch (IOException e) {
+        LOG.warn("Failed flushing store file, retring num=" + i, e);
+        lastException = e;
+      }
+      if (lastException != null) {
+        try {
+          Thread.sleep(pauseTime);
+        } catch (InterruptedException e) {
+          IOException iie = new InterruptedIOException();
+          iie.initCause(e);
+          throw iie;
+        }
+      }
+    }
+    throw lastException;
   }
 
   /*
@@ -816,7 +903,6 @@ public class HStore implements Store, St
     // Write-out finished successfully, move into the right spot
     String fileName = path.getName();
     Path dstPath = new Path(homedir, fileName);
-    validateStoreFile(path);
     String msg = "Renaming flushed file at " + path + " to " + dstPath;
     LOG.debug(msg);
     status.setStatus("Flushing " + this + ": " + msg);
@@ -855,7 +941,7 @@ public class HStore implements Store, St
    * @param isCompaction whether we are creating a new file in a compaction
    * @return Writer for a new StoreFile in the tmp dir.
    */
-  StoreFile.Writer createWriterInTmp(int maxKeyCount,
+  public StoreFile.Writer createWriterInTmp(int maxKeyCount,
     Compression.Algorithm compression, boolean isCompaction)
   throws IOException {
     final CacheConfig writerCacheConf;
@@ -996,7 +1082,7 @@ public class HStore implements Store, St
    * @throws IOException
    * @return Storefile we compacted into or null if we failed or opted out early.
    */
-  StoreFile compact(CompactionRequest cr) throws IOException {
+  List<StoreFile> compact(CompactionRequest cr) throws IOException {
     if (cr == null || cr.getFiles().isEmpty()) return null;
     Preconditions.checkArgument(cr.getStore().toString().equals(this.toString()));
     List<StoreFile> filesToCompact = cr.getFiles();
@@ -1006,31 +1092,34 @@ public class HStore implements Store, St
       Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
     }
 
-    // Max-sequenceID is the last key in the files we're compacting
-    long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact, true);
-
     // Ready to go. Have list of files to compact.
     LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
         + this + " of " + this.region.getRegionInfo().getRegionNameAsString()
-        + " into tmpdir=" + region.getTmpDir() + ", seqid=" + maxId + ", totalSize="
+        + " into tmpdir=" + region.getTmpDir() + ", totalSize="
         + StringUtils.humanReadableInt(cr.getSize()));
 
-    StoreFile sf = null;
+    List<StoreFile> sfs = new ArrayList<StoreFile>();
     long compactionStartTime = EnvironmentEdgeManager.currentTimeMillis();
     try {
-      StoreFile.Writer writer =
-        this.compactor.compact(this, filesToCompact, cr.isMajor(), maxId);
+      List<Path> newFiles =
+        this.compactor.compact(filesToCompact, cr.isMajor());
       // Move the compaction into place.
       if (this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
-        sf = completeCompaction(filesToCompact, writer);
-        if (region.getCoprocessorHost() != null) {
-          region.getCoprocessorHost().postCompact(this, sf);
+        for (Path newFile: newFiles) {
+          StoreFile sf = completeCompaction(filesToCompact, newFile);
+          if (region.getCoprocessorHost() != null) {
+            region.getCoprocessorHost().postCompact(this, sf);
+          }
+          sfs.add(sf);
         }
       } else {
-        // Create storefile around what we wrote with a reader on it.
-        sf = new StoreFile(this.fs, writer.getPath(), this.conf, this.cacheConf,
-          this.family.getBloomFilterType(), this.dataBlockEncoder);
-        sf.createReader();
+        for (Path newFile: newFiles) {
+          // Create storefile around what we wrote with a reader on it.
+          StoreFile sf = new StoreFile(this.fs, newFile, this.conf, this.cacheConf,
+            this.family.getBloomFilterType(), this.dataBlockEncoder);
+          sf.createReader();
+          sfs.add(sf);
+        }
       }
     } finally {
       synchronized (filesCompacting) {
@@ -1039,25 +1128,34 @@ public class HStore implements Store, St
     }
 
     long now = EnvironmentEdgeManager.currentTimeMillis();
-    LOG.info("Completed" + (cr.isMajor() ? " major " : " ") + "compaction of "
-        + filesToCompact.size() + " file(s) in " + this + " of "
-        + this.region.getRegionInfo().getRegionNameAsString()
-        + " into " +
-        (sf == null ? "none" : sf.getPath().getName()) +
-        ", size=" + (sf == null ? "none" :
-          StringUtils.humanReadableInt(sf.getReader().length()))
-        + "; total size for store is " + StringUtils.humanReadableInt(storeSize)
-        + ". This selection was in queue for "
-        + StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime())
-        + ", and took " + StringUtils.formatTimeDiff(now, compactionStartTime)
-        + " to execute.");
-    return sf;
+    StringBuilder message = new StringBuilder(
+      "Completed" + (cr.isMajor() ? " major " : " ") + "compaction of "
+      + filesToCompact.size() + " file(s) in " + this + " of "
+      + this.region.getRegionInfo().getRegionNameAsString()
+      + " into ");
+    if (sfs.isEmpty()) {
+      message.append("none, ");
+    } else {
+      for (StoreFile sf: sfs) {
+        message.append(sf.getPath().getName());
+        message.append("(size=");
+        message.append(StringUtils.humanReadableInt(sf.getReader().length()));
+        message.append("), ");
+      }
+    }
+    message.append("total size for store is ")
+      .append(StringUtils.humanReadableInt(storeSize))
+      .append(". This selection was in queue for ")
+      .append(StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime()))
+      .append(", and took ").append(StringUtils.formatTimeDiff(now, compactionStartTime))
+      .append(" to execute.");
+    LOG.info(message.toString());
+    return sfs;
   }
 
   @Override
   public void compactRecentForTesting(int N) throws IOException {
     List<StoreFile> filesToCompact;
-    long maxId;
     boolean isMajor;
 
     this.lock.readLock().lock();
@@ -1078,7 +1176,6 @@ public class HStore implements Store, St
         }
 
         filesToCompact = filesToCompact.subList(count - N, count);
-        maxId = StoreFile.getMaxSequenceIdInList(filesToCompact, true);
         isMajor = (filesToCompact.size() == storefiles.size());
         filesCompacting.addAll(filesToCompact);
         Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
@@ -1089,12 +1186,14 @@ public class HStore implements Store, St
 
     try {
       // Ready to go. Have list of files to compact.
-      StoreFile.Writer writer =
-        this.compactor.compact(this, filesToCompact, isMajor, maxId);
-      // Move the compaction into place.
-      StoreFile sf = completeCompaction(filesToCompact, writer);
-      if (region.getCoprocessorHost() != null) {
-        region.getCoprocessorHost().postCompact(this, sf);
+      List<Path> newFiles =
+        this.compactor.compact(filesToCompact, isMajor);
+      for (Path newFile: newFiles) {
+        // Move the compaction into place.
+        StoreFile sf = completeCompaction(filesToCompact, newFile);
+        if (region.getCoprocessorHost() != null) {
+          region.getCoprocessorHost().postCompact(this, sf);
+        }
       }
     } finally {
       synchronized (filesCompacting) {
@@ -1258,26 +1357,25 @@ public class HStore implements Store, St
    * </pre>
    *
    * @param compactedFiles list of files that were compacted
-   * @param compactedFile StoreFile that is the result of the compaction
+   * @param newFile Path of the new file that is the result of the compaction
    * @return StoreFile created. May be null.
    * @throws IOException
    */
   StoreFile completeCompaction(final Collection<StoreFile> compactedFiles,
-                                       final StoreFile.Writer compactedFile)
+                                       final Path newFile)
       throws IOException {
     // 1. Moving the new files into place -- if there is a new file (may not
     // be if all cells were expired or deleted).
     StoreFile result = null;
-    if (compactedFile != null) {
-      validateStoreFile(compactedFile.getPath());
+    if (newFile != null) {
+      validateStoreFile(newFile);
       // Move the file into the right spot
-      Path origPath = compactedFile.getPath();
-      Path destPath = new Path(homedir, origPath.getName());
-      LOG.info("Renaming compacted file at " + origPath + " to " + destPath);
-      if (!fs.rename(origPath, destPath)) {
-        LOG.error("Failed move of compacted file " + origPath + " to " +
+      Path destPath = new Path(homedir, newFile.getName());
+      LOG.info("Renaming compacted file at " + newFile + " to " + destPath);
+      if (!fs.rename(newFile, destPath)) {
+        LOG.error("Failed move of compacted file " + newFile + " to " +
             destPath);
-        throw new IOException("Failed move of compacted file " + origPath +
+        throw new IOException("Failed move of compacted file " + newFile +
             " to " + destPath);
       }
       result = new StoreFile(this.fs, destPath, this.conf, this.cacheConf,
@@ -1861,6 +1959,14 @@ public class HStore implements Store, St
   }
 
   /**
+   * Set scan info, used by test
+   * @param scanInfo new scan info to use for test
+   */
+  void setScanInfo(ScanInfo scanInfo) {
+    this.scanInfo = scanInfo;
+  }
+
+  /**
    * Immutable information for scans over a store.
    */
   public static class ScanInfo {

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/IncreasingToUpperBoundRegionSplitPolicy.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/IncreasingToUpperBoundRegionSplitPolicy.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/IncreasingToUpperBoundRegionSplitPolicy.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/IncreasingToUpperBoundRegionSplitPolicy.java Thu Feb 14 12:58:12 2013
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -45,10 +46,15 @@ extends ConstantSizeRegionSplitPolicy {
   @Override
   protected void configureForRegion(HRegion region) {
     super.configureForRegion(region);
-    this.flushSize = region.getTableDesc() != null?
-      region.getTableDesc().getMemStoreFlushSize():
-      getConf().getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
+    Configuration conf = getConf();
+    HTableDescriptor desc = region.getTableDesc();
+    if (desc != null) {
+      this.flushSize = desc.getMemStoreFlushSize();
+    }
+    if (this.flushSize <= 0) {
+      this.flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
         HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
+    }
   }
 
   @Override

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyPrefixRegionSplitPolicy.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyPrefixRegionSplitPolicy.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyPrefixRegionSplitPolicy.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyPrefixRegionSplitPolicy.java Thu Feb 14 12:58:12 2013
@@ -34,7 +34,7 @@ import org.apache.hadoop.classification.
 public class KeyPrefixRegionSplitPolicy extends IncreasingToUpperBoundRegionSplitPolicy {
   private static final Log LOG = LogFactory
       .getLog(KeyPrefixRegionSplitPolicy.class);
-  public static String PREFIX_LENGTH_KEY = "prefix_split_key_policy.prefix_length";
+  public static final String PREFIX_LENGTH_KEY = "prefix_split_key_policy.prefix_length";
 
   private int prefixLength = 0;
 

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java Thu Feb 14 12:58:12 2013
@@ -24,6 +24,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
@@ -47,7 +48,7 @@ class LogRoller extends HasThread implem
   private final ReentrantLock rollLock = new ReentrantLock();
   private final AtomicBoolean rollLog = new AtomicBoolean(false);
   private final Server server;
-  private final RegionServerServices services;
+  protected final RegionServerServices services;
   private volatile long lastrolltime = System.currentTimeMillis();
   // Period to roll log.
   private final long rollperiod;
@@ -92,7 +93,7 @@ class LogRoller extends HasThread implem
       try {
         this.lastrolltime = now;
         // This is array of actual region names.
-        byte [][] regionsToFlush = this.services.getWAL().rollWriter(rollLog.get());
+        byte [][] regionsToFlush = getWAL().rollWriter(rollLog.get());
         if (regionsToFlush != null) {
           for (byte [] r: regionsToFlush) scheduleFlush(r);
         }
@@ -159,6 +160,10 @@ class LogRoller extends HasThread implem
     }
   }
 
+  protected HLog getWAL() throws IOException {
+    return this.services.getWAL(null);
+  }
+
   @Override
   public void preLogRoll(Path oldPath, Path newPath) throws IOException {
     // Not interested

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java Thu Feb 14 12:58:12 2013
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
 import java.lang.management.ManagementFactory;
 import java.util.ConcurrentModificationException;
 import java.util.HashMap;
@@ -29,10 +30,10 @@ import java.util.SortedMap;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.DelayQueue;
 import java.util.concurrent.Delayed;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -44,6 +45,7 @@ import org.apache.hadoop.hbase.RemoteExc
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.HasThread;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.util.StringUtils;
 import org.cliffc.high_scale_lib.Counter;
 
@@ -59,7 +61,7 @@ import com.google.common.base.Preconditi
  * @see FlushRequester
  */
 @InterfaceAudience.Private
-class MemStoreFlusher extends HasThread implements FlushRequester {
+class MemStoreFlusher implements FlushRequester {
   static final Log LOG = LogFactory.getLog(MemStoreFlusher.class);
   // These two data members go together.  Any entry in the one must have
   // a corresponding entry in the other.
@@ -71,8 +73,8 @@ class MemStoreFlusher extends HasThread 
 
   private final long threadWakeFrequency;
   private final HRegionServer server;
-  private final ReentrantLock lock = new ReentrantLock();
-  private final Condition flushOccurred = lock.newCondition();
+  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+  private final Object blockSignal = new Object();
 
   protected final long globalMemStoreLimit;
   protected final long globalMemStoreLimitLowMark;
@@ -87,6 +89,9 @@ class MemStoreFlusher extends HasThread 
   private long blockingWaitTime;
   private final Counter updatesBlockedMsHighWater = new Counter();
 
+  private FlushHandler[] flushHandlers = null;
+  private int handlerCount;
+
   /**
    * @param conf
    * @param server
@@ -111,6 +116,7 @@ class MemStoreFlusher extends HasThread 
       conf.getInt("hbase.hstore.blockingStoreFiles", 7);
     this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
       90000);
+    this.handlerCount = conf.getInt("hbase.hstore.flusher.count", 1);
     LOG.info("globalMemStoreLimit=" +
       StringUtils.humanReadableInt(this.globalMemStoreLimit) +
       ", globalMemStoreLimitLowMark=" +
@@ -213,64 +219,59 @@ class MemStoreFlusher extends HasThread 
     return true;
   }
 
-  @Override
-  public void run() {
-    while (!this.server.isStopped()) {
-      FlushQueueEntry fqe = null;
-      try {
-        wakeupPending.set(false); // allow someone to wake us up again
-        fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
-        if (fqe == null || fqe instanceof WakeupFlushThread) {
-          if (isAboveLowWaterMark()) {
-            LOG.debug("Flush thread woke up because memory above low water=" +
-              StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark));
-            if (!flushOneForGlobalPressure()) {
-              // Wasn't able to flush any region, but we're above low water mark
-              // This is unlikely to happen, but might happen when closing the
-              // entire server - another thread is flushing regions. We'll just
-              // sleep a little bit to avoid spinning, and then pretend that
-              // we flushed one, so anyone blocked will check again
-              lock.lock();
-              try {
+  private class FlushHandler extends HasThread {
+    @Override
+    public void run() {
+      while (!server.isStopped()) {
+        FlushQueueEntry fqe = null;
+        try {
+          wakeupPending.set(false); // allow someone to wake us up again
+          fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
+          if (fqe == null || fqe instanceof WakeupFlushThread) {
+            if (isAboveLowWaterMark()) {
+              LOG.debug("Flush thread woke up because memory above low water="
+                  + StringUtils.humanReadableInt(globalMemStoreLimitLowMark));
+              if (!flushOneForGlobalPressure()) {
+                // Wasn't able to flush any region, but we're above low water mark
+                // This is unlikely to happen, but might happen when closing the
+                // entire server - another thread is flushing regions. We'll just
+                // sleep a little bit to avoid spinning, and then pretend that
+                // we flushed one, so anyone blocked will check again
                 Thread.sleep(1000);
-                flushOccurred.signalAll();
-              } finally {
-                lock.unlock();
+                wakeUpIfBlocking();
               }
+              // Enqueue another one of these tokens so we'll wake up again
+              wakeupFlushThread();
             }
-            // Enqueue another one of these tokens so we'll wake up again
-            wakeupFlushThread();
+            continue;
+          }
+          FlushRegionEntry fre = (FlushRegionEntry) fqe;
+          if (!flushRegion(fre)) {
+            break;
           }
+        } catch (InterruptedException ex) {
           continue;
-        }
-        FlushRegionEntry fre = (FlushRegionEntry)fqe;
-        if (!flushRegion(fre)) {
-          break;
-        }
-      } catch (InterruptedException ex) {
-        continue;
-      } catch (ConcurrentModificationException ex) {
-        continue;
-      } catch (Exception ex) {
-        LOG.error("Cache flusher failed for entry " + fqe, ex);
-        if (!server.checkFileSystem()) {
-          break;
+        } catch (ConcurrentModificationException ex) {
+          continue;
+        } catch (Exception ex) {
+          LOG.error("Cache flusher failed for entry " + fqe, ex);
+          if (!server.checkFileSystem()) {
+            break;
+          }
         }
       }
-    }
-    this.regionsInQueue.clear();
-    this.flushQueue.clear();
+      synchronized (regionsInQueue) {
+        regionsInQueue.clear();
+        flushQueue.clear();
+      }
 
-    // Signal anyone waiting, so they see the close flag
-    lock.lock();
-    try {
-      flushOccurred.signalAll();
-    } finally {
-      lock.unlock();
+      // Signal anyone waiting, so they see the close flag
+      wakeUpIfBlocking();
+      LOG.info(getName() + " exiting");
     }
-    LOG.info(getName() + " exiting");
   }
 
+
   private void wakeupFlushThread() {
     if (wakeupPending.compareAndSet(false, true)) {
       flushQueue.add(new WakeupFlushThread());
@@ -287,6 +288,10 @@ class MemStoreFlusher extends HasThread 
           continue;
         }
 
+        if (region.writestate.flushing || !region.writestate.writesEnabled) {
+          continue;
+        }
+
         if (checkStoreFileCount && isTooManyStoreFiles(region)) {
           continue;
         }
@@ -332,11 +337,41 @@ class MemStoreFlusher extends HasThread 
    * Only interrupt once it's done with a run through the work loop.
    */
   void interruptIfNecessary() {
-    lock.lock();
+    lock.writeLock().lock();
     try {
-      this.interrupt();
+      for (FlushHandler flushHander : flushHandlers) {
+        if (flushHander != null) flushHander.interrupt();
+      }
     } finally {
-      lock.unlock();
+      lock.writeLock().unlock();
+    }
+  }
+
+  synchronized void start(UncaughtExceptionHandler eh) {
+    ThreadFactory flusherThreadFactory = Threads.newDaemonThreadFactory(
+        server.getServerName().toString() + "-MemStoreFlusher", eh);
+    flushHandlers = new FlushHandler[handlerCount];
+    for (int i = 0; i < flushHandlers.length; i++) {
+      flushHandlers[i] = new FlushHandler();
+      flusherThreadFactory.newThread(flushHandlers[i]);
+      flushHandlers[i].start();
+    }
+  }
+
+  boolean isAlive() {
+    for (FlushHandler flushHander : flushHandlers) {
+      if (flushHander != null && flushHander.isAlive()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  void join() {
+    for (FlushHandler flushHander : flushHandlers) {
+      if (flushHander != null) {
+        Threads.shutdown(flushHander.getThread());
+      }
     }
   }
 
@@ -365,7 +400,8 @@ class MemStoreFlusher extends HasThread 
             "store files; delaying flush up to " + this.blockingWaitTime + "ms");
           if (!this.server.compactSplitThread.requestSplit(region)) {
             try {
-              this.server.compactSplitThread.requestCompaction(region, getName());
+              this.server.compactSplitThread.requestCompaction(region, Thread
+                  .currentThread().getName());
             } catch (IOException e) {
               LOG.error(
                 "Cache flush failed for region " + Bytes.toStringBinary(region.getRegionName()),
@@ -404,8 +440,8 @@ class MemStoreFlusher extends HasThread 
         // emergencyFlush, then item was removed via a flushQueue.poll.
         flushQueue.remove(fqe);
      }
-     lock.lock();
     }
+    lock.readLock().lock();
     try {
       boolean shouldCompact = region.flushcache();
       // We just want to check the size
@@ -413,7 +449,7 @@ class MemStoreFlusher extends HasThread 
       if (shouldSplit) {
         this.server.compactSplitThread.requestSplit(region);
       } else if (shouldCompact) {
-        server.compactSplitThread.requestCompaction(region, getName());
+        server.compactSplitThread.requestCompaction(region, Thread.currentThread().getName());
       }
 
     } catch (DroppedSnapshotException ex) {
@@ -432,15 +468,18 @@ class MemStoreFlusher extends HasThread 
         return false;
       }
     } finally {
-      try {
-        flushOccurred.signalAll();
-      } finally {
-        lock.unlock();
-      }
+      lock.readLock().unlock();
+      wakeUpIfBlocking();
     }
     return true;
   }
 
+  private void wakeUpIfBlocking() {
+    synchronized (blockSignal) {
+      blockSignal.notifyAll();
+    }
+  }
+
   private boolean isTooManyStoreFiles(HRegion region) {
     for (Store hstore : region.stores.values()) {
       if (hstore.getStorefilesCount() > this.blockingStoreFilesNumber) {
@@ -458,12 +497,12 @@ class MemStoreFlusher extends HasThread 
    */
   public void reclaimMemStoreMemory() {
     if (isAboveHighWaterMark()) {
-      lock.lock();
-      try {
+      long start = System.currentTimeMillis();
+      synchronized (this.blockSignal) {
         boolean blocked = false;
         long startTime = 0;
         while (isAboveHighWaterMark() && !server.isStopped()) {
-          if(!blocked){
+          if (!blocked) {
             startTime = EnvironmentEdgeManager.currentTimeMillis();
             LOG.info("Blocking updates on " + server.toString() +
             ": the global memstore size " +
@@ -476,10 +515,12 @@ class MemStoreFlusher extends HasThread 
           try {
             // we should be able to wait forever, but we've seen a bug where
             // we miss a notify, so put a 5 second bound on it at least.
-            flushOccurred.await(5, TimeUnit.SECONDS);
+            blockSignal.wait(5 * 1000);
           } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
           }
+          long took = System.currentTimeMillis() - start;
+          LOG.warn("Memstore is above high water mark and block " + took + "ms");
         }
         if(blocked){
           final long totalTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
@@ -488,8 +529,6 @@ class MemStoreFlusher extends HasThread 
           }
           LOG.info("Unblocking updates for server " + server.toString());
         }
-      } finally {
-        lock.unlock();
       }
     } else if (isAboveLowWaterMark()) {
       wakeupFlushThread();
@@ -500,21 +539,21 @@ class MemStoreFlusher extends HasThread 
     return "flush_queue="
         + flushQueue.size();
   }
-  
+
   public String dumpQueue() {
     StringBuilder queueList = new StringBuilder();
     queueList.append("Flush Queue Queue dump:\n");
     queueList.append("  Flush Queue:\n");
     java.util.Iterator<FlushQueueEntry> it = flushQueue.iterator();
-    
+
     while(it.hasNext()){
       queueList.append("    "+it.next().toString());
       queueList.append("\n");
     }
-    
+
     return queueList.toString();
   }
-  
+
   interface FlushQueueEntry extends Delayed {}
 
   /**
@@ -530,6 +569,12 @@ class MemStoreFlusher extends HasThread 
     public int compareTo(Delayed o) {
       return -1;
     }
+
+    @Override
+    public boolean equals(Object obj) {
+      return (this == obj);
+    }
+
   }
 
   /**
@@ -597,5 +642,17 @@ class MemStoreFlusher extends HasThread 
     public String toString() {
       return "[flush region " + Bytes.toStringBinary(region.getRegionName()) + "]";
     }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null || getClass() != obj.getClass()) {
+        return false;
+      }
+      Delayed other = (Delayed) obj;
+      return compareTo(other) == 0;
+    }
   }
 }

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java Thu Feb 14 12:58:12 2013
@@ -40,11 +40,11 @@ interface OnlineRegions extends Server {
   /**
    * This method removes HRegion corresponding to hri from the Map of onlineRegions.
    *
-   * @param encodedRegionName
-   * @param destination - destination, if any. Null otherwise
+   * @param r Region to remove.
+   * @param destination Destination, if any, null otherwise.
    * @return True if we removed a region from online list.
    */
-  public boolean removeFromOnlineRegions(String encodedRegionName, ServerName destination);
+  public boolean removeFromOnlineRegions(final HRegion r, ServerName destination);
 
   /**
    * Return {@link HRegion} instance.

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java Thu Feb 14 12:58:12 2013
@@ -55,7 +55,6 @@ import org.apache.hadoop.hbase.coprocess
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -135,6 +134,7 @@ public class RegionCoprocessorHost
    */
   public RegionCoprocessorHost(final HRegion region,
       final RegionServerServices rsServices, final Configuration conf) {
+    this.conf = conf;
     this.rsServices = rsServices;
     this.region = region;
     this.pathPrefix = Integer.toString(this.region.getRegionInfo().hashCode());
@@ -214,11 +214,6 @@ public class RegionCoprocessorHost
     // It uses a visitor pattern to invoke registered Endpoint
     // method.
     for (Class c : implClass.getInterfaces()) {
-      if (CoprocessorProtocol.class.isAssignableFrom(c)) {
-        region.registerProtocol(c, (CoprocessorProtocol)instance);
-      }
-      // we allow endpoints to register as both CoproocessorProtocols and Services
-      // for ease of transition
       if (CoprocessorService.class.isAssignableFrom(c)) {
         region.registerService( ((CoprocessorService)instance).getService() );
       }
@@ -430,9 +425,11 @@ public class RegionCoprocessorHost
    * Called prior to rewriting the store files selected for compaction
    * @param store the store being compacted
    * @param scanner the scanner used to read store data during compaction
-   * @throws IOException 
+   * @param scanType type of Scan
+   * @throws IOException
    */
-  public InternalScanner preCompact(HStore store, InternalScanner scanner) throws IOException {
+  public InternalScanner preCompact(HStore store, InternalScanner scanner,
+      ScanType scanType) throws IOException {
     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
     boolean bypass = false;
     for (RegionEnvironment env: coprocessors) {
@@ -440,7 +437,7 @@ public class RegionCoprocessorHost
         ctx = ObserverContext.createAndPrepare(env, ctx);
         try {
           scanner = ((RegionObserver)env.getInstance()).preCompact(
-              ctx, store, scanner);
+              ctx, store, scanner, scanType);
         } catch (Throwable e) {
           handleCoprocessorThrowable(env,e);
         }
@@ -503,7 +500,7 @@ public class RegionCoprocessorHost
 
   /**
    * Invoked before a memstore flush
-   * @throws IOException 
+   * @throws IOException
    */
   public void preFlush() throws IOException {
     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
@@ -524,7 +521,7 @@ public class RegionCoprocessorHost
 
   /**
    * See
-   * {@link RegionObserver#preFlush(ObserverContext<RegionCoprocessorEnvironment>, HStore, KeyValueScanner)}
+   * {@link RegionObserver#preFlushScannerOpen(ObserverContext, HStore, KeyValueScanner, InternalScanner)}
    */
   public InternalScanner preFlushScannerOpen(HStore store, KeyValueScanner memstoreScanner) throws IOException {
     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
@@ -607,7 +604,7 @@ public class RegionCoprocessorHost
       }
     }
   }
-  
+
   /**
    * Invoked just before a split
    * @throws IOException
@@ -633,7 +630,7 @@ public class RegionCoprocessorHost
    * Invoked just after a split
    * @param l the new left-hand daughter region
    * @param r the new right-hand daughter region
-   * @throws IOException 
+   * @throws IOException
    */
   public void postSplit(HRegion l, HRegion r) throws IOException {
     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
@@ -651,7 +648,7 @@ public class RegionCoprocessorHost
       }
     }
   }
-  
+
   /**
    * Invoked just before the rollback of a failed split is started
    * @throws IOException
@@ -672,7 +669,7 @@ public class RegionCoprocessorHost
       }
     }
   }
-  
+
   /**
    * Invoked just after the rollback of a failed split is done
    * @throws IOException
@@ -693,7 +690,7 @@ public class RegionCoprocessorHost
       }
     }
   }
-  
+
   /**
    * Invoked after a split is completed irrespective of a failure or success.
    * @throws IOException
@@ -1355,6 +1352,35 @@ public class RegionCoprocessorHost
   }
 
   /**
+   * This will be called by the scan flow when the current scanned row is being filtered out by the
+   * filter.
+   * @param s the scanner
+   * @param currentRow The current rowkey which got filtered out
+   * @return whether more rows are available for the scanner or not
+   * @throws IOException
+   */
+  public boolean postScannerFilterRow(final InternalScanner s, final byte[] currentRow)
+      throws IOException {
+    boolean hasMore = true; // By default assume more rows there.
+    ObserverContext<RegionCoprocessorEnvironment> ctx = null;
+    for (RegionEnvironment env : coprocessors) {
+      if (env.getInstance() instanceof RegionObserver) {
+        ctx = ObserverContext.createAndPrepare(env, ctx);
+        try {
+          hasMore = ((RegionObserver) env.getInstance()).postScannerFilterRow(ctx, s, currentRow,
+              hasMore);
+        } catch (Throwable e) {
+          handleCoprocessorThrowable(env, e);
+        }
+        if (ctx.shouldComplete()) {
+          break;
+        }
+      }
+    }
+    return hasMore;
+  }
+  
+  /**
    * @param s the scanner
    * @return true if default behavior should be bypassed, false otherwise
    * @exception IOException Exception

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java Thu Feb 14 12:58:12 2013
@@ -19,10 +19,11 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
-import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.catalog.CatalogTracker;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
@@ -38,8 +39,9 @@ public interface RegionServerServices ex
    */
   public boolean isStopping();
 
-  /** @return the HLog */
-  public HLog getWAL();
+  /** @return the HLog for a particular region. Pass null for getting the 
+   * default (common) WAL */
+  public HLog getWAL(HRegionInfo regionInfo) throws IOException;
 
   /**
    * @return Implementation of {@link CompactionRequestor} or null.
@@ -79,7 +81,7 @@ public interface RegionServerServices ex
    * Get the regions that are currently being opened or closed in the RS
    * @return map of regions in transition in this RS
    */
-  public Map<byte[], Boolean> getRegionsInTransitionInRS();
+  public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS();
 
   /**
    * @return Return the FileSystem object used by the regionserver

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionSplitPolicy.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionSplitPolicy.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionSplitPolicy.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionSplitPolicy.java Thu Feb 14 12:58:12 2013
@@ -91,10 +91,9 @@ public abstract class RegionSplitPolicy 
 
   /**
    * Create the RegionSplitPolicy configured for the given table.
-   * Each
    * @param region
    * @param conf
-   * @return
+   * @return a RegionSplitPolicy
    * @throws IOException
    */
   public static RegionSplitPolicy create(HRegion region,

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java Thu Feb 14 12:58:12 2013
@@ -119,7 +119,7 @@ public interface RowProcessor<S extends 
   /**
    * This method should return any additional data that is needed on the
    * server side to construct the RowProcessor. The server will pass this to
-   * the {@link #initialize(ByteString)} method. If there is no RowProcessor
+   * the {@link #initialize(Message msg)} method. If there is no RowProcessor
    * specific data then null should be returned.
    * @return the PB message
    * @throws IOException

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java Thu Feb 14 12:58:12 2013
@@ -30,7 +30,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.DeserializationException;
-import org.apache.hadoop.hbase.RegionServerStatusProtocol;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.SplitLogCounters;
 import org.apache.hadoop.hbase.SplitLogTask;
@@ -80,7 +79,7 @@ public class SplitLogWorker extends ZooK
   private volatile String currentTask = null;
   private int currentVersion;
   private volatile boolean exitWorker;
-  private Object grabTaskLock = new Object();
+  private final Object grabTaskLock = new Object();
   private boolean workerInGrabTask = false;
 
 
@@ -109,8 +108,8 @@ public class SplitLogWorker extends ZooK
         // interrupted or has encountered a transient error and when it has
         // encountered a bad non-retry-able persistent error.
         try {
-          if (HLogSplitter.splitLogFile(rootdir,
-              fs.getFileStatus(new Path(filename)), fs, conf, p, sequenceIdChecker) == false) {
+          if (!HLogSplitter.splitLogFile(rootdir,
+              fs.getFileStatus(new Path(filename)), fs, conf, p, sequenceIdChecker)) {
             return Status.PREEMPTED;
           }
         } catch (InterruptedIOException iioe) {
@@ -249,13 +248,13 @@ public class SplitLogWorker extends ZooK
         SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
         return;
       }
-      if (slt.isUnassigned() == false) {
+      if (!slt.isUnassigned()) {
         SplitLogCounters.tot_wkr_failed_to_grab_task_owned.incrementAndGet();
         return;
       }
 
       currentVersion = stat.getVersion();
-      if (attemptToOwnTask(true) == false) {
+      if (!attemptToOwnTask(true)) {
         SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.incrementAndGet();
         return;
       }
@@ -277,7 +276,7 @@ public class SplitLogWorker extends ZooK
 
         @Override
         public boolean progress() {
-          if (attemptToOwnTask(false) == false) {
+          if (!attemptToOwnTask(false)) {
             LOG.warn("Failed to heartbeat the task" + currentTask);
             return false;
           }
@@ -323,7 +322,6 @@ public class SplitLogWorker extends ZooK
         Thread.interrupted();
       }
     }
-    return;
   }
 
   /**
@@ -395,8 +393,7 @@ public class SplitLogWorker extends ZooK
     } catch (KeeperException e) {
       LOG.warn("failed to end task, " + path + " " + slt, e);
     }
-    SplitLogCounters.tot_wkr_final_transistion_failed.incrementAndGet();
-    return;
+    SplitLogCounters.tot_wkr_final_transition_failed.incrementAndGet();
   }
 
   void getDataSetWatchAsync() {
@@ -531,7 +528,6 @@ public class SplitLogWorker extends ZooK
     worker = new Thread(null, this, "SplitLogWorker-" + serverName);
     exitWorker = false;
     worker.start();
-    return;
   }
 
   /**
@@ -558,7 +554,6 @@ public class SplitLogWorker extends ZooK
       }
       data = watcher.getRecoverableZooKeeper().removeMetaData(data);
       getDataSetWatchSuccess(path, data);
-      return;
     }
   }
 
@@ -574,7 +569,7 @@ public class SplitLogWorker extends ZooK
       DONE(),
       ERR(),
       RESIGNED(),
-      PREEMPTED();
+      PREEMPTED()
     }
     public Status exec(String name, CancelableProgressable p);
   }

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java Thu Feb 14 12:58:12 2013
@@ -296,7 +296,7 @@ public class SplitTransaction {
       throw new IOException(errorMsg);
     }
     if (!testing) {
-      services.removeFromOnlineRegions(this.parent.getRegionInfo().getEncodedName(), null);
+      services.removeFromOnlineRegions(this.parent, null);
     }
     this.journal.add(JournalEntry.OFFLINED_PARENT);
 

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Thu Feb 14 12:58:12 2013
@@ -43,7 +43,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.KVComparator;
 import org.apache.hadoop.hbase.client.Scan;
@@ -58,7 +57,6 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
-import org.apache.hadoop.hbase.io.hfile.HFileWriterV1;
 import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
 import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
 import org.apache.hadoop.hbase.util.BloomFilter;
@@ -93,21 +91,6 @@ import com.google.common.collect.Orderin
 public class StoreFile {
   static final Log LOG = LogFactory.getLog(StoreFile.class.getName());
 
-  public static enum BloomType {
-    /**
-     * Bloomfilters disabled
-     */
-    NONE,
-    /**
-     * Bloom enabled with Table row as Key
-     */
-    ROW,
-    /**
-     * Bloom enabled with Table row & column (family+qualifier) as Key
-     */
-    ROWCOL
-  }
-
   // Keys for fileinfo values in HFile
 
   /** Max Sequence ID in FileInfo */
@@ -647,7 +630,16 @@ public class StoreFile {
    */
   public Reader createReader() throws IOException {
     if (this.reader == null) {
-      this.reader = open();
+      try {
+        this.reader = open();
+      } catch (IOException e) {
+        try {
+          this.closeReader(true);
+        } catch (IOException ee) {              
+        }
+        throw e;
+      }
+
     }
     return this.reader;
   }
@@ -1551,7 +1543,7 @@ public class StoreFile {
           bloom = null;
           shouldCheckBloom = true;
         } else {
-          bloom = reader.getMetaBlock(HFileWriterV1.BLOOM_FILTER_DATA_KEY,
+          bloom = reader.getMetaBlock(HFile.BLOOM_FILTER_DATA_KEY,
               true);
           shouldCheckBloom = bloom != null;
         }
@@ -1761,7 +1753,7 @@ public class StoreFile {
       return reader.getTrailer().getMajorVersion();
     }
 
-    HFile.Reader getHFileReader() {
+    public HFile.Reader getHFileReader() {
       return reader;
     }
 

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java Thu Feb 14 12:58:12 2013
@@ -219,7 +219,7 @@ public class StoreFileScanner implements
    *
    * @param s
    * @param k
-   * @return
+   * @return false if not found or if k is after the end.
    * @throws IOException
    */
   public static boolean seekAtOrAfter(HFileScanner s, KeyValue k)
@@ -279,7 +279,7 @@ public class StoreFileScanner implements
     boolean haveToSeek = true;
     if (useBloom) {
       // check ROWCOL Bloom filter first.
-      if (reader.getBloomFilterType() == StoreFile.BloomType.ROWCOL) {
+      if (reader.getBloomFilterType() == BloomType.ROWCOL) {
         haveToSeek = reader.passesGeneralBloomFilter(kv.getBuffer(),
             kv.getRowOffset(), kv.getRowLength(), kv.getBuffer(),
             kv.getQualifierOffset(), kv.getQualifierLength());

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Thu Feb 14 12:58:12 2013
@@ -44,38 +44,38 @@ import org.apache.hadoop.hbase.util.Envi
 public class StoreScanner extends NonLazyKeyValueScanner
     implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
   static final Log LOG = LogFactory.getLog(StoreScanner.class);
-  private HStore store;
-  private ScanQueryMatcher matcher;
-  private KeyValueHeap heap;
-  private boolean cacheBlocks;
-
-  private int countPerRow = 0;
-  private int storeLimit = -1;
-  private int storeOffset = 0;
+  protected HStore store;
+  protected ScanQueryMatcher matcher;
+  protected KeyValueHeap heap;
+  protected boolean cacheBlocks;
+
+  protected int countPerRow = 0;
+  protected int storeLimit = -1;
+  protected int storeOffset = 0;
 
   // Used to indicate that the scanner has closed (see HBASE-1107)
   // Doesnt need to be volatile because it's always accessed via synchronized methods
-  private boolean closing = false;
-  private final boolean isGet;
-  private final boolean explicitColumnQuery;
-  private final boolean useRowColBloom;
-  private final Scan scan;
-  private final NavigableSet<byte[]> columns;
-  private final long oldestUnexpiredTS;
-  private final int minVersions;
+  protected boolean closing = false;
+  protected final boolean isGet;
+  protected final boolean explicitColumnQuery;
+  protected final boolean useRowColBloom;
+  protected final Scan scan;
+  protected final NavigableSet<byte[]> columns;
+  protected final long oldestUnexpiredTS;
+  protected final int minVersions;
 
   /** We don't ever expect to change this, the constant is just for clarity. */
   static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
 
   /** Used during unit testing to ensure that lazy seek does save seek ops */
-  private static boolean lazySeekEnabledGlobally =
+  protected static boolean lazySeekEnabledGlobally =
       LAZY_SEEK_ENABLED_BY_DEFAULT;
 
   // if heap == null and lastTop != null, you need to reseek given the key below
-  private KeyValue lastTop = null;
+  protected KeyValue lastTop = null;
 
   /** An internal constructor. */
-  private StoreScanner(HStore store, boolean cacheBlocks, Scan scan,
+  protected StoreScanner(HStore store, boolean cacheBlocks, Scan scan,
       final NavigableSet<byte[]> columns, long ttl, int minVersions) {
     this.store = store;
     this.cacheBlocks = cacheBlocks;
@@ -203,7 +203,7 @@ public class StoreScanner extends NonLaz
    * Get a filtered list of scanners. Assumes we are not in a compaction.
    * @return list of scanners to seek
    */
-  private List<KeyValueScanner> getScannersNoCompaction() throws IOException {
+  protected List<KeyValueScanner> getScannersNoCompaction() throws IOException {
     final boolean isCompaction = false;
     return selectScannersFrom(store.getScanners(cacheBlocks, isGet,
         isCompaction, matcher));
@@ -213,7 +213,7 @@ public class StoreScanner extends NonLaz
    * Filters the given list of scanners using Bloom filter, time range, and
    * TTL.
    */
-  private List<KeyValueScanner> selectScannersFrom(
+  protected List<KeyValueScanner> selectScannersFrom(
       final List<? extends KeyValueScanner> allScanners) {
     boolean memOnly;
     boolean filesOnly;
@@ -277,13 +277,8 @@ public class StoreScanner extends NonLaz
 
   @Override
   public synchronized boolean seek(KeyValue key) throws IOException {
-    if (this.heap == null) {
-
-      List<KeyValueScanner> scanners = getScannersNoCompaction();
-
-      heap = new KeyValueHeap(scanners, store.comparator);
-    }
-
+    // reset matcher state, in case that underlying store changed
+    checkReseek();
     return this.heap.seek(key);
   }
 
@@ -491,7 +486,7 @@ public class StoreScanner extends NonLaz
    *         next KV)
    * @throws IOException
    */
-  private boolean checkReseek() throws IOException {
+  protected boolean checkReseek() throws IOException {
     if (this.heap == null && this.lastTop != null) {
       resetScannerStack(this.lastTop);
       if (this.heap.peek() == null
@@ -507,7 +502,7 @@ public class StoreScanner extends NonLaz
     return false;
   }
 
-  private void resetScannerStack(KeyValue lastTopKey) throws IOException {
+  protected void resetScannerStack(KeyValue lastTopKey) throws IOException {
     if (heap != null) {
       throw new RuntimeException("StoreScanner.reseek run on an existing heap!");
     }

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java Thu Feb 14 12:58:12 2013
@@ -148,6 +148,4 @@ public class TimeRangeTracker implements
   public String toString() {
     return "[" + minimumTimestamp + "," + maximumTimestamp + "]";
   }
-
 }
-

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java Thu Feb 14 12:58:12 2013
@@ -19,391 +19,133 @@
 
 package org.apache.hadoop.hbase.regionserver.compactions;
 
-import com.google.common.base.Predicate;
-import com.google.common.collect.Collections2;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import java.io.IOException;
+import java.util.List;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.regionserver.StoreConfiguration;
+import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.StoreUtils;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.util.StringUtils;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.GregorianCalendar;
-import java.util.List;
-import java.util.Random;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /**
- * The default (and only, as of now) algorithm for selecting files for compaction.
- * Combines the compaction configuration and the provisional file selection that
- * it's given to produce the list of suitable candidates for compaction.
+ * A compaction policy determines how to select files for compaction,
+ * how to compact them, and how to generate the compacted files.
  */
 @InterfaceAudience.Private
-public class CompactionPolicy {
+public abstract class CompactionPolicy extends Configured {
+
+  /**
+   * The name of the configuration parameter that specifies
+   * the class of a compaction policy that is used to compact
+   * HBase store files.
+   */
+  public static final String COMPACTION_POLICY_KEY =
+    "hbase.hstore.compaction.policy";
 
-  private static final Log LOG = LogFactory.getLog(CompactionPolicy.class);
-  private final static Calendar calendar = new GregorianCalendar();
+  private static final Class<? extends CompactionPolicy>
+    DEFAULT_COMPACTION_POLICY_CLASS = DefaultCompactionPolicy.class;
 
   CompactionConfiguration comConf;
-  StoreConfiguration storeConfig;
-
-  public CompactionPolicy(Configuration configuration, StoreConfiguration storeConfig) {
-    updateConfiguration(configuration, storeConfig);
-  }
+  Compactor compactor;
+  HStore store;
 
   /**
    * @param candidateFiles candidate files, ordered from oldest to newest
    * @return subset copy of candidate list that meets compaction criteria
    * @throws java.io.IOException
    */
-  public CompactSelection selectCompaction(List<StoreFile> candidateFiles,
-      boolean isUserCompaction, boolean forceMajor)
-    throws IOException {
-    // Prelimanry compaction subject to filters
-    CompactSelection candidateSelection = new CompactSelection(candidateFiles);
-    long cfTtl = this.storeConfig.getStoreFileTtl();
-    if (!forceMajor) {
-      // If there are expired files, only select them so that compaction deletes them
-      if (comConf.shouldDeleteExpired() && (cfTtl != Long.MAX_VALUE)) {
-        CompactSelection expiredSelection = selectExpiredStoreFiles(
-          candidateSelection, EnvironmentEdgeManager.currentTimeMillis() - cfTtl);
-        if (expiredSelection != null) {
-          return expiredSelection;
-        }
-      }
-      candidateSelection = skipLargeFiles(candidateSelection);
-    }
-
-    // Force a major compaction if this is a user-requested major compaction,
-    // or if we do not have too many files to compact and this was requested
-    // as a major compaction.
-    // Or, if there are any references among the candidates.
-    boolean majorCompaction = (
-      (forceMajor && isUserCompaction)
-      || ((forceMajor || isMajorCompaction(candidateSelection.getFilesToCompact()))
-          && (candidateSelection.getFilesToCompact().size() < comConf.getMaxFilesToCompact()))
-      || StoreUtils.hasReferences(candidateSelection.getFilesToCompact())
-      );
-
-    if (!majorCompaction) {
-      // we're doing a minor compaction, let's see what files are applicable
-      candidateSelection = filterBulk(candidateSelection);
-      candidateSelection = applyCompactionPolicy(candidateSelection);
-      candidateSelection = checkMinFilesCriteria(candidateSelection);
-    }
-    candidateSelection =
-        removeExcessFiles(candidateSelection, isUserCompaction, majorCompaction);
-    return candidateSelection;
-  }
+  public abstract CompactSelection selectCompaction(
+    final List<StoreFile> candidateFiles, final boolean isUserCompaction,
+    final boolean forceMajor) throws IOException;
 
   /**
-   * Updates the compaction configuration. Used for tests.
-   * TODO: replace when HBASE-3909 is completed in some form.
+   * @param filesToCompact Files to compact. Can be null.
+   * @return True if we should run a major compaction.
    */
-  public void updateConfiguration(Configuration configuration,
-      StoreConfiguration storeConfig) {
-    this.comConf = new CompactionConfiguration(configuration, storeConfig);
-    this.storeConfig = storeConfig;
-  }
+  public abstract boolean isMajorCompaction(
+    final List<StoreFile> filesToCompact) throws IOException;
 
   /**
-   * Select the expired store files to compact
-   *
-   * @param candidates the initial set of storeFiles
-   * @param maxExpiredTimeStamp
-   *          The store file will be marked as expired if its max time stamp is
-   *          less than this maxExpiredTimeStamp.
-   * @return A CompactSelection contains the expired store files as
-   *         filesToCompact
+   * @param compactionSize Total size of some compaction
+   * @return whether this should be a large or small compaction
    */
-  private CompactSelection selectExpiredStoreFiles(
-      CompactSelection candidates, long maxExpiredTimeStamp) {
-    List<StoreFile> filesToCompact = candidates.getFilesToCompact();
-    if (filesToCompact == null || filesToCompact.size() == 0)
-      return null;
-    ArrayList<StoreFile> expiredStoreFiles = null;
-    boolean hasExpiredStoreFiles = false;
-    CompactSelection expiredSFSelection = null;
-
-    for (StoreFile storeFile : filesToCompact) {
-      if (storeFile.getReader().getMaxTimestamp() < maxExpiredTimeStamp) {
-        LOG.info("Deleting the expired store file by compaction: "
-            + storeFile.getPath() + " whose maxTimeStamp is "
-            + storeFile.getReader().getMaxTimestamp()
-            + " while the max expired timestamp is " + maxExpiredTimeStamp);
-        if (!hasExpiredStoreFiles) {
-          expiredStoreFiles = new ArrayList<StoreFile>();
-          hasExpiredStoreFiles = true;
-        }
-        expiredStoreFiles.add(storeFile);
-      }
-    }
-
-    if (hasExpiredStoreFiles) {
-      expiredSFSelection = new CompactSelection(expiredStoreFiles);
-    }
-    return expiredSFSelection;
-  }
+  public abstract boolean throttleCompaction(long compactionSize);
 
   /**
-   * @param candidates pre-filtrate
-   * @return filtered subset
-   * exclude all files above maxCompactSize
-   * Also save all references. We MUST compact them
+   * @param numCandidates Number of candidate store files
+   * @return whether a compactionSelection is possible
    */
-  private CompactSelection skipLargeFiles(CompactSelection candidates) {
-    int pos = 0;
-    while (pos < candidates.getFilesToCompact().size() &&
-      candidates.getFilesToCompact().get(pos).getReader().length() >
-        comConf.getMaxCompactSize() &&
-      !candidates.getFilesToCompact().get(pos).isReference()) {
-      ++pos;
-    }
-    if (pos > 0) {
-      LOG.debug("Some files are too large. Excluding " + pos
-          + " files from compaction candidates");
-      candidates.clearSubList(0, pos);
-    }
-    return candidates;
-  }
+  public abstract boolean needsCompaction(int numCandidates);
 
   /**
-   * @param candidates pre-filtrate
-   * @return filtered subset
-   * exclude all bulk load files if configured
+   * Inform the policy that some configuration has been changed,
+   * so cached values should be updated, if any.
    */
-  private CompactSelection filterBulk(CompactSelection candidates) {
-    candidates.getFilesToCompact().removeAll(Collections2.filter(
-        candidates.getFilesToCompact(),
-        new Predicate<StoreFile>() {
-          @Override
-          public boolean apply(StoreFile input) {
-            return input.excludeFromMinorCompaction();
-          }
-        }));
-    return candidates;
-  }
-
-  /**
-   * @param candidates pre-filtrate
-   * @return filtered subset
-   * take upto maxFilesToCompact from the start
-   */
-  private CompactSelection removeExcessFiles(CompactSelection candidates,
-      boolean isUserCompaction, boolean isMajorCompaction) {
-    int excess = candidates.getFilesToCompact().size() - comConf.getMaxFilesToCompact();
-    if (excess > 0) {
-      if (isMajorCompaction && isUserCompaction) {
-        LOG.debug("Warning, compacting more than " + comConf.getMaxFilesToCompact() +
-            " files because of a user-requested major compaction");
-      } else {
-        LOG.debug("Too many admissible files. Excluding " + excess
-          + " files from compaction candidates");
-        candidates.clearSubList(comConf.getMaxFilesToCompact(),
-          candidates.getFilesToCompact().size());
-      }
+  public void updateConfiguration() {
+    if (getConf() != null && store != null) {
+      comConf = new CompactionConfiguration(getConf(), store);
     }
-    return candidates;
   }
+
   /**
-   * @param candidates pre-filtrate
-   * @return filtered subset
-   * forget the compactionSelection if we don't have enough files
+   * Get the compactor for this policy
+   * @return the compactor for this policy
    */
-  private CompactSelection checkMinFilesCriteria(CompactSelection candidates) {
-    int minFiles = comConf.getMinFilesToCompact();
-    if (candidates.getFilesToCompact().size() < minFiles) {
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("Not compacting files because we only have " +
-            candidates.getFilesToCompact().size() +
-          " files ready for compaction.  Need " + minFiles + " to initiate.");
-      }
-      candidates.emptyFileList();
-    }
-    return candidates;
+  public Compactor getCompactor() {
+    return compactor;
   }
 
   /**
-    * @param candidates pre-filtrate
-    * @return filtered subset
-    * -- Default minor compaction selection algorithm:
-    * choose CompactSelection from candidates --
-    * First exclude bulk-load files if indicated in configuration.
-    * Start at the oldest file and stop when you find the first file that
-    * meets compaction criteria:
-    * (1) a recently-flushed, small file (i.e. <= minCompactSize)
-    * OR
-    * (2) within the compactRatio of sum(newer_files)
-    * Given normal skew, any newer files will also meet this criteria
-    * <p/>
-    * Additional Note:
-    * If fileSizes.size() >> maxFilesToCompact, we will recurse on
-    * compact().  Consider the oldest files first to avoid a
-    * situation where we always compact [end-threshold,end).  Then, the
-    * last file becomes an aggregate of the previous compactions.
-    *
-    * normal skew:
-    *
-    *         older ----> newer (increasing seqID)
-    *     _
-    *    | |   _
-    *    | |  | |   _
-    *  --|-|- |-|- |-|---_-------_-------  minCompactSize
-    *    | |  | |  | |  | |  _  | |
-    *    | |  | |  | |  | | | | | |
-    *    | |  | |  | |  | | | | | |
-    */
-  CompactSelection applyCompactionPolicy(CompactSelection candidates) throws IOException {
-    if (candidates.getFilesToCompact().isEmpty()) {
-      return candidates;
-    }
-
-    // we're doing a minor compaction, let's see what files are applicable
-    int start = 0;
-    double ratio = comConf.getCompactionRatio();
-    if (isOffPeakHour() && candidates.trySetOffpeak()) {
-      ratio = comConf.getCompactionRatioOffPeak();
-      LOG.info("Running an off-peak compaction, selection ratio = " + ratio
-          + ", numOutstandingOffPeakCompactions is now "
-          + CompactSelection.getNumOutStandingOffPeakCompactions());
-    }
-
-    // get store file sizes for incremental compacting selection.
-    int countOfFiles = candidates.getFilesToCompact().size();
-    long[] fileSizes = new long[countOfFiles];
-    long[] sumSize = new long[countOfFiles];
-    for (int i = countOfFiles - 1; i >= 0; --i) {
-      StoreFile file = candidates.getFilesToCompact().get(i);
-      fileSizes[i] = file.getReader().length();
-      // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
-      int tooFar = i + comConf.getMaxFilesToCompact() - 1;
-      sumSize[i] = fileSizes[i]
-        + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
-        - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
-    }
-
-
-    while (countOfFiles - start >= comConf.getMinFilesToCompact() &&
-      fileSizes[start] > Math.max(comConf.getMinCompactSize(),
-          (long) (sumSize[start + 1] * ratio))) {
-      ++start;
-    }
-    if (start < countOfFiles) {
-      LOG.info("Default compaction algorithm has selected " + (countOfFiles - start)
-        + " files from " + countOfFiles + " candidates");
-    }
-
-    candidates = candidates.getSubList(start, countOfFiles);
-
-    return candidates;
-  }
-
-  /*
-   * @param filesToCompact Files to compact. Can be null.
-   * @return True if we should run a major compaction.
+   * Set the new configuration
    */
-  public boolean isMajorCompaction(final List<StoreFile> filesToCompact)
-      throws IOException {
-    boolean result = false;
-    long mcTime = getNextMajorCompactTime(filesToCompact);
-    if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
-      return result;
-    }
-    // TODO: Use better method for determining stamp of last major (HBASE-2990)
-    long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
-    long now = System.currentTimeMillis();
-    if (lowTimestamp > 0l && lowTimestamp < (now - mcTime)) {
-      // Major compaction time has elapsed.
-      long cfTtl = this.storeConfig.getStoreFileTtl();
-      if (filesToCompact.size() == 1) {
-        // Single file
-        StoreFile sf = filesToCompact.get(0);
-        Long minTimestamp = sf.getMinimumTimestamp();
-        long oldest = (minTimestamp == null)
-            ? Long.MIN_VALUE
-            : now - minTimestamp.longValue();
-        if (sf.isMajorCompaction() &&
-            (cfTtl == HConstants.FOREVER || oldest < cfTtl)) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Skipping major compaction of " + this +
-                " because one (major) compacted file only and oldestTime " +
-                oldest + "ms is < ttl=" + cfTtl);
-          }
-        } else if (cfTtl != HConstants.FOREVER && oldest > cfTtl) {
-          LOG.debug("Major compaction triggered on store " + this +
-            ", because keyvalues outdated; time since last major compaction " +
-            (now - lowTimestamp) + "ms");
-          result = true;
-        }
-      } else {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Major compaction triggered on store " + this +
-              "; time since last major compaction " + (now - lowTimestamp) + "ms");
-        }
-        result = true;
-      }
-    }
-    return result;
-  }
-
-  public long getNextMajorCompactTime(final List<StoreFile> filesToCompact) {
-    // default = 24hrs
-    long ret = comConf.getMajorCompactionPeriod();
-    if (ret > 0) {
-      // default = 20% = +/- 4.8 hrs
-      double jitterPct = comConf.getMajorCompactionJitter();
-      if (jitterPct > 0) {
-        long jitter = Math.round(ret * jitterPct);
-        // deterministic jitter avoids a major compaction storm on restart
-        Integer seed = StoreUtils.getDeterministicRandomSeed(filesToCompact);
-        if (seed != null) {
-          double rnd = (new Random(seed)).nextDouble();
-          ret += jitter - Math.round(2L * jitter * rnd);
-        } else {
-          ret = 0; // no storefiles == no major compaction
-        }
-      }
-    }
-    return ret;
+  @Override
+  public void setConf(Configuration conf) {
+    super.setConf(conf);
+    updateConfiguration();
   }
 
   /**
-   * @param compactionSize Total size of some compaction
-   * @return whether this should be a large or small compaction
+   * Upon construction, this method will be called with the HStore
+   * to be governed. It will be called once and only once.
    */
-  public boolean throttleCompaction(long compactionSize) {
-    return compactionSize > comConf.getThrottlePoint();
+  protected void configureForStore(HStore store) {
+    this.store = store;
+    updateConfiguration();
   }
 
   /**
-   * @param numCandidates Number of candidate store files
-   * @return whether a compactionSelection is possible
+   * Create the CompactionPolicy configured for the given HStore.
+   * @param store
+   * @param conf
+   * @return a CompactionPolicy
+   * @throws IOException
    */
-  public boolean needsCompaction(int numCandidates) {
-    return numCandidates > comConf.getMinFilesToCompact();
+  public static CompactionPolicy create(HStore store,
+      Configuration conf) throws IOException {
+    Class<? extends CompactionPolicy> clazz =
+      getCompactionPolicyClass(store.getFamily(), conf);
+    CompactionPolicy policy = ReflectionUtils.newInstance(clazz, conf);
+    policy.configureForStore(store);
+    return policy;
   }
 
-  /**
-   * @return whether this is off-peak hour
-   */
-  private boolean isOffPeakHour() {
-    int currentHour = calendar.get(Calendar.HOUR_OF_DAY);
-    int startHour = comConf.getOffPeakStartHour();
-    int endHour = comConf.getOffPeakEndHour();
-    // If offpeak time checking is disabled just return false.
-    if (startHour == endHour) {
-      return false;
-    }
-    if (startHour < endHour) {
-      return (currentHour >= startHour && currentHour < endHour);
+  static Class<? extends CompactionPolicy> getCompactionPolicyClass(
+      HColumnDescriptor family, Configuration conf) throws IOException {
+    String className = conf.get(COMPACTION_POLICY_KEY,
+      DEFAULT_COMPACTION_POLICY_CLASS.getName());
+
+    try {
+      Class<? extends CompactionPolicy> clazz =
+        Class.forName(className).asSubclass(CompactionPolicy.class);
+      return clazz;
+    } catch (Exception  e) {
+      throw new IOException(
+        "Unable to load configured region compaction policy '"
+        + className + "' for column '" + family.getNameAsString()
+        + "'", e);
     }
-    return (currentHour >= startHour || currentHour < endHour);
   }
-}
\ No newline at end of file
+}

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java Thu Feb 14 12:58:12 2013
@@ -89,7 +89,7 @@ public class CompactionRequest implement
      * Find out if a given region is in compaction now.
      *
      * @param regionId
-     * @return
+     * @return a CompactionState
      */
     public static CompactionState getCompactionState(
         final long regionId) {
@@ -174,6 +174,11 @@ public class CompactionRequest implement
       return this.hashCode() - request.hashCode();
     }
 
+    @Override
+    public boolean equals(Object obj) {
+      return (this == obj);
+    }
+
     /** Gets the HRegion for the request */
     public HRegion getHRegion() {
       return r;

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java Thu Feb 14 12:58:12 2013
@@ -126,13 +126,21 @@ public class CloseRegionHandler extends 
       // Check that this region is being served here
       HRegion region = this.rsServices.getFromOnlineRegions(encodedRegionName);
       if (region == null) {
-        LOG.warn("Received CLOSE for region " + name +
-            " but currently not serving");
+        LOG.warn("Received CLOSE for region " + name + " but currently not serving - ignoring");
+        if (zk){
+          LOG.error("The znode is not modified as we are not serving " + name);
+        }
+        // TODO: do better than a simple warning
         return;
       }
 
       // Close the region
       try {
+        if (zk && !ZKAssign.checkClosingState(server.getZooKeeper(), regionInfo, expectedVersion)){
+          // bad znode state
+          return; // We're not deleting the znode, but it's not ours...
+        }
+
         // TODO: If we need to keep updating CLOSING stamp to prevent against
         // a timeout if this is long-running, need to spin up a thread?
         if (region.close(abort) == null) {
@@ -152,7 +160,7 @@ public class CloseRegionHandler extends 
         throw new RuntimeException(t);
       }
 
-      this.rsServices.removeFromOnlineRegions(regionInfo.getEncodedName(), destination);
+      this.rsServices.removeFromOnlineRegions(region, destination);
 
       if (this.zk) {
         if (setClosedState(this.expectedVersion, region)) {



Mime
View raw message