hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jmhs...@apache.org
Subject svn commit: r1446173 [3/5] - in /hbase/branches/hbase-7290v2: ./ bin/ hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/ hbase-common/src/main/java/org/apache/hadoop/hbase/util/ hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/...
Date Thu, 14 Feb 2013 13:35:59 GMT
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java Thu Feb 14 13:35:54 2013
@@ -23,6 +23,7 @@ import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.NavigableSet;
 import java.util.SortedSet;
@@ -78,6 +79,7 @@ import org.apache.hadoop.hbase.util.FSUt
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableCollection;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
@@ -104,9 +106,10 @@ import com.google.common.collect.Lists;
  * <p>Locking and transactions are handled at a higher level.  This API should
  * not be called directly but by an HRegion manager.
  */
-//TODO: move StoreConfiguration implementation into a separate class.
 @InterfaceAudience.Private
-public class HStore implements Store, StoreConfiguration {
+public class HStore implements Store {
+  public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 7;
+
   static final Log LOG = LogFactory.getLog(HStore.class);
 
   protected final MemStore memstore;
@@ -124,7 +127,6 @@ public class HStore implements Store, St
   volatile boolean forceMajor = false;
   /* how many bytes to write between status checks */
   static int closeCheckInterval = 0;
-  private final int blockingStoreFileCount;
   private volatile long storeSize = 0L;
   private volatile long totalUncompressedBytes = 0L;
   private final Object flushLock = new Object();
@@ -133,12 +135,7 @@ public class HStore implements Store, St
 
   private ScanInfo scanInfo;
 
-  /*
-   * List of store files inside this store. This is an immutable list that
-   * is atomically replaced when its contents change.
-   */
-  private volatile ImmutableList<StoreFile> storefiles = null;
-
+  private StoreFileManager storeFileManager;
   final List<StoreFile> filesCompacting = Lists.newArrayList();
 
   // All access must be synchronized.
@@ -213,8 +210,7 @@ public class HStore implements Store, St
 
     // Setting up cache configuration for this family
     this.cacheConf = new CacheConfig(conf, family);
-    this.blockingStoreFileCount =
-      conf.getInt("hbase.hstore.blockingStoreFiles", 7);
+
 
     this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);
 
@@ -222,7 +218,9 @@ public class HStore implements Store, St
       HStore.closeCheckInterval = conf.getInt(
           "hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */);
     }
-    this.storefiles = sortAndClone(loadStoreFiles());
+
+    this.storeFileManager = new DefaultStoreFileManager(this.comparator);
+    this.storeFileManager.loadFiles(loadStoreFiles());
 
     // Initialize checksum type from name. The names are CRC32, CRC32C, etc.
     this.checksumType = getChecksumType(conf);
@@ -293,21 +291,18 @@ public class HStore implements Store, St
     return this.fs;
   }
 
-  /* Implementation of StoreConfiguration */
+  /* Implementation of StoreConfigInformation */
+  @Override
   public long getStoreFileTtl() {
     // TTL only applies if there's no MIN_VERSIONs setting on the column.
     return (this.scanInfo.getMinVersions() == 0) ? this.ttl : Long.MAX_VALUE;
   }
 
-  public Long getMajorCompactionPeriod() {
-    String strCompactionTime = this.family.getValue(HConstants.MAJOR_COMPACTION_PERIOD);
-    return (strCompactionTime != null) ? new Long(strCompactionTime) : null;
-  }
-
+  @Override
   public long getMemstoreFlushSize() {
     return this.region.memstoreFlushSize;
   }
-  /* End implementation of StoreConfiguration */
+  /* End implementation of StoreConfigInformation */
 
   /**
    * Returns the configured bytesPerChecksum value.
@@ -345,7 +340,7 @@ public class HStore implements Store, St
   }
 
   /**
-   * @return The maximum sequence id in all store files.
+   * @return The maximum sequence id in all store files. Used for log replay.
    */
   long getMaxSequenceId(boolean includeBulkFiles) {
     return StoreFile.getMaxSequenceIdInList(this.getStorefiles(), includeBulkFiles);
@@ -533,8 +528,8 @@ public class HStore implements Store, St
    * @return All store files.
    */
   @Override
-  public List<StoreFile> getStorefiles() {
-    return this.storefiles;
+  public Collection<StoreFile> getStorefiles() {
+    return this.storeFileManager.getStorefiles();
   }
 
   @Override
@@ -637,11 +632,9 @@ public class HStore implements Store, St
     // Append the new storefile into the list
     this.lock.writeLock().lock();
     try {
-      ArrayList<StoreFile> newFiles = new ArrayList<StoreFile>(storefiles);
-      newFiles.add(sf);
-      this.storefiles = sortAndClone(newFiles);
+      this.storeFileManager.insertNewFile(sf);
     } finally {
-      // We need the lock, as long as we are updating the storefiles
+      // We need the lock, as long as we are updating the storeFiles
       // or changing the memstore. Let us release it before calling
       // notifyChangeReadersObservers. See HBASE-4485 for a possible
       // deadlock scenario that could have happened if continue to hold
@@ -664,13 +657,11 @@ public class HStore implements Store, St
   }
 
   @Override
-  public ImmutableList<StoreFile> close() throws IOException {
+  public ImmutableCollection<StoreFile> close() throws IOException {
     this.lock.writeLock().lock();
     try {
-      ImmutableList<StoreFile> result = storefiles;
-
       // Clear so metrics doesn't find them.
-      storefiles = ImmutableList.of();
+      ImmutableCollection<StoreFile> result = storeFileManager.clearFiles();
 
       if (!result.isEmpty()) {
         // initialize the thread pool for closing store files in parallel.
@@ -967,7 +958,7 @@ public class HStore implements Store, St
   }
 
   /*
-   * Change storefiles adding into place the Reader produced by this new flush.
+   * Change storeFiles adding into place the Reader produced by this new flush.
    * @param sf
    * @param set That was used to make the passed file <code>p</code>.
    * @throws IOException
@@ -978,13 +969,10 @@ public class HStore implements Store, St
   throws IOException {
     this.lock.writeLock().lock();
     try {
-      ArrayList<StoreFile> newList = new ArrayList<StoreFile>(storefiles);
-      newList.add(sf);
-      storefiles = sortAndClone(newList);
-
+      this.storeFileManager.insertNewFile(sf);
       this.memstore.clearSnapshot(set);
     } finally {
-      // We need the lock, as long as we are updating the storefiles
+      // We need the lock, as long as we are updating the storeFiles
       // or changing the memstore. Let us release it before calling
       // notifyChangeReadersObservers. See HBASE-4485 for a possible
       // deadlock scenario that could have happened if continue to hold
@@ -1014,14 +1002,13 @@ public class HStore implements Store, St
    * @return all scanners for this store
    */
   protected List<KeyValueScanner> getScanners(boolean cacheBlocks,
-      boolean isGet,
-      boolean isCompaction,
-      ScanQueryMatcher matcher) throws IOException {
-    List<StoreFile> storeFiles;
+      boolean isGet, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
+      byte[] stopRow) throws IOException {
+    Collection<StoreFile> storeFilesToScan;
     List<KeyValueScanner> memStoreScanners;
     this.lock.readLock().lock();
     try {
-      storeFiles = this.getStorefiles();
+      storeFilesToScan = this.storeFileManager.getFilesForScanOrGet(isGet, startRow, stopRow);
       memStoreScanners = this.memstore.getScanners();
     } finally {
       this.lock.readLock().unlock();
@@ -1033,7 +1020,7 @@ public class HStore implements Store, St
     // but now we get them in ascending order, which I think is
     // actually more correct, since memstore get put at the end.
     List<StoreFileScanner> sfScanners = StoreFileScanner
-      .getScannersForStoreFiles(storeFiles, cacheBlocks, isGet, isCompaction, matcher);
+      .getScannersForStoreFiles(storeFilesToScan, cacheBlocks, isGet, isCompaction, matcher);
     List<KeyValueScanner> scanners =
       new ArrayList<KeyValueScanner>(sfScanners.size()+1);
     scanners.addAll(sfScanners);
@@ -1153,15 +1140,21 @@ public class HStore implements Store, St
     return sfs;
   }
 
-  @Override
-  public void compactRecentForTesting(int N) throws IOException {
+  /**
+   * This method tries to compact N recent files for testing.
+   * Note that because compacting "recent" files only makes sense for some policies,
+   * e.g. the default one, it assumes default policy is used. It doesn't use policy,
+   * but instead makes a compaction candidate list by itself.
+   * @param N Number of files.
+   */
+  public void compactRecentForTestingAssumingDefaultPolicy(int N) throws IOException {
     List<StoreFile> filesToCompact;
     boolean isMajor;
 
     this.lock.readLock().lock();
     try {
       synchronized (filesCompacting) {
-        filesToCompact = Lists.newArrayList(storefiles);
+        filesToCompact = Lists.newArrayList(storeFileManager.getStorefiles());
         if (!filesCompacting.isEmpty()) {
           // exclude all files older than the newest file we're currently
           // compacting. this allows us to preserve contiguity (HBASE-2856)
@@ -1176,7 +1169,7 @@ public class HStore implements Store, St
         }
 
         filesToCompact = filesToCompact.subList(count - N, count);
-        isMajor = (filesToCompact.size() == storefiles.size());
+        isMajor = (filesToCompact.size() == storeFileManager.getStorefileCount());
         filesCompacting.addAll(filesToCompact);
         Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
       }
@@ -1204,7 +1197,7 @@ public class HStore implements Store, St
 
   @Override
   public boolean hasReferences() {
-    return StoreUtils.hasReferences(this.storefiles);
+    return StoreUtils.hasReferences(this.storeFileManager.getStorefiles());
   }
 
   @Override
@@ -1214,15 +1207,14 @@ public class HStore implements Store, St
 
   @Override
   public boolean isMajorCompaction() throws IOException {
-    for (StoreFile sf : this.storefiles) {
+    for (StoreFile sf : this.storeFileManager.getStorefiles()) {
+      // TODO: what are these reader checks all over the place?
       if (sf.getReader() == null) {
         LOG.debug("StoreFile " + sf + " has null Reader");
         return false;
       }
     }
-
-    List<StoreFile> candidates = new ArrayList<StoreFile>(this.storefiles);
-    return compactionPolicy.isMajorCompaction(candidates);
+    return compactionPolicy.isMajorCompaction(this.storeFileManager.getStorefiles());
   }
 
   public CompactionRequest requestCompaction() throws IOException {
@@ -1238,22 +1230,13 @@ public class HStore implements Store, St
     CompactionRequest ret = null;
     this.lock.readLock().lock();
     try {
+      List<StoreFile> candidates = Lists.newArrayList(storeFileManager.getStorefiles());
       synchronized (filesCompacting) {
-        // candidates = all storefiles not already in compaction queue
-        List<StoreFile> candidates = Lists.newArrayList(storefiles);
-        if (!filesCompacting.isEmpty()) {
-          // exclude all files older than the newest file we're currently
-          // compacting. this allows us to preserve contiguity (HBASE-2856)
-          StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
-          int idx = candidates.indexOf(last);
-          Preconditions.checkArgument(idx != -1);
-          candidates.subList(0, idx + 1).clear();
-        }
-
+        // First we need to pre-select compaction, and then pre-compact selection!
+        candidates = compactionPolicy.preSelectCompaction(candidates, filesCompacting);
         boolean override = false;
         if (region.getCoprocessorHost() != null) {
-          override = region.getCoprocessorHost().preCompactSelection(
-              this, candidates);
+          override = region.getCoprocessorHost().preCompactSelection(this, candidates);
         }
         CompactSelection filesToCompact;
         if (override) {
@@ -1284,9 +1267,8 @@ public class HStore implements Store, St
         filesCompacting.addAll(filesToCompact.getFilesToCompact());
         Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
 
-        // major compaction iff all StoreFiles are included
         boolean isMajor =
-            (filesToCompact.getFilesToCompact().size() == this.storefiles.size());
+            (filesToCompact.getFilesToCompact().size() == this.getStorefilesCount());
         if (isMajor) {
           // since we're enqueuing a major, update the compaction wait interval
           this.forceMajor = false;
@@ -1382,25 +1364,22 @@ public class HStore implements Store, St
           this.family.getBloomFilterType(), this.dataBlockEncoder);
       result.createReader();
     }
+
     try {
       this.lock.writeLock().lock();
       try {
-        // Change this.storefiles so it reflects new state but do not
+        // Change this.storeFiles so it reflects new state but do not
         // delete old store files until we have sent out notification of
         // change in case old files are still being accessed by outstanding
         // scanners.
-        ArrayList<StoreFile> newStoreFiles = Lists.newArrayList(storefiles);
-        newStoreFiles.removeAll(compactedFiles);
-        filesCompacting.removeAll(compactedFiles); // safe bc: lock.writeLock()
-
-        // If a StoreFile result, move it into place.  May be null.
+        List<StoreFile> results = new ArrayList<StoreFile>(1);
         if (result != null) {
-          newStoreFiles.add(result);
+          results.add(result);
         }
-
-        this.storefiles = sortAndClone(newStoreFiles);
+        this.storeFileManager.addCompactionResults(compactedFiles, results);
+        filesCompacting.removeAll(compactedFiles); // safe bc: lock.writeLock()
       } finally {
-        // We need the lock, as long as we are updating the storefiles
+        // We need the lock, as long as we are updating the storeFiles
         // or changing the memstore. Let us release it before calling
         // notifyChangeReadersObservers. See HBASE-4485 for a possible
         // deadlock scenario that could have happened if continue to hold
@@ -1427,7 +1406,7 @@ public class HStore implements Store, St
     // 4. Compute new store size
     this.storeSize = 0L;
     this.totalUncompressedBytes = 0L;
-    for (StoreFile hsf : this.storefiles) {
+    for (StoreFile hsf : this.storeFileManager.getStorefiles()) {
       StoreFile.Reader r = hsf.getReader();
       if (r == null) {
         LOG.warn("StoreFile " + hsf + " has a null Reader");
@@ -1439,21 +1418,6 @@ public class HStore implements Store, St
     return result;
   }
 
-  public ImmutableList<StoreFile> sortAndClone(List<StoreFile> storeFiles) {
-    Collections.sort(storeFiles, StoreFile.Comparators.SEQ_ID);
-    ImmutableList<StoreFile> newList = ImmutableList.copyOf(storeFiles);
-    return newList;
-  }
-
-  // ////////////////////////////////////////////////////////////////////////////
-  // Accessors.
-  // (This is the only section that is directly useful!)
-  //////////////////////////////////////////////////////////////////////////////
-  @Override
-  public int getNumberOfStoreFiles() {
-    return this.storefiles.size();
-  }
-
   /*
    * @param wantedVersions How many versions were asked for.
    * @return wantedVersions or this families' {@link HConstants#VERSIONS}.
@@ -1490,10 +1454,18 @@ public class HStore implements Store, St
       // First go to the memstore.  Pick up deletes and candidates.
       this.memstore.getRowKeyAtOrBefore(state);
       // Check if match, if we got a candidate on the asked for 'kv' row.
-      // Process each store file. Run through from newest to oldest.
-      for (StoreFile sf : Lists.reverse(storefiles)) {
-        // Update the candidate keys from the current map file
-        rowAtOrBeforeFromStoreFile(sf, state);
+      // Process each relevant store file. Run through from newest to oldest.
+      Iterator<StoreFile> sfIterator =
+          this.storeFileManager.getCandidateFilesForRowKeyBefore(state.getTargetKey());
+      while (sfIterator.hasNext()) {
+        StoreFile sf = sfIterator.next();
+        sfIterator.remove(); // Remove sf from iterator.
+        boolean haveNewCandidate = rowAtOrBeforeFromStoreFile(sf, state);
+        if (haveNewCandidate) {
+          // TODO: we may have an optimization here which stops the search if we find exact match.
+          sfIterator = this.storeFileManager.updateCandidateFilesForRowKeyBefore(sfIterator,
+            state.getTargetKey(), state.getCandidate());
+        }
       }
       return state.getCandidate();
     } finally {
@@ -1506,22 +1478,23 @@ public class HStore implements Store, St
    * @param f
    * @param state
    * @throws IOException
+   * @return True iff the candidate has been updated in the state.
    */
-  private void rowAtOrBeforeFromStoreFile(final StoreFile f,
+  private boolean rowAtOrBeforeFromStoreFile(final StoreFile f,
                                           final GetClosestRowBeforeTracker state)
       throws IOException {
     StoreFile.Reader r = f.getReader();
     if (r == null) {
       LOG.warn("StoreFile " + f + " has a null Reader");
-      return;
+      return false;
     }
     if (r.getEntries() == 0) {
       LOG.warn("StoreFile " + f + " is a empty store file");
-      return;
+      return false;
     }
     // TODO: Cache these keys rather than make each time?
     byte [] fk = r.getFirstKey();
-    if (fk == null) return;
+    if (fk == null) return false;
     KeyValue firstKV = KeyValue.createKeyValueFromKey(fk, 0, fk.length);
     byte [] lk = r.getLastKey();
     KeyValue lastKV = KeyValue.createKeyValueFromKey(lk, 0, lk.length);
@@ -1529,7 +1502,7 @@ public class HStore implements Store, St
     if (this.comparator.compareRows(lastKV, firstOnRow) < 0) {
       // If last key in file is not of the target table, no candidates in this
       // file.  Return.
-      if (!state.isTargetTable(lastKV)) return;
+      if (!state.isTargetTable(lastKV)) return false;
       // If the row we're looking for is past the end of file, set search key to
       // last key. TODO: Cache last and first key rather than make each time.
       firstOnRow = new KeyValue(lastKV.getRow(), HConstants.LATEST_TIMESTAMP);
@@ -1537,10 +1510,10 @@ public class HStore implements Store, St
     // Get a scanner that caches blocks and that uses pread.
     HFileScanner scanner = r.getScanner(true, true, false);
     // Seek scanner.  If can't seek it, return.
-    if (!seekToScanner(scanner, firstOnRow, firstKV)) return;
+    if (!seekToScanner(scanner, firstOnRow, firstKV)) return false;
     // If we found candidate on firstOnRow, just return. THIS WILL NEVER HAPPEN!
     // Unlikely that there'll be an instance of actual first row in table.
-    if (walkForwardInSingleRow(scanner, firstOnRow, state)) return;
+    if (walkForwardInSingleRow(scanner, firstOnRow, state)) return true;
     // If here, need to start backing up.
     while (scanner.seekBefore(firstOnRow.getBuffer(), firstOnRow.getKeyOffset(),
        firstOnRow.getKeyLength())) {
@@ -1550,10 +1523,11 @@ public class HStore implements Store, St
       // Make new first on row.
       firstOnRow = new KeyValue(kv.getRow(), HConstants.LATEST_TIMESTAMP);
       // Seek scanner.  If can't seek it, break.
-      if (!seekToScanner(scanner, firstOnRow, firstKV)) break;
+      if (!seekToScanner(scanner, firstOnRow, firstKV)) return false;
       // If we find something, break;
-      if (walkForwardInSingleRow(scanner, firstOnRow, state)) break;
+      if (walkForwardInSingleRow(scanner, firstOnRow, state)) return true;
     }
+    return false;
   }
 
   /*
@@ -1612,17 +1586,12 @@ public class HStore implements Store, St
   public boolean canSplit() {
     this.lock.readLock().lock();
     try {
-      // Not splitable if we find a reference store file present in the store.
-      for (StoreFile sf : storefiles) {
-        if (sf.isReference()) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(sf + " is not splittable");
-          }
-          return false;
-        }
+      // Not split-able if we find a reference store file present in the store.
+      boolean result = !hasReferences();
+      if (!result && LOG.isDebugEnabled()) {
+        LOG.debug("Cannot split region due to reference files being there");
       }
-
-      return true;
+      return result;
     } finally {
       this.lock.readLock().unlock();
     }
@@ -1632,64 +1601,14 @@ public class HStore implements Store, St
   public byte[] getSplitPoint() {
     this.lock.readLock().lock();
     try {
-      // sanity checks
-      if (this.storefiles.isEmpty()) {
-        return null;
-      }
       // Should already be enforced by the split policy!
       assert !this.region.getRegionInfo().isMetaRegion();
-
-      // Not splitable if we find a reference store file present in the store.
-      long maxSize = 0L;
-      StoreFile largestSf = null;
-      for (StoreFile sf : storefiles) {
-        if (sf.isReference()) {
-          // Should already be enforced since we return false in this case
-          assert false : "getSplitPoint() called on a region that can't split!";
-          return null;
-        }
-
-        StoreFile.Reader r = sf.getReader();
-        if (r == null) {
-          LOG.warn("Storefile " + sf + " Reader is null");
-          continue;
-        }
-
-        long size = r.length();
-        if (size > maxSize) {
-          // This is the largest one so far
-          maxSize = size;
-          largestSf = sf;
-        }
-      }
-
-      StoreFile.Reader r = largestSf.getReader();
-      if (r == null) {
-        LOG.warn("Storefile " + largestSf + " Reader is null");
+      // Not split-able if we find a reference store file present in the store.
+      if (hasReferences()) {
+        assert false : "getSplitPoint() called on a region that can't split!";
         return null;
       }
-      // Get first, last, and mid keys.  Midkey is the key that starts block
-      // in middle of hfile.  Has column and timestamp.  Need to return just
-      // the row we want to split on as midkey.
-      byte [] midkey = r.midkey();
-      if (midkey != null) {
-        KeyValue mk = KeyValue.createKeyValueFromKey(midkey, 0, midkey.length);
-        byte [] fk = r.getFirstKey();
-        KeyValue firstKey = KeyValue.createKeyValueFromKey(fk, 0, fk.length);
-        byte [] lk = r.getLastKey();
-        KeyValue lastKey = KeyValue.createKeyValueFromKey(lk, 0, lk.length);
-        // if the midkey is the same as the first or last keys, then we cannot
-        // (ever) split this region.
-        if (this.comparator.compareRows(mk, firstKey) == 0 ||
-            this.comparator.compareRows(mk, lastKey) == 0) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("cannot split because midkey is the same as first or " +
-              "last row");
-          }
-          return null;
-        }
-        return mk.getRow();
-      }
+      return this.storeFileManager.getSplitPoint();
     } catch(IOException e) {
       LOG.warn("Failed getting store size for " + this, e);
     } finally {
@@ -1745,7 +1664,7 @@ public class HStore implements Store, St
 
   @Override
   public int getStorefilesCount() {
-    return this.storefiles.size();
+    return this.storeFileManager.getStorefileCount();
   }
 
   @Override
@@ -1756,7 +1675,7 @@ public class HStore implements Store, St
   @Override
   public long getStorefilesSize() {
     long size = 0;
-    for (StoreFile s: storefiles) {
+    for (StoreFile s: this.storeFileManager.getStorefiles()) {
       StoreFile.Reader r = s.getReader();
       if (r == null) {
         LOG.warn("StoreFile " + s + " has a null Reader");
@@ -1770,7 +1689,7 @@ public class HStore implements Store, St
   @Override
   public long getStorefilesIndexSize() {
     long size = 0;
-    for (StoreFile s: storefiles) {
+    for (StoreFile s: this.storeFileManager.getStorefiles()) {
       StoreFile.Reader r = s.getReader();
       if (r == null) {
         LOG.warn("StoreFile " + s + " has a null Reader");
@@ -1784,7 +1703,7 @@ public class HStore implements Store, St
   @Override
   public long getTotalStaticIndexSize() {
     long size = 0;
-    for (StoreFile s : storefiles) {
+    for (StoreFile s : this.storeFileManager.getStorefiles()) {
       size += s.getReader().getUncompressedDataIndexSize();
     }
     return size;
@@ -1793,7 +1712,7 @@ public class HStore implements Store, St
   @Override
   public long getTotalStaticBloomSize() {
     long size = 0;
-    for (StoreFile s : storefiles) {
+    for (StoreFile s : this.storeFileManager.getStorefiles()) {
       StoreFile.Reader r = s.getReader();
       size += r.getTotalBloomSize();
     }
@@ -1811,12 +1730,12 @@ public class HStore implements Store, St
 
   @Override
   public int getCompactPriority(int priority) {
-    // If this is a user-requested compaction, leave this at the highest priority
-    if(priority == Store.PRIORITY_USER) {
-      return Store.PRIORITY_USER;
-    } else {
-      return this.blockingStoreFileCount - this.storefiles.size();
+    // If this is a user-requested compaction, leave this at the user priority
+    if (priority != Store.PRIORITY_USER) {
+      priority = this.compactionPolicy.getSystemCompactionPriority(
+        this.storeFileManager.getStorefiles());
     }
+    return priority;
   }
 
   @Override
@@ -1927,7 +1846,7 @@ public class HStore implements Store, St
 
   @Override
   public boolean needsCompaction() {
-    return compactionPolicy.needsCompaction(storefiles.size() - filesCompacting.size());
+    return compactionPolicy.needsCompaction(this.storeFileManager.getStorefiles(), filesCompacting);
   }
 
   @Override
@@ -1937,7 +1856,7 @@ public class HStore implements Store, St
 
   public static final long FIXED_OVERHEAD =
       ClassSize.align((20 * ClassSize.REFERENCE) + (4 * Bytes.SIZEOF_LONG)
-              + (3 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
+              + (2 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
 
   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
       + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyPrefixRegionSplitPolicy.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyPrefixRegionSplitPolicy.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyPrefixRegionSplitPolicy.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyPrefixRegionSplitPolicy.java Thu Feb 14 13:35:54 2013
@@ -28,13 +28,15 @@ import org.apache.hadoop.classification.
  * rows by a prefix of the row-key
  *
  * This ensures that a region is not split "inside" a prefix of a row key.
- * I.e. rows can be co-located in a regionb by their prefix.
+ * I.e. rows can be co-located in a region by their prefix.
  */
 @InterfaceAudience.Private
 public class KeyPrefixRegionSplitPolicy extends IncreasingToUpperBoundRegionSplitPolicy {
   private static final Log LOG = LogFactory
       .getLog(KeyPrefixRegionSplitPolicy.class);
-  public static final String PREFIX_LENGTH_KEY = "prefix_split_key_policy.prefix_length";
+  @Deprecated
+  public static final String PREFIX_LENGTH_KEY_DEPRECATED = "prefix_split_key_policy.prefix_length";
+  public static final String PREFIX_LENGTH_KEY = "KeyPrefixRegionSplitPolicy.prefix_length";
 
   private int prefixLength = 0;
 
@@ -48,10 +50,14 @@ public class KeyPrefixRegionSplitPolicy 
       String prefixLengthString = region.getTableDesc().getValue(
           PREFIX_LENGTH_KEY);
       if (prefixLengthString == null) {
-        LOG.error(PREFIX_LENGTH_KEY + " not specified for table "
-            + region.getTableDesc().getNameAsString()
-            + ". Using default RegionSplitPolicy");
-        return;
+        //read the deprecated value
+        prefixLengthString = region.getTableDesc().getValue(PREFIX_LENGTH_KEY_DEPRECATED);
+        if (prefixLengthString == null) {
+          LOG.error(PREFIX_LENGTH_KEY + " not specified for table "
+              + region.getTableDesc().getNameAsString()
+              + ". Using default RegionSplitPolicy");
+          return;
+        }
       }
       try {
         prefixLength = Integer.parseInt(prefixLengthString);

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java Thu Feb 14 13:35:54 2013
@@ -61,7 +61,7 @@ public class MemStore implements HeapSiz
 
   static final String USEMSLAB_KEY =
     "hbase.hregion.memstore.mslab.enabled";
-  private static final boolean USEMSLAB_DEFAULT = false;
+  private static final boolean USEMSLAB_DEFAULT = true;
 
   private Configuration conf;
 

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java Thu Feb 14 13:35:54 2013
@@ -85,7 +85,7 @@ class MemStoreFlusher implements FlushRe
     "hbase.regionserver.global.memstore.upperLimit";
   private static final String LOWER_KEY =
     "hbase.regionserver.global.memstore.lowerLimit";
-  private long blockingStoreFilesNumber;
+  private int blockingStoreFileCount;
   private long blockingWaitTime;
   private final Counter updatesBlockedMsHighWater = new Counter();
 
@@ -112,8 +112,8 @@ class MemStoreFlusher implements FlushRe
         "because supplied " + LOWER_KEY + " was > " + UPPER_KEY);
     }
     this.globalMemStoreLimitLowMark = lower;
-    this.blockingStoreFilesNumber =
-      conf.getInt("hbase.hstore.blockingStoreFiles", 7);
+    this.blockingStoreFileCount =
+      conf.getInt("hbase.hstore.blockingStoreFiles", HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
     this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
       90000);
     this.handlerCount = conf.getInt("hbase.hstore.flusher.count", 1);
@@ -482,7 +482,7 @@ class MemStoreFlusher implements FlushRe
 
   private boolean isTooManyStoreFiles(HRegion region) {
     for (Store hstore : region.stores.values()) {
-      if (hstore.getStorefilesCount() > this.blockingStoreFilesNumber) {
+      if (hstore.getStorefilesCount() > this.blockingStoreFileCount) {
         return true;
       }
     }

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java Thu Feb 14 13:35:54 2013
@@ -313,11 +313,11 @@ public class SplitTransaction {
     // stuff in fs that needs cleanup -- a storefile or two.  Thats why we
     // add entry to journal BEFORE rather than AFTER the change.
     this.journal.add(JournalEntry.STARTED_REGION_A_CREATION);
-    HRegion a = createDaughterRegion(this.hri_a, this.parent.rsServices);
+    HRegion a = createDaughterRegion(this.hri_a);
 
     // Ditto
     this.journal.add(JournalEntry.STARTED_REGION_B_CREATION);
-    HRegion b = createDaughterRegion(this.hri_b, this.parent.rsServices);
+    HRegion b = createDaughterRegion(this.hri_b);
 
     // This is the point of no return.  Adding subsequent edits to .META. as we
     // do below when we do the daughter opens adding each to .META. can fail in
@@ -696,20 +696,10 @@ public class SplitTransaction {
    * @throws IOException
    * @see #cleanupDaughterRegion(FileSystem, Path, String)
    */
-  HRegion createDaughterRegion(final HRegionInfo hri,
-      final RegionServerServices rsServices)
-  throws IOException {
+  HRegion createDaughterRegion(final HRegionInfo hri) throws IOException {
     // Package private so unit tests have access.
-    FileSystem fs = this.parent.getFilesystem();
-    Path regionDir = getSplitDirForDaughter(this.parent.getFilesystem(),
-      this.splitdir, hri);
-    HRegion r = HRegion.newHRegion(this.parent.getTableDir(),
-      this.parent.getLog(), fs, this.parent.getBaseConf(),
-      hri, this.parent.getTableDesc(), rsServices);
-    r.readRequestsCount.set(this.parent.getReadRequestsCount() / 2);
-    r.writeRequestsCount.set(this.parent.getWriteRequestsCount() / 2);
-    HRegion.moveInitialFilesIntoPlace(fs, regionDir, r.getRegionDir());
-    return r;
+    Path regionDir = getSplitDirForDaughter(this.splitdir, hri);
+    return this.parent.createDaughterRegion(hri, regionDir);
   }
 
   private static void cleanupDaughterRegion(final FileSystem fs,
@@ -723,15 +713,13 @@ public class SplitTransaction {
   /*
    * Get the daughter directories in the splits dir.  The splits dir is under
    * the parent regions' directory.
-   * @param fs
    * @param splitdir
    * @param hri
    * @return Path to daughter split dir.
    * @throws IOException
    */
-  private static Path getSplitDirForDaughter(final FileSystem fs,
-      final Path splitdir, final HRegionInfo hri)
-  throws IOException {
+  private static Path getSplitDirForDaughter(final Path splitdir, final HRegionInfo hri)
+      throws IOException {
     return new Path(splitdir, hri.getEncodedName());
   }
 

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Thu Feb 14 13:35:54 2013
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
 import java.util.NavigableSet;
 
@@ -41,7 +42,7 @@ import com.google.common.collect.Immutab
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public interface Store extends  HeapSize {
+public interface Store extends HeapSize, StoreConfigInformation {
 
   /* The default priority for user-specified compaction requests.
    * The user gets top priority unless we have blocking compactions. (Pri <= 0)
@@ -52,7 +53,7 @@ public interface Store extends  HeapSize
   // General Accessors
   public KeyValue.KVComparator getComparator();
 
-  public List<StoreFile> getStorefiles();
+  public Collection<StoreFile> getStorefiles();
 
   /**
    * Close all the readers We don't need to worry about subsequent requests because the HRegion
@@ -60,7 +61,7 @@ public interface Store extends  HeapSize
    * @return the {@link StoreFile StoreFiles} that were previously being used.
    * @throws IOException on failure
    */
-  public ImmutableList<StoreFile> close() throws IOException;
+  public Collection<StoreFile> close() throws IOException;
 
   /**
    * Return a scanner for both the memstore and the HStore files. Assumes we are not in a
@@ -208,11 +209,6 @@ public interface Store extends  HeapSize
    */
   public HFileDataBlockEncoder getDataBlockEncoder();
 
-  /**
-   * @return the number of files in this store
-   */
-  public int getNumberOfStoreFiles();
-
   /** @return aggregate size of all HStores used in the last compaction */
   public long getLastCompactSize();
 
@@ -257,13 +253,6 @@ public interface Store extends  HeapSize
   // Test-helper methods
 
   /**
-   * Compact the most recent N files. Used in testing.
-   * @param N number of files to compact. Must be less than or equal to current number of files.
-   * @throws IOException on failure
-   */
-  public void compactRecentForTesting(int N) throws IOException;
-
-  /**
    * Used for tests.
    * @return cache configuration for this Store.
    */

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Thu Feb 14 13:35:54 2013
@@ -952,6 +952,38 @@ public class StoreFile {
   }
 
   /**
+   * Gets the approximate mid-point of this file that is optimal for use in splitting it.
+   * @param comparator Comparator used to compare KVs.
+   * @return The split point row, or null if splitting is not possible, or reader is null.
+   */
+  byte[] getFileSplitPoint(KVComparator comparator) throws IOException {
+    if (this.reader == null) {
+      LOG.warn("Storefile " + this + " Reader is null; cannot get split point");
+      return null;
+    }
+    // Get first, last, and mid keys.  Midkey is the key that starts block
+    // in middle of hfile.  Has column and timestamp.  Need to return just
+    // the row we want to split on as midkey.
+    byte [] midkey = this.reader.midkey();
+    if (midkey != null) {
+      KeyValue mk = KeyValue.createKeyValueFromKey(midkey, 0, midkey.length);
+      byte [] fk = this.reader.getFirstKey();
+      KeyValue firstKey = KeyValue.createKeyValueFromKey(fk, 0, fk.length);
+      byte [] lk = this.reader.getLastKey();
+      KeyValue lastKey = KeyValue.createKeyValueFromKey(lk, 0, lk.length);
+      // if the midkey is the same as the first or last keys, we cannot (ever) split this region.
+      if (comparator.compareRows(mk, firstKey) == 0 || comparator.compareRows(mk, lastKey) == 0) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("cannot split because midkey is the same as first or last row");
+        }
+        return null;
+      }
+      return mk.getRow();
+    }
+    return null;
+  }
+
+  /**
    * A StoreFile writer.  Use this to read/write HBase Store Files. It is package
    * local because it is an implementation detail of the HBase regionserver.
    */
@@ -1770,18 +1802,19 @@ public class StoreFile {
   /**
    * Useful comparators for comparing StoreFiles.
    */
-  abstract static class Comparators {
+  public abstract static class Comparators {
     /**
      * Comparator that compares based on the Sequence Ids of the
      * the StoreFiles. Bulk loads that did not request a seq ID
      * are given a seq id of -1; thus, they are placed before all non-
      * bulk loads, and bulk loads with sequence Id. Among these files,
-     * the bulkLoadTime is used to determine the ordering.
+     * the size is used to determine the ordering, then bulkLoadTime.
      * If there are ties, the path name is used as a tie-breaker.
      */
-    static final Comparator<StoreFile> SEQ_ID =
+    public static final Comparator<StoreFile> SEQ_ID =
       Ordering.compound(ImmutableList.of(
           Ordering.natural().onResultOf(new GetSeqId()),
+          Ordering.natural().onResultOf(new GetFileSize()).reverse(),
           Ordering.natural().onResultOf(new GetBulkTime()),
           Ordering.natural().onResultOf(new GetPathName())
       ));
@@ -1793,6 +1826,13 @@ public class StoreFile {
       }
     }
 
+    private static class GetFileSize implements Function<StoreFile, Long> {
+      @Override
+      public Long apply(StoreFile sf) {
+        return sf.getReader().length();
+      }
+    }
+
     private static class GetBulkTime implements Function<StoreFile, Long> {
       @Override
       public Long apply(StoreFile sf) {
@@ -1807,19 +1847,5 @@ public class StoreFile {
         return sf.getPath().getName();
       }
     }
-
-    /**
-     * FILE_SIZE = descending sort StoreFiles (largest --> smallest in size)
-     */
-    static final Comparator<StoreFile> FILE_SIZE = Ordering.natural().reverse()
-        .onResultOf(new Function<StoreFile, Long>() {
-          @Override
-          public Long apply(StoreFile sf) {
-            if (sf == null) {
-              throw new IllegalArgumentException("StorFile can not be null");
-            }
-            return sf.getReader().length();
-          }
-        });
   }
 }

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Thu Feb 14 13:35:54 2013
@@ -206,7 +206,7 @@ public class StoreScanner extends NonLaz
   protected List<KeyValueScanner> getScannersNoCompaction() throws IOException {
     final boolean isCompaction = false;
     return selectScannersFrom(store.getScanners(cacheBlocks, isGet,
-        isCompaction, matcher));
+        isCompaction, matcher, scan.getStartRow(), scan.getStopRow()));
   }
 
   /**

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java Thu Feb 14 13:35:54 2013
@@ -29,18 +29,19 @@ public class StoreUtils {
   /**
    * Creates a deterministic hash code for store file collection.
    */
-  public static Integer getDeterministicRandomSeed(final List<StoreFile> files) {
+  public static Integer getDeterministicRandomSeed(final Collection<StoreFile> files) {
     if (files != null && !files.isEmpty()) {
-      return files.get(0).getPath().getName().hashCode();
+      return files.iterator().next().getPath().getName().hashCode();
     }
     return null;
   }
 
   /**
    * Determines whether any files in the collection are references.
+   * @param files The files.
    */
   public static boolean hasReferences(final Collection<StoreFile> files) {
-    if (files != null && files.size() > 0) {
+    if (files != null) {
       for (StoreFile hsf: files) {
         if (hsf.isReference()) {
           return true;
@@ -53,7 +54,7 @@ public class StoreUtils {
   /**
    * Gets lowest timestamp from candidate StoreFiles
    */
-  public static long getLowestTimestamp(final List<StoreFile> candidates)
+  public static long getLowestTimestamp(final Collection<StoreFile> candidates)
     throws IOException {
     long minTs = Long.MAX_VALUE;
     for (StoreFile storeFile : candidates) {
@@ -61,4 +62,24 @@ public class StoreUtils {
     }
     return minTs;
   }
+
+  /**
+   * Gets the largest file (with reader) out of the list of files.
+   * @param candidates The files to choose from.
+   * @return The largest file; null if no file has a reader.
+   */
+  static StoreFile getLargestFile(final Collection<StoreFile> candidates) {
+    long maxSize = -1L;
+    StoreFile largestSf = null;
+    for (StoreFile sf : candidates) {
+      StoreFile.Reader r = sf.getReader();
+      if (r == null) continue;
+      long size = r.length();
+      if (size > maxSize) {
+        maxSize = size;
+        largestSf = sf;
+      }
+    }
+    return largestSf;
+  }
 }

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java Thu Feb 14 13:35:54 2013
@@ -58,43 +58,6 @@ public class CompactSelection {
   }
 
   /**
-   * Select the expired store files to compact
-   * 
-   * @param maxExpiredTimeStamp
-   *          The store file will be marked as expired if its max time stamp is
-   *          less than this maxExpiredTimeStamp.
-   * @return A CompactSelection contains the expired store files as
-   *         filesToCompact
-   */
-  public CompactSelection selectExpiredStoreFilesToCompact(
-      long maxExpiredTimeStamp) {
-    if (filesToCompact == null || filesToCompact.size() == 0)
-      return null;
-    ArrayList<StoreFile> expiredStoreFiles = null;
-    boolean hasExpiredStoreFiles = false;
-    CompactSelection expiredSFSelection = null;
-
-    for (StoreFile storeFile : this.filesToCompact) {
-      if (storeFile.getReader().getMaxTimestamp() < maxExpiredTimeStamp) {
-        LOG.info("Deleting the expired store file by compaction: "
-            + storeFile.getPath() + " whose maxTimeStamp is "
-            + storeFile.getReader().getMaxTimestamp()
-            + " while the max expired timestamp is " + maxExpiredTimeStamp);
-        if (!hasExpiredStoreFiles) {
-          expiredStoreFiles = new ArrayList<StoreFile>();
-          hasExpiredStoreFiles = true;
-        }
-        expiredStoreFiles.add(storeFile);
-      }
-    }
-
-    if (hasExpiredStoreFiles) {
-      expiredSFSelection = new CompactSelection(expiredStoreFiles);
-    }
-    return expiredSFSelection;
-  }
-
-  /**
    * The current compaction finished, so reset the off peak compactions count
    * if this was an off peak compaction.
    */
@@ -163,10 +126,6 @@ public class CompactSelection {
     return selectionTime;
   }
 
-  public CompactSelection subList(int start, int end) {
-    throw new UnsupportedOperationException();
-  }
-
   public CompactSelection getSubList(int start, int end) {
     filesToCompact = filesToCompact.subList(start, end);
     return this;
@@ -175,8 +134,4 @@ public class CompactSelection {
   public void clearSubList(int start, int end) {
     filesToCompact.subList(start, end).clear();
   }
-
-  private boolean isValidHour(int hour) {
-    return (hour >= 0 && hour <= 23);
-  }
 }

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java Thu Feb 14 13:35:54 2013
@@ -24,7 +24,8 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.regionserver.StoreConfiguration;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
 
 /**
  * Compaction configuration for a particular instance of HStore.
@@ -49,7 +50,7 @@ public class CompactionConfiguration {
   private static final String CONFIG_PREFIX = "hbase.hstore.compaction.";
 
   Configuration conf;
-  StoreConfiguration storeConfig;
+  StoreConfigInformation storeConfigInfo;
 
   long maxCompactSize;
   long minCompactSize;
@@ -63,14 +64,15 @@ public class CompactionConfiguration {
   boolean shouldDeleteExpired;
   long majorCompactionPeriod;
   float majorCompactionJitter;
+  int blockingStoreFileCount;
 
-  CompactionConfiguration(Configuration conf, StoreConfiguration storeConfig) {
+  CompactionConfiguration(Configuration conf, StoreConfigInformation storeConfigInfo) {
     this.conf = conf;
-    this.storeConfig = storeConfig;
+    this.storeConfigInfo = storeConfigInfo;
 
     maxCompactSize = conf.getLong(CONFIG_PREFIX + "max.size", Long.MAX_VALUE);
     minCompactSize = conf.getLong(CONFIG_PREFIX + "min.size",
-        storeConfig.getMemstoreFlushSize());
+        storeConfigInfo.getMemstoreFlushSize());
     minFilesToCompact = Math.max(2, conf.getInt(CONFIG_PREFIX + "min",
           /*old name*/ conf.getInt("hbase.hstore.compactionThreshold", 3)));
     maxFilesToCompact = conf.getInt(CONFIG_PREFIX + "max", 10);
@@ -89,10 +91,12 @@ public class CompactionConfiguration {
     }
 
     throttlePoint =  conf.getLong("hbase.regionserver.thread.compaction.throttle",
-          2 * maxFilesToCompact * storeConfig.getMemstoreFlushSize());
+          2 * maxFilesToCompact * storeConfigInfo.getMemstoreFlushSize());
     shouldDeleteExpired = conf.getBoolean("hbase.store.delete.expired.storefile", true);
     majorCompactionPeriod = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24);
     majorCompactionJitter = conf.getFloat("hbase.hregion.majorcompaction.jitter", 0.20F);
+    blockingStoreFileCount =
+        conf.getInt("hbase.hstore.blockingStoreFiles", HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
 
     LOG.info("Compaction configuration " + this.toString());
   }
@@ -117,6 +121,13 @@ public class CompactionConfiguration {
   }
 
   /**
+   * @return store file count that will cause the memstore of this store to be blocked.
+   */
+  int getBlockingStorefileCount() {
+    return this.blockingStoreFileCount;
+  }
+
+  /**
    * @return lower bound below which compaction is selected without ratio test
    */
   long getMinCompactSize() {
@@ -184,12 +195,6 @@ public class CompactionConfiguration {
    * Major compactions are selected periodically according to this parameter plus jitter
    */
   long getMajorCompactionPeriod() {
-    if (storeConfig != null) {
-      Long storeSpecificPeriod = storeConfig.getMajorCompactionPeriod();
-      if (storeSpecificPeriod != null) {
-        return storeSpecificPeriod;
-      }
-    }
     return majorCompactionPeriod;
   }
 
@@ -211,4 +216,4 @@ public class CompactionConfiguration {
   private static boolean isValidHour(int hour) {
     return (hour >= 0 && hour <= 23);
   }
-}
\ No newline at end of file
+}

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java Thu Feb 14 13:35:54 2013
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hbase.regionserver.compactions;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -53,6 +54,16 @@ public abstract class CompactionPolicy e
   HStore store;
 
   /**
+   * This is called before coprocessor preCompactSelection and should filter the candidates
+   * for coprocessor; i.e. exclude the files that definitely cannot be compacted at this time.
+   * @param candidateFiles candidate files, ordered from oldest to newest
+   * @param filesCompacting files currently compacting
+   * @return the list of files that can theoretically be compacted.
+   */
+  public abstract List<StoreFile> preSelectCompaction(
+      List<StoreFile> candidateFiles, final List<StoreFile> filesCompacting);
+
+  /**
    * @param candidateFiles candidate files, ordered from oldest to newest
    * @return subset copy of candidate list that meets compaction criteria
    * @throws java.io.IOException
@@ -62,11 +73,21 @@ public abstract class CompactionPolicy e
     final boolean forceMajor) throws IOException;
 
   /**
+   * @param storeFiles Store files in the store.
+   * @return The system compaction priority of the store, based on storeFiles.
+   *         The priority range is as such - the smaller values are higher priority;
+   *         1 is user priority; only very important, blocking compactions should use
+   *         values lower than that. With default settings, depending on the number of
+   *         store files, the non-blocking priority will be in 2-6 range.
+   */
+  public abstract int getSystemCompactionPriority(final Collection<StoreFile> storeFiles);
+
+  /**
    * @param filesToCompact Files to compact. Can be null.
    * @return True if we should run a major compaction.
    */
   public abstract boolean isMajorCompaction(
-    final List<StoreFile> filesToCompact) throws IOException;
+    final Collection<StoreFile> filesToCompact) throws IOException;
 
   /**
    * @param compactionSize Total size of some compaction
@@ -75,10 +96,12 @@ public abstract class CompactionPolicy e
   public abstract boolean throttleCompaction(long compactionSize);
 
   /**
-   * @param numCandidates Number of candidate store files
+   * @param storeFiles Current store files.
+   * @param filesCompacting files currently compacting.
    * @return whether a compactionSelection is possible
    */
-  public abstract boolean needsCompaction(int numCandidates);
+  public abstract boolean needsCompaction(final Collection<StoreFile> storeFiles,
+      final List<StoreFile> filesCompacting);
 
   /**
    * Inform the policy that some configuration has been change,

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicy.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicy.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicy.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicy.java Thu Feb 14 13:35:54 2013
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.regionse
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Calendar;
+import java.util.Collection;
 import java.util.GregorianCalendar;
 import java.util.List;
 import java.util.Random;
@@ -34,6 +35,7 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.StoreUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
+import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Collections2;
 
@@ -52,6 +54,26 @@ public class DefaultCompactionPolicy ext
     compactor = new DefaultCompactor(this);
   }
 
+  @Override
+  public List<StoreFile> preSelectCompaction(
+      List<StoreFile> candidateFiles, final List<StoreFile> filesCompacting) {
+    // candidates = all storefiles not already in compaction queue
+    if (!filesCompacting.isEmpty()) {
+      // exclude all files older than the newest file we're currently
+      // compacting. this allows us to preserve contiguity (HBASE-2856)
+      StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
+      int idx = candidateFiles.indexOf(last);
+      Preconditions.checkArgument(idx != -1);
+      candidateFiles.subList(0, idx + 1).clear();
+    }
+    return candidateFiles;
+  }
+
+  @Override
+  public int getSystemCompactionPriority(final Collection<StoreFile> storeFiles) {
+    return this.comConf.getBlockingStorefileCount() - storeFiles.size();
+  }
+
   /**
    * @param candidateFiles candidate files, ordered from oldest to newest
    * @return subset copy of candidate list that meets compaction criteria
@@ -293,7 +315,7 @@ public class DefaultCompactionPolicy ext
    * @param filesToCompact Files to compact. Can be null.
    * @return True if we should run a major compaction.
    */
-  public boolean isMajorCompaction(final List<StoreFile> filesToCompact)
+  public boolean isMajorCompaction(final Collection<StoreFile> filesToCompact)
       throws IOException {
     boolean result = false;
     long mcTime = getNextMajorCompactTime(filesToCompact);
@@ -308,7 +330,7 @@ public class DefaultCompactionPolicy ext
       long cfTtl = this.store.getStoreFileTtl();
       if (filesToCompact.size() == 1) {
         // Single file
-        StoreFile sf = filesToCompact.get(0);
+        StoreFile sf = filesToCompact.iterator().next();
         Long minTimestamp = sf.getMinimumTimestamp();
         long oldest = (minTimestamp == null)
             ? Long.MIN_VALUE
@@ -337,7 +359,7 @@ public class DefaultCompactionPolicy ext
     return result;
   }
 
-  public long getNextMajorCompactTime(final List<StoreFile> filesToCompact) {
+  public long getNextMajorCompactTime(final Collection<StoreFile> filesToCompact) {
     // default = 24hrs
     long ret = comConf.getMajorCompactionPeriod();
     if (ret > 0) {
@@ -366,11 +388,10 @@ public class DefaultCompactionPolicy ext
     return compactionSize > comConf.getThrottlePoint();
   }
 
-  /**
-   * @param numCandidates Number of candidate store files
-   * @return whether a compactionSelection is possible
-   */
-  public boolean needsCompaction(int numCandidates) {
+  @Override
+  public boolean needsCompaction(final Collection<StoreFile> storeFiles,
+      final List<StoreFile> filesCompacting) {
+    int numCandidates = storeFiles.size() - filesCompacting.size();
     return numCandidates > comConf.getMinFilesToCompact();
   }
 
@@ -390,4 +411,4 @@ public class DefaultCompactionPolicy ext
     }
     return (currentHour >= startHour || currentHour < endHour);
   }
-}
\ No newline at end of file
+}

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java Thu Feb 14 13:35:54 2013
@@ -87,6 +87,7 @@ public class OpenRegionHandler extends E
   @Override
   public void process() throws IOException {
     boolean openSuccessful = false;
+    boolean transitionToFailedOpen = false;
     final String regionName = regionInfo.getRegionNameAsString();
 
     try {
@@ -130,6 +131,7 @@ public class OpenRegionHandler extends E
       HRegion region = openRegion();
       if (region == null) {
         tryTransitionFromOpeningToFailedOpen(regionInfo);
+        transitionToFailedOpen = true;
         return;
       }
       boolean failed = true;
@@ -142,6 +144,7 @@ public class OpenRegionHandler extends E
           this.rsServices.isStopping()) {
         cleanupFailedOpen(region);
         tryTransitionFromOpeningToFailedOpen(regionInfo);
+        transitionToFailedOpen = true;
         return;
       }
 
@@ -154,6 +157,7 @@ public class OpenRegionHandler extends E
         // In all cases, we try to transition to failed_open to be safe.
         cleanupFailedOpen(region);
         tryTransitionFromOpeningToFailedOpen(regionInfo);
+        transitionToFailedOpen = true;
         return;
       }
 
@@ -197,6 +201,8 @@ public class OpenRegionHandler extends E
               " should be closed is now opened."
           );
         }
+      } else if (transitionToFailedOpen == false) {
+        tryTransitionFromOpeningToFailedOpen(regionInfo);
       }
     }
   }
@@ -455,7 +461,7 @@ public class OpenRegionHandler extends E
     return region;
   }
 
-  private void cleanupFailedOpen(final HRegion region) throws IOException {
+  void cleanupFailedOpen(final HRegion region) throws IOException {
     if (region != null) region.close();
   }
 

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java Thu Feb 14 13:35:54 2013
@@ -827,6 +827,7 @@ class FSHLog implements HLog, Syncable {
       }
       if (this.writer != null) {
         this.writer.close();
+        this.writer = null;
       }
     }
   }
@@ -1081,35 +1082,43 @@ class FSHLog implements HLog, Syncable {
       // issue the sync to HDFS. If sync is successful, then update
       // syncedTillHere to indicate that transactions till this
       // number has been successfully synced.
+      IOException ioe = null;
+      List<Entry> pending = null;
       synchronized (flushLock) {
         if (txid <= this.syncedTillHere) {
           return;
         }
         doneUpto = this.unflushedEntries.get();
-        List<Entry> pending = logSyncerThread.getPendingWrites();
+        pending = logSyncerThread.getPendingWrites();
         try {
           logSyncerThread.hlogFlush(tempWriter, pending);
         } catch(IOException io) {
-          synchronized (this.updateLock) {
+          ioe = io;
+          LOG.error("syncer encountered error, will retry. txid=" + txid, ioe);
+        }
+      }
+      if (ioe != null && pending != null) {
+        synchronized (this.updateLock) {
+          synchronized (flushLock) {
             // HBASE-4387, HBASE-5623, retry with updateLock held
             tempWriter = this.writer;
             logSyncerThread.hlogFlush(tempWriter, pending);
           }
-        }
+        }          
       }
       // another thread might have sync'ed avoid double-sync'ing
       if (txid <= this.syncedTillHere) {
         return;
       }
       try {
-        tempWriter.sync();
+        if (tempWriter != null) tempWriter.sync();
       } catch(IOException ex) {
         synchronized (this.updateLock) {
           // HBASE-4387, HBASE-5623, retry with updateLock held
           // TODO: we don't actually need to do it for concurrent close - what is the point
           //       of syncing new unrelated writer? Keep behavior for now.
           tempWriter = this.writer;
-          tempWriter.sync();
+          if (tempWriter != null) tempWriter.sync();
         }
       }
       this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto);

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java Thu Feb 14 13:35:54 2013
@@ -83,13 +83,6 @@ import com.google.common.collect.Lists;
 public class HLogSplitter {
   private static final String LOG_SPLITTER_IMPL = "hbase.hlog.splitter.impl";
 
-  /**
-   * Name of file that holds recovered edits written by the wal log splitting
-   * code, one per region
-   */
-  public static final String RECOVERED_EDITS = "recovered.edits";
-
-
   static final Log LOG = LogFactory.getLog(HLogSplitter.class);
 
   private boolean hasSplit = false;

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/RESTServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/RESTServer.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/RESTServer.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/RESTServer.java Thu Feb 14 13:35:54 2013
@@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.rest.filter.GzipFilter;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.InfoServer;
 import org.apache.hadoop.hbase.util.Strings;
 import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.hadoop.net.DNS;
@@ -64,7 +65,7 @@ public class RESTServer implements Const
     HelpFormatter formatter = new HelpFormatter();
     formatter.printHelp("bin/hbase rest start", "", options,
       "\nTo run the REST server as a daemon, execute " +
-      "bin/hbase-daemon.sh start|stop rest [-p <port>] [-ro]\n", true);
+      "bin/hbase-daemon.sh start|stop rest [--infoport <port>] [-p <port>] [-ro]\n", true);
     System.exit(exitCode);
   }
 
@@ -84,6 +85,7 @@ public class RESTServer implements Const
     options.addOption("p", "port", true, "Port to bind to [default: 8080]");
     options.addOption("ro", "readonly", false, "Respond only to GET HTTP " +
       "method requests [default: false]");
+    options.addOption(null, "infoport", true, "Port for web UI");
 
     CommandLine commandLine = null;
     try {
@@ -107,6 +109,14 @@ public class RESTServer implements Const
       LOG.debug("readonly set to true");
     }
 
+    // check for user-defined info server port setting, if so override the conf
+    if (commandLine != null && commandLine.hasOption("infoport")) {
+      String val = commandLine.getOptionValue("infoport");
+      servlet.getConfiguration()
+          .setInt("hbase.rest.info.port", Integer.valueOf(val));
+      LOG.debug("Web UI port set to " + val);
+    }
+
     @SuppressWarnings("unchecked")
     List<String> remainingArgs = commandLine != null ?
         commandLine.getArgList() : new ArrayList<String>();
@@ -169,6 +179,16 @@ public class RESTServer implements Const
         machineName);
     }
 
+    // Put up info server.
+    int port = conf.getInt("hbase.rest.info.port", 8085);
+    if (port >= 0) {
+      conf.setLong("startcode", System.currentTimeMillis());
+      String a = conf.get("hbase.rest.info.bindAddress", "0.0.0.0");
+      InfoServer infoServer = new InfoServer("rest", a, port, false, conf);
+      infoServer.setAttribute("hbase.conf", conf);
+      infoServer.start();
+    }
+
     // start server
     server.start();
     server.join();

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java Thu Feb 14 13:35:54 2013
@@ -32,6 +32,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.thrift.ThriftServerRunner.ImplType;
+import org.apache.hadoop.hbase.util.InfoServer;
 import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.hadoop.util.Shell.ExitCodeException;
 
@@ -60,6 +61,8 @@ public class ThriftServer {
   private Configuration conf;
   ThriftServerRunner serverRunner;
 
+  private InfoServer infoServer;
+
   //
   // Main program and support routines
   //
@@ -86,6 +89,16 @@ public class ThriftServer {
    void doMain(final String[] args) throws Exception {
      processOptions(args);
      serverRunner = new ThriftServerRunner(conf);
+
+     // Put up info server.
+     int port = conf.getInt("hbase.thrift.info.port", 9095);
+     if (port >= 0) {
+       conf.setLong("startcode", System.currentTimeMillis());
+       String a = conf.get("hbase.thrift.info.bindAddress", "0.0.0.0");
+       infoServer = new InfoServer("thrift", a, port, false, conf);
+       infoServer.setAttribute("hbase.conf", conf);
+       infoServer.start();
+     }
      serverRunner.run();
   }
 
@@ -101,6 +114,7 @@ public class ThriftServer {
     options.addOption("f", FRAMED_OPTION, false, "Use framed transport");
     options.addOption("c", COMPACT_OPTION, false, "Use the compact protocol");
     options.addOption("h", "help", false, "Print help information");
+    options.addOption(null, "infoport", true, "Port for web UI");
 
     options.addOption("m", MIN_WORKERS_OPTION, true,
         "The minimum number of worker threads for " +
@@ -147,6 +161,18 @@ public class ThriftServer {
       printUsageAndExit(options, -1);
     }
 
+    // check for user-defined info server port setting, if so override the conf
+    try {
+      if (cmd.hasOption("infoport")) {
+        String val = cmd.getOptionValue("infoport");
+        conf.setInt("hbase.thrift.info.port", Integer.valueOf(val));
+        LOG.debug("Web UI port set to " + val);
+      }
+    } catch (NumberFormatException e) {
+      LOG.error("Could not parse the value provided for the infoport option", e);
+      printUsageAndExit(options, -1);
+    }
+
     // Make optional changes to the configuration based on command-line options
     optionToConf(cmd, MIN_WORKERS_OPTION,
         conf, TBoundedThreadPoolServer.MIN_WORKER_THREADS_CONF_KEY);
@@ -171,6 +197,14 @@ public class ThriftServer {
   }
 
   public void stop() {
+    if (this.infoServer != null) {
+      LOG.info("Stopping infoServer");
+      try {
+        this.infoServer.stop();
+      } catch (Exception ex) {
+        ex.printStackTrace();
+      }
+    }
     serverRunner.shutdown();
   }
 

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java Thu Feb 14 13:35:54 2013
@@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.thrift.Ca
 import org.apache.hadoop.hbase.thrift.CallQueue.Call;
 import org.apache.hadoop.hbase.thrift.ThriftMetrics;
 import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
+import org.apache.hadoop.hbase.util.InfoServer;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.protocol.TProtocolFactory;
@@ -66,6 +67,7 @@ import com.google.common.util.concurrent
  * HbaseClient.thrift IDL file.
  */
 @InterfaceAudience.Private
+@SuppressWarnings({ "rawtypes", "unchecked" })
 public class ThriftServer {
   private static final Log log = LogFactory.getLog(ThriftServer.class);
 
@@ -91,6 +93,7 @@ public class ThriftServer {
     options.addOption("f", "framed", false, "Use framed transport");
     options.addOption("c", "compact", false, "Use the compact protocol");
     options.addOption("h", "help", false, "Print help information");
+    options.addOption(null, "infoport", true, "Port for web UI");
 
     OptionGroup servers = new OptionGroup();
     servers.addOption(
@@ -225,15 +228,51 @@ public class ThriftServer {
       Configuration conf = HBaseConfiguration.create();
       ThriftMetrics metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.TWO);
 
+      String implType = "threadpool";
+      if (nonblocking) {
+        implType = "nonblocking";
+      } else if (hsha) {
+        implType = "hsha";
+      }
+
+      conf.set("hbase.regionserver.thrift.server.type", implType);
+      conf.setInt("hbase.regionserver.thrift.port", listenPort);
+
       // Construct correct ProtocolFactory
-      TProtocolFactory protocolFactory = getTProtocolFactory(cmd.hasOption("compact"));
+      boolean compact = cmd.hasOption("compact");
+      TProtocolFactory protocolFactory = getTProtocolFactory(compact);
       THBaseService.Iface handler =
           ThriftHBaseServiceHandler.newInstance(conf, metrics);
       THBaseService.Processor processor = new THBaseService.Processor(handler);
+      conf.setBoolean("hbase.regionserver.thrift.compact", compact);
 
       boolean framed = cmd.hasOption("framed") || nonblocking || hsha;
       TTransportFactory transportFactory = getTTransportFactory(framed);
       InetSocketAddress inetSocketAddress = bindToPort(cmd.getOptionValue("bind"), listenPort);
+      conf.setBoolean("hbase.regionserver.thrift.framed", framed);
+
+      // check for user-defined info server port setting, if so override the conf
+      try {
+        if (cmd.hasOption("infoport")) {
+          String val = cmd.getOptionValue("infoport");
+          conf.setInt("hbase.thrift.info.port", Integer.valueOf(val));
+          log.debug("Web UI port set to " + val);
+        }
+      } catch (NumberFormatException e) {
+        log.error("Could not parse the value provided for the infoport option", e);
+        printUsage();
+        System.exit(1);
+      }
+
+      // Put up info server.
+      int port = conf.getInt("hbase.thrift.info.port", 9095);
+      if (port >= 0) {
+        conf.setLong("startcode", System.currentTimeMillis());
+        String a = conf.get("hbase.thrift.info.bindAddress", "0.0.0.0");
+        InfoServer infoServer = new InfoServer("thrift", a, port, false, conf);
+        infoServer.setAttribute("hbase.conf", conf);
+        infoServer.start();
+      }
 
       if (nonblocking) {
         server = getTNonBlockingServer(protocolFactory, processor, transportFactory, inetSocketAddress);



Mime
View raw message