hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r564012 [2/4] - in /lucene/hadoop/trunk/src/contrib/hbase: ./ src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/util/ src/test/org/apache/hadoop/hbase/
Date Wed, 08 Aug 2007 20:30:20 GMT
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java?view=diff&rev=564012&r1=564011&r2=564012
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java Wed Aug  8 13:30:13 2007
@@ -36,8 +36,9 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hbase.filter.RowFilterInterface;
-import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.util.StringUtils;
 
@@ -68,7 +69,7 @@
  * each column family.  (This config info will be communicated via the 
  * tabledesc.)
  * 
- * The HTableDescriptor contains metainfo about the HRegion's table.
+ * <p>The HTableDescriptor contains metainfo about the HRegion's table.
  * regionName is a unique identifier for this HRegion. (startKey, endKey]
  * defines the keyspace for this HRegion.
  */
@@ -76,24 +77,12 @@
   static String SPLITDIR = "splits";
   static String MERGEDIR = "merges";
   static String TMPREGION_PREFIX = "tmpregion_";
-  static Random rand = new Random();
+  static final Random rand = new Random();
   static final Log LOG = LogFactory.getLog(HRegion.class);
   final AtomicBoolean closed = new AtomicBoolean(false);
   private long noFlushCount = 0;
-
-  /**
-   * Deletes all the files for a HRegion
-   * 
-   * @param fs                  - the file system object
-   * @param baseDirectory       - base directory for HBase
-   * @param regionName          - name of the region to delete
-   * @throws IOException
-   */
-  static void deleteRegion(FileSystem fs, Path baseDirectory,
-      Text regionName) throws IOException {
-    LOG.info("Deleting region " + regionName);
-    fs.delete(HStoreFile.getHRegionDir(baseDirectory, regionName));
-  }
+  static final Text COL_SPLITA = new Text(COLUMN_FAMILY_STR + "splitA");
+  static final Text COL_SPLITB = new Text(COLUMN_FAMILY_STR + "splitB");
   
   /**
    * Merge two HRegions.  They must be available on the current
@@ -108,7 +97,7 @@
 
     // Make sure that srcA comes first; important for key-ordering during
     // write of the merged file.
-    
+    FileSystem fs = srcA.getFilesystem();
     if(srcA.getStartKey() == null) {
       if(srcB.getStartKey() == null) {
         throw new IOException("Cannot merge two regions with null start key");
@@ -126,7 +115,6 @@
       throw new IOException("Cannot merge non-adjacent regions");
     }
 
-    FileSystem fs = a.getFilesystem();
     Configuration conf = a.getConf();
     HTableDescriptor tabledesc = a.getTableDesc();
     HLog log = a.getLog();
@@ -143,22 +131,21 @@
     HRegionInfo newRegionInfo
       = new HRegionInfo(Math.abs(rand.nextLong()), tabledesc, startKey, endKey);
     
-    Path newRegionDir = HStoreFile.getHRegionDir(merges, newRegionInfo.regionName);
+    Path newRegionDir = HRegion.getRegionDir(merges, newRegionInfo.regionName);
 
     if(fs.exists(newRegionDir)) {
       throw new IOException("Cannot merge; target file collision at " + newRegionDir);
     }
 
-    LOG.info("starting merge of regions: " + a.getRegionName() + " and " 
-        + b.getRegionName() + " new region start key is '" 
-        + (startKey == null ? "" : startKey) + "', end key is '" 
-        + (endKey == null ? "" : endKey) + "'");
+    LOG.info("starting merge of regions: " + a.getRegionName() + " and " +
+      b.getRegionName() + " into new region " + newRegionInfo.toString());
     
     // Flush each of the sources, and merge their files into a single 
     // target for each column family.    
     TreeSet<HStoreFile> alreadyMerged = new TreeSet<HStoreFile>();
     TreeMap<Text, Vector<HStoreFile>> filesToMerge =
       new TreeMap<Text, Vector<HStoreFile>>();
+    
     for(HStoreFile src: a.flushcache(true)) {
       Vector<HStoreFile> v = filesToMerge.get(src.getColFamily());
       if(v == null) {
@@ -185,7 +172,7 @@
       Text colFamily = es.getKey();
       Vector<HStoreFile> srcFiles = es.getValue();
       HStoreFile dst = new HStoreFile(conf, merges, newRegionInfo.regionName, 
-          colFamily, Math.abs(rand.nextLong()));
+        colFamily, Math.abs(rand.nextLong()));
       dst.mergeStoreFiles(srcFiles, fs, conf);
       alreadyMerged.addAll(srcFiles);
     }
@@ -193,7 +180,6 @@
     // That should have taken care of the bulk of the data.
     // Now close the source HRegions for good, and repeat the above to take care
     // of any last-minute inserts
-
     if(LOG.isDebugEnabled()) {
       LOG.debug("flushing changes since start of merge for region " 
           + a.getRegionName());
@@ -235,13 +221,13 @@
     for (Map.Entry<Text, Vector<HStoreFile>> es : filesToMerge.entrySet()) {
       Text colFamily = es.getKey();
       Vector<HStoreFile> srcFiles = es.getValue();
-      HStoreFile dst = new HStoreFile(conf, merges,
-        newRegionInfo.regionName, colFamily, Math.abs(rand.nextLong()));
+      HStoreFile dst = new HStoreFile(conf, merges, newRegionInfo.regionName,
+        colFamily, Math.abs(rand.nextLong()));
       dst.mergeStoreFiles(srcFiles, fs, conf);
     }
 
     // Done
-    
+    // Construction moves the merge files into place under region.
     HRegion dstRegion = new HRegion(rootDir, log, fs, conf, newRegionInfo,
         newRegionDir);
 
@@ -304,7 +290,6 @@
    * appropriate log info for this HRegion. If there is a previous log file
    * (implying that the HRegion has been written-to before), then read it from
    * the supplied path.
-   * 
    * @param rootDir root directory for HBase instance
    * @param fs is the filesystem.  
    * @param conf is global configuration settings.
@@ -324,29 +309,27 @@
     this.conf = conf;
     this.regionInfo = regionInfo;
     this.memcache = new HMemcache();
-
-
     this.writestate.writesOngoing = true;
     this.writestate.writesEnabled = true;
 
     // Declare the regionName.  This is a unique string for the region, used to 
     // build a unique filename.
-    
-    this.regiondir = HStoreFile.getHRegionDir(rootDir, this.regionInfo.regionName);
+    this.regiondir = HRegion.getRegionDir(rootDir, this.regionInfo.regionName);
     Path oldLogFile = new Path(regiondir, HREGION_OLDLOGFILE_NAME);
 
-    // Move prefab HStore files into place (if any)
-    
+    // Move prefab HStore files into place (if any).  This picks up split files
+    // and any merges from splits and merges dirs.
     if(initialFiles != null && fs.exists(initialFiles)) {
-      fs.rename(initialFiles, regiondir);
+      fs.rename(initialFiles, this.regiondir);
     }
 
     // Load in all the HStores.
     for(Map.Entry<Text, HColumnDescriptor> e :
         this.regionInfo.tableDesc.families().entrySet()) {
       Text colFamily = HStoreKey.extractFamily(e.getKey());
-      stores.put(colFamily, new HStore(rootDir, this.regionInfo.regionName,
-        e.getValue(), fs, oldLogFile, conf));
+      stores.put(colFamily,
+        new HStore(rootDir, this.regionInfo.regionName, e.getValue(), fs,
+          oldLogFile, conf));
     }
 
     // Get rid of any splits or merges that were lost in-progress
@@ -359,7 +342,7 @@
       fs.delete(merges);
     }
 
-    // By default, we flush the cache when 32M.
+    // By default, we flush the cache when 16M.
     this.memcacheFlushSize = conf.getInt("hbase.hregion.memcache.flush.size",
       1024*1024*16);
     this.blockingMemcacheSize = this.memcacheFlushSize *
@@ -367,8 +350,8 @@
     
     // By default, we compact the region if an HStore has more than
     // MIN_COMMITS_FOR_COMPACTION map files
-    this.compactionThreshold = conf.getInt("hbase.hregion.compactionThreshold",
-      3);
+    this.compactionThreshold =
+      conf.getInt("hbase.hregion.compactionThreshold", 3);
     
     // By default we split region if a file > DEFAULT_MAX_FILE_SIZE.
     this.desiredMaxFileSize =
@@ -397,9 +380,8 @@
    * time-sensitive thread.
    * 
    * @return Vector of all the storage files that the HRegion's component 
-   * HStores make use of.  It's a list of HStoreFile objects.  Returns empty
-   * vector if already closed and null if it is judged that it should not
-   * close.
+   * HStores make use of.  It's a list of all HStoreFile objects. Returns empty
+   * vector if already closed and null if judged that it should not close.
    * 
    * @throws IOException
    */
@@ -443,7 +425,6 @@
       if(!shouldClose) {
         return null;
       }
-      LOG.info("closing region " + this.regionInfo.regionName);
       
       // Write lock means no more row locks can be given out.  Wait on
       // outstanding row locks to come in before we close so we do not drop
@@ -465,124 +446,121 @@
           writestate.writesOngoing = false;
         }
         this.closed.set(true);
-        LOG.info("region " + this.regionInfo.regionName + " closed");
+        LOG.info("closed " + this.regionInfo.regionName);
       }
     } finally {
       lock.releaseWriteLock();
     }
   }
-
-  /**
-   * Split the HRegion to create two brand-new ones.  This will also close the 
-   * current HRegion.
-   *
-   * Returns two brand-new (and open) HRegions
+  
+  /*
+   * Split the HRegion to create two brand-new ones.  This also closes
+   * current HRegion.  Split should be fast since we don't rewrite store files
+   * but instead create new 'reference' store files that read off the top and
+   * bottom ranges of parent store files.
+   * @param midKey Row to split on.
+   * @param listener May be null.
+   * @return two brand-new (and open) HRegions
+   * @throws IOException
    */
-  HRegion[] closeAndSplit(Text midKey, RegionUnavailableListener listener)
+  HRegion[] closeAndSplit(final Text midKey,
+      final RegionUnavailableListener listener)
   throws IOException {
-    if(((regionInfo.startKey.getLength() != 0)
-        && (regionInfo.startKey.compareTo(midKey) > 0))
-        || ((regionInfo.endKey.getLength() != 0)
-            && (regionInfo.endKey.compareTo(midKey) < 0))) {
-      throw new IOException("Region splitkey must lie within region " +
-        "boundaries.");
-    }
-
+    checkMidKey(midKey);
     long startTime = System.currentTimeMillis();
-    Path splits = new Path(regiondir, SPLITDIR);
-    if(! fs.exists(splits)) {
-      fs.mkdirs(splits);
-    }
-    
-    long regionAId = Math.abs(rand.nextLong());
-    HRegionInfo regionAInfo = new HRegionInfo(regionAId, regionInfo.tableDesc, 
-      regionInfo.startKey, midKey);
-    long regionBId = Math.abs(rand.nextLong());
-    HRegionInfo regionBInfo =
-      new HRegionInfo(regionBId, regionInfo.tableDesc, midKey, null);
-    Path dirA = HStoreFile.getHRegionDir(splits, regionAInfo.regionName);
-    Path dirB = HStoreFile.getHRegionDir(splits, regionBInfo.regionName);
-    if(fs.exists(dirA) || fs.exists(dirB)) {
-      throw new IOException("Cannot split; target file collision at " + dirA +
-        " or " + dirB);
+    Path splits = getSplitsDir();
+    HRegionInfo regionAInfo = new HRegionInfo(Math.abs(rand.nextLong()),
+      this.regionInfo.tableDesc, this.regionInfo.startKey, midKey);
+    Path dirA = getSplitRegionDir(splits, regionAInfo.regionName);
+    if(fs.exists(dirA)) {
+      throw new IOException("Cannot split; target file collision at " + dirA);
+    }
+    HRegionInfo regionBInfo = new HRegionInfo(Math.abs(rand.nextLong()),
+      this.regionInfo.tableDesc, midKey, null);
+    Path dirB = getSplitRegionDir(splits, regionBInfo.regionName);
+    if(this.fs.exists(dirB)) {
+      throw new IOException("Cannot split; target file collision at " + dirB);
     }
 
-    // We just copied most of the data. Now get whatever updates are up in
-    // the memcache (after shutting down new updates).
-    
     // Notify the caller that we are about to close the region. This moves
-    // us ot the 'retiring' queue. Means no more updates coming in -- just
+    // us to the 'retiring' queue. Means no more updates coming in -- just
     // whatever is outstanding.
-    listener.closing(this.getRegionName());
+    if (listener != null) {
+      listener.closing(getRegionName());
+    }
 
-    // Wait on the last row updates to come in.
-    LOG.debug("Starting wait on row locks.");
-    waitOnRowLocks();
-
-    // Flush this HRegion out to storage, and turn off flushes
-    // or compactions until close() is called.
-    LOG.debug("Calling flushcache inside closeAndSplit");
-    Vector<HStoreFile> hstoreFilesToSplit = flushcache(true);
+    
+    // Now close the HRegion.  Close returns all store files or null if not
+    // supposed to close (? What to do in this case? Implement abort of close?)
+    // Close also does wait on outstanding rows and calls a flush just-in-case.
+    Vector<HStoreFile> hstoreFilesToSplit = close();
     if (hstoreFilesToSplit == null) {
-      // It should always return a list of hstore files even if memcache is
-      // empty.  It will return null if concurrent compaction or splits which
-      // should not happen.
-      throw new NullPointerException("Flushcache did not return any files");
-    }
-    TreeSet<HStoreFile> alreadySplit = new TreeSet<HStoreFile>();
-    for(HStoreFile hsf: hstoreFilesToSplit) {
-      alreadySplit.add(splitStoreFile(hsf, splits, regionAInfo,
-        regionBInfo, midKey));
+      LOG.warn("Close came back null (Implement abort of close?)");
     }
     
-    // Now close the HRegion
-    hstoreFilesToSplit = close();
-    
     // Tell listener that region is now closed and that they can therefore
     // clean up any outstanding references.
-    listener.closed(this.getRegionName());
-    
-    // Copy the small remainder
-    for(HStoreFile hsf: hstoreFilesToSplit) {
-      if(!alreadySplit.contains(hsf)) {
-        splitStoreFile(hsf, splits, regionAInfo, regionBInfo, midKey);
-      }
+    if (listener != null) {
+      listener.closed(this.getRegionName());
     }
-
-    // Done
+    
+    // Split each store file.
+    for(HStoreFile h: hstoreFilesToSplit) {
+      // A reference to the bottom half of the hsf store file.
+      HStoreFile.Reference aReference = new HStoreFile.Reference(
+        getRegionName(), h.getFileId(), new HStoreKey(midKey),
+        HStoreFile.Range.bottom);
+      HStoreFile a = new HStoreFile(this.conf, splits,
+        regionAInfo.regionName, h.getColFamily(), Math.abs(rand.nextLong()),
+        aReference);
+      HStoreFile.Reference bReference = new HStoreFile.Reference(
+        getRegionName(), h.getFileId(), new HStoreKey(midKey),
+        HStoreFile.Range.top);
+      HStoreFile b = new HStoreFile(this.conf, splits,
+        regionBInfo.regionName, h.getColFamily(), Math.abs(rand.nextLong()),
+        bReference);
+      h.splitStoreFile(a, b, this.fs);
+    }
+
+    // Done!
+    // Opening the region copies the splits files from the splits directory
+    // under each region.
     HRegion regionA = new HRegion(rootDir, log, fs, conf, regionAInfo, dirA);
     HRegion regionB = new HRegion(rootDir, log, fs, conf, regionBInfo, dirB);
 
     // Cleanup
-    fs.delete(splits);    // Get rid of splits directory
-    fs.delete(regiondir); // and the directory for the old region
-    HRegion regions[] = new HRegion[2];
-    regions[0] = regionA;
-    regions[1] = regionB;
-    LOG.info("Region split of " + this.regionInfo.regionName + " complete. " +
-      "New regions are: " + regions[0].getRegionName() + ", " +
-      regions[1].getRegionName() + ". Took " +
+    boolean deleted = fs.delete(splits);    // Get rid of splits directory
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Cleaned up " + splits.toString() + " " + deleted);
+    }
+    HRegion regions[] = new HRegion [] {regionA, regionB};
+    LOG.info("Region split of " + this.regionInfo.regionName + " complete; " +
+      "new regions: " + regions[0].getRegionName() + ", " +
+      regions[1].getRegionName() + ". Split took " +
       StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
     return regions;
   }
   
-  private HStoreFile splitStoreFile(final HStoreFile hsf, final Path splits,
-      final HRegionInfo a, final HRegionInfo b, final Text midKey)
-  throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Started splitting HStore " + hsf.getRegionName() + "/" +
-        hsf.getColFamily() + "/" + hsf.fileId());
+  private void checkMidKey(final Text midKey) throws IOException {
+    if(((this.regionInfo.startKey.getLength() != 0)
+        && (this.regionInfo.startKey.compareTo(midKey) > 0))
+        || ((this.regionInfo.endKey.getLength() != 0)
+            && (this.regionInfo.endKey.compareTo(midKey) < 0))) {
+      throw new IOException("Region splitkey must lie within region " +
+        "boundaries.");
     }
-    HStoreFile dstA = new HStoreFile(conf, splits, a.regionName, 
-      hsf.getColFamily(), Math.abs(rand.nextLong()));
-    HStoreFile dstB = new HStoreFile(conf, splits, b.regionName, 
-      hsf.getColFamily(), Math.abs(rand.nextLong()));
-    hsf.splitStoreFile(midKey, dstA, dstB, fs, conf);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Finished splitting HStore " + hsf.getRegionName() + "/" +
-        hsf.getColFamily() + "/" + hsf.fileId());
+  }
+  
+  private Path getSplitRegionDir(final Path splits, final Text regionName) {
+    return HRegion.getRegionDir(splits, regionName);
+  }
+  
+  private Path getSplitsDir() throws IOException {
+    Path splits = new Path(this.regiondir, SPLITDIR);
+    if(!this.fs.exists(splits)) {
+      this.fs.mkdirs(splits);
     }
-    return hsf;
+    return splits;
   }
 
   //////////////////////////////////////////////////////////////////////////////
@@ -591,22 +569,22 @@
 
   /** @return start key for region */
   public Text getStartKey() {
-    return regionInfo.startKey;
+    return this.regionInfo.startKey;
   }
 
   /** @return end key for region */
   public Text getEndKey() {
-    return regionInfo.endKey;
+    return this.regionInfo.endKey;
   }
 
   /** @return region id */
   public long getRegionId() {
-    return regionInfo.regionId;
+    return this.regionInfo.regionId;
   }
 
   /** @return region name */
   public Text getRegionName() {
-    return regionInfo.regionName;
+    return this.regionInfo.regionName;
   }
 
   /** @return root directory path */
@@ -616,27 +594,27 @@
 
   /** @return HTableDescriptor for this region */
   public HTableDescriptor getTableDesc() {
-    return regionInfo.tableDesc;
+    return this.regionInfo.tableDesc;
   }
 
   /** @return HLog in use for this region */
   public HLog getLog() {
-    return log;
+    return this.log;
   }
 
   /** @return Configuration object */
   public Configuration getConf() {
-    return conf;
+    return this.conf;
   }
 
   /** @return region directory Path */
   public Path getRegionDir() {
-    return regiondir;
+    return this.regiondir;
   }
 
   /** @return FileSystem being used by this region */
   public FileSystem getFilesystem() {
-    return fs;
+    return this.fs;
   }
 
   //////////////////////////////////////////////////////////////////////////////
@@ -646,63 +624,74 @@
   // upkeep.
   //////////////////////////////////////////////////////////////////////////////
 
-  /**
+  /*
    * Iterates through all the HStores and finds the one with the largest
    * MapFile size. If the size is greater than the (currently hard-coded)
    * threshold, returns true indicating that the region should be split. The
    * midKey for the largest MapFile is returned through the midKey parameter.
-   * 
-   * @param midKey      - (return value) midKey of the largest MapFile
-   * @return            - true if the region should be split
+   * It is possible for us to rule the region non-splitable even in excess of
+   * configured size.  This happens if region contains a reference file.  If
+   * a reference file, the region can not be split.
+   * @param midKey midKey of the largest MapFile
+   * @return true if the region should be split. midKey is set by this method.
+   * Check it for a midKey value on return.
    */
   boolean needsSplit(Text midKey) {
     lock.obtainReadLock();
     try {
-      Text key = new Text();
-      long maxSize = 0;
-      long aggregateSize = 0;
-      for(HStore store: stores.values()) {
-        long size = store.getLargestFileSize(key);
-        aggregateSize += size;
-        if(size > maxSize) {                      // Largest so far
-          maxSize = size;
-          midKey.set(key);
-        }
-      }
+      HStore.HStoreSize biggest = largestHStore(midKey);
       long triggerSize =
         this.desiredMaxFileSize + (this.desiredMaxFileSize / 2);
-      boolean split = (maxSize >= triggerSize || aggregateSize >= triggerSize);
+      boolean split = (biggest.getAggregate() >= triggerSize);
       if (split) {
-        LOG.info("Splitting " + getRegionName().toString() +
-          " because largest file is " + StringUtils.humanReadableInt(maxSize) +
-          ", aggregate size is " +
-          StringUtils.humanReadableInt(aggregateSize) +
-          " and desired size is " +
-          StringUtils.humanReadableInt(this.desiredMaxFileSize));
+        if (!biggest.isSplitable()) {
+          LOG.warn("Region " + getRegionName().toString() +
+            " is NOT splitable though its aggregate size is " +
+            StringUtils.humanReadableInt(biggest.getAggregate()) +
+            " and desired size is " +
+            StringUtils.humanReadableInt(this.desiredMaxFileSize));
+          split = false;
+        } else {
+          LOG.info("Splitting " + getRegionName().toString() +
+            " because largest aggregate size is " +
+            StringUtils.humanReadableInt(biggest.getAggregate()) +
+            " and desired size is " +
+            StringUtils.humanReadableInt(this.desiredMaxFileSize));
+        }
       }
       return split;
     } finally {
       lock.releaseReadLock();
     }
   }
-
+  
   /**
-   * @return - returns the size of the largest HStore
-   */
-  long largestHStore() {
-    long maxsize = 0;
+   * @return returns size of largest HStore.  Also returns whether store is
+   * splitable or not (Its not splitable if region has a store that has a
+   * reference store file).
+   */
+  HStore.HStoreSize largestHStore(final Text midkey) {
+    HStore.HStoreSize biggest = null;
+    boolean splitable = true;
     lock.obtainReadLock();
     try {
-      Text key = new Text();
       for(HStore h: stores.values()) {
-        long size = h.getLargestFileSize(key);
-
-        if(size > maxsize) {                      // Largest so far
-          maxsize = size;
+        HStore.HStoreSize size = h.size(midkey);
+        // If we came across a reference down in the store, then propagate
+        // fact that region is not splitable.
+        if (splitable) {
+          splitable = size.splitable;
+        }
+        if (biggest == null) {
+          biggest = size;
+          continue;
+        }
+        if(size.getAggregate() > biggest.getAggregate()) { // Largest so far
+          biggest = size;
         }
       }
-      return maxsize;
-    
+      biggest.setSplitable(splitable);
+      return biggest;
     } finally {
       lock.releaseReadLock();
     }
@@ -891,7 +880,6 @@
    * not flush, returns list of all store files.
    */
   Vector<HStoreFile> internalFlushcache() throws IOException {
-
     long startTime = -1;
     if(LOG.isDebugEnabled()) {
       startTime = System.currentTimeMillis();
@@ -911,12 +899,14 @@
     // explicitly cleaned up using a call to deleteSnapshot().
     HMemcache.Snapshot retval = memcache.snapshotMemcacheForLog(log);
     if(retval == null || retval.memcacheSnapshot == null) {
+      LOG.debug("Finished memcache flush; empty snapshot");
       return getAllStoreFiles();
     }
     long logCacheFlushId = retval.sequenceId;
     if(LOG.isDebugEnabled()) {
       LOG.debug("Snapshotted memcache for region " +
-        this.regionInfo.regionName + ". Sequence id " + retval.sequenceId);
+        this.regionInfo.regionName + " with sequence id " + retval.sequenceId +
+        " and entries " + retval.memcacheSnapshot.size());
     }
 
     // A.  Flush memcache to all the HStores.
@@ -1272,7 +1262,7 @@
             + lockid + " unexpected aborted by another thread");
       }
       
-      this.targetColumns.remove(lockid);
+      this.targetColumns.remove(Long.valueOf(lockid));
       releaseRowLock(row);
     }
   }
@@ -1387,7 +1377,7 @@
     // Pattern is that all access to rowsToLocks and/or to
     // locksToRows is via a lock on rowsToLocks.
     synchronized(rowsToLocks) {
-      return locksToRows.get(lockid);
+      return locksToRows.get(Long.valueOf(lockid));
     }
   }
   
@@ -1398,7 +1388,7 @@
   void releaseRowLock(Text row) {
     synchronized(rowsToLocks) {
       long lockid = rowsToLocks.remove(row).longValue();
-      locksToRows.remove(lockid);
+      locksToRows.remove(Long.valueOf(lockid));
       rowsToLocks.notifyAll();
     }
   }
@@ -1415,6 +1405,11 @@
     }
   }
   
+  @Override
+  public String toString() {
+    return getRegionName().toString();
+  }
+
   /**
    * HScanner is an iterator through a bunch of rows in an HRegion.
    */
@@ -1686,7 +1681,7 @@
   static HRegion createHRegion(final HRegionInfo info,
     final Path rootDir, final Configuration conf, final Path initialFiles)
   throws IOException {
-    Path regionDir = HStoreFile.getHRegionDir(rootDir, info.regionName);
+    Path regionDir = HRegion.getRegionDir(rootDir, info.regionName);
     FileSystem fs = FileSystem.get(conf);
     fs.mkdirs(regionDir);
     return new HRegion(rootDir,
@@ -1721,19 +1716,23 @@
       final long startCode)
   throws IOException {
     HTable t = new HTable(conf, table);
-    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
-    DataOutputStream out = new DataOutputStream(bytes);
-    region.getRegionInfo().write(out);
-    long lockid = t.startUpdate(region.getRegionName());
-    t.put(lockid, COL_REGIONINFO, bytes.toByteArray());
-    t.put(lockid, COL_SERVER,
-      serverAddress.toString().getBytes(UTF8_ENCODING));
-    t.put(lockid, COL_STARTCODE,
-      String.valueOf(startCode).getBytes(UTF8_ENCODING));
-    t.commit(lockid);
-    if (LOG.isDebugEnabled()) {
-      LOG.info("Added region " + region.getRegionName() + " to table " +
-        table);
+    try {
+      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+      DataOutputStream out = new DataOutputStream(bytes);
+      region.getRegionInfo().write(out);
+      long lockid = t.startUpdate(region.getRegionName());
+      t.put(lockid, COL_REGIONINFO, bytes.toByteArray());
+      t.put(lockid, COL_SERVER,
+        serverAddress.toString().getBytes(UTF8_ENCODING));
+      t.put(lockid, COL_STARTCODE,
+        String.valueOf(startCode).getBytes(UTF8_ENCODING));
+      t.commit(lockid);
+      if (LOG.isDebugEnabled()) {
+        LOG.info("Added region " + region.getRegionName() + " to table " +
+            table);
+      }
+    } finally {
+      t.close();
     }
   }
   
@@ -1748,31 +1747,126 @@
       final Text table, final Text regionName)
   throws IOException {
     HTable t = new HTable(conf, table);
-    long lockid = t.startUpdate(regionName);
+    try {
+      removeRegionFromMETA(t, regionName);
+    } finally {
+      t.close();
+    }
+  }
+  
+  /**
+   * Delete <code>region</code> from META <code>table</code>.
+   * @param conf Configuration object
+   * @param table META table we are to delete region from.
+   * @param regionName Region to remove.
+   * @throws IOException
+   */
+  static void removeRegionFromMETA(final HTable t, final Text regionName)
+  throws IOException {
+    long lockid = t.startBatchUpdate(regionName);
     t.delete(lockid, COL_REGIONINFO);
     t.delete(lockid, COL_SERVER);
     t.delete(lockid, COL_STARTCODE);
     t.commit(lockid);
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Removed " + regionName + " from table " + table);
+      LOG.debug("Removed " + regionName + " from table " + t.getTableName());
+    }
+  }
+
+  /**
+   * Delete <code>split</code> column from META <code>table</code>.
+   * @param t
+   * @param split
+   * @param regionName Region to remove.
+   * @throws IOException
+   */
+  static void removeSplitFromMETA(final HTable t, final Text regionName,
+    final Text split)
+  throws IOException {
+    long lockid = t.startBatchUpdate(regionName);
+    t.delete(lockid, split);
+    t.commit(lockid);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Removed " + split + " from " + regionName +
+        " from table " + t.getTableName());
+    }
+  }
+  
+  /**
+   * <code>region</code> has split.  Update META <code>table</code>.
+   * @param client Client to use running update.
+   * @param table META table we are to delete region from.
+   * @param regionName Region to remove.
+   * @throws IOException
+   */
+  static void writeSplitToMETA(final Configuration conf,
+      final Text table, final Text regionName, final HRegionInfo splitA,
+      final HRegionInfo splitB)
+  throws IOException {
+    HTable t = new HTable(conf, table);
+    try {
+      HRegionInfo hri = getRegionInfo(t.get(regionName, COL_REGIONINFO));
+      hri.offLine = true;
+      hri.split = true;
+      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+      DataOutputStream dos = new DataOutputStream(bytes);
+      hri.write(dos);
+      dos.close();
+      long lockid = t.startBatchUpdate(regionName);
+      t.put(lockid, COL_REGIONINFO, bytes.toByteArray());
+      t.put(lockid, COL_SPLITA, Writables.getBytes(splitA));
+      t.put(lockid, COL_SPLITB, Writables.getBytes(splitB));
+      t.commitBatch(lockid);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Updated " + regionName + " in table " + table +
+        " on its being split");
+      }
+    } finally {
+      t.close();
     }
   }
   
   /**
+   * @param whichSplit COL_SPLITA or COL_SPLITB?
    * @param data Map of META row labelled column data.
-   * @return Server
+   * @return HRegionInfo or null if not found.
+   * @throws IOException 
+   */
+  static HRegionInfo getSplit(final TreeMap<Text, byte[]> data,
+      final Text whichSplit)
+  throws IOException {
+    if (!(whichSplit.equals(COL_SPLITA) || whichSplit.equals(COL_SPLITB))) {
+      throw new IOException("Illegal Argument: " + whichSplit);
+    }
+    byte []  bytes = data.get(whichSplit);
+    if (bytes == null || bytes.length == 0) {
+      return null;
+    }
+    return (HRegionInfo)((bytes == null || bytes.length == 0)?
+      null:
+      Writables.getWritable(bytes, new HRegionInfo()));
+  }
+  
+  /**
+   * @param data Map of META row labelled column data.
+   * @return An HRegionInfo instance.
+   * @throws IOException 
    */
   static HRegionInfo getRegionInfo(final TreeMap<Text, byte[]> data)
   throws IOException {
-    byte[] bytes = data.get(COL_REGIONINFO);
+    return getRegionInfo(data.get(COL_REGIONINFO));
+  }
+  
+  /**
+   * @param bytes Bytes of a HRegionInfo.
+   * @return An HRegionInfo instance.
+   * @throws IOException
+   */
+  static HRegionInfo getRegionInfo(final byte[] bytes) throws IOException {
     if (bytes == null || bytes.length == 0) {
       throw new IOException("no value for " + COL_REGIONINFO);
     }
-    DataInputBuffer in = new DataInputBuffer();
-    in.reset(bytes, bytes.length);
-    HRegionInfo info = new HRegionInfo();
-    info.readFields(in);
-    return info;
+    return (HRegionInfo)Writables.getWritable(bytes, new HRegionInfo());
   }
   
   /**
@@ -1810,4 +1904,52 @@
     }
     return startCode;
   }
-}
+
+  public static Path getRegionDir(final Path dir, final Text regionName) {
+    return new Path(dir, new Path(HREGIONDIR_PREFIX + regionName));
+  }
+  
+
+  /**
+   * Deletes all the files for a HRegion
+   * 
+   * @param fs the file system object
+   * @param baseDirectory base directory for HBase
+   * @param regionName name of the region to delete
+   * @throws IOException
+   * @return True if deleted.
+   */
+  static boolean deleteRegion(FileSystem fs, Path baseDirectory,
+      Text regionName) throws IOException {
+    Path p = HRegion.getRegionDir(fs.makeQualified(baseDirectory), regionName);
+    return fs.delete(p);
+  }
+ 
+  /**
+   * Look for HStoreFile references in passed region.
+   * @param fs
+   * @param baseDirectory
+   * @param hri
+   * @return True if we found references.
+   * @throws IOException 
+   */
+  static boolean hasReferences(final FileSystem fs, final Path baseDirectory,
+      final HRegionInfo hri)
+  throws IOException {
+    boolean result = false;
+    for (Text family: hri.getTableDesc().families().keySet()) {
+      Path p = HStoreFile.getMapDir(baseDirectory, hri.getRegionName(),
+        HStoreKey.extractFamily(family));
+      // Look for reference files.
+      Path [] ps = fs.listPaths(p, new PathFilter () {
+        public boolean accept(Path path) {
+          return HStoreFile.isReference(path);
+        }});
+      if (ps != null && ps.length > 0) {
+        result = true;
+        break;
+      }
+    }
+    return result;
+  }
+}
\ No newline at end of file

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java?view=diff&rev=564012&r1=564011&r2=564012
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java Wed Aug  8 13:30:13 2007
@@ -66,6 +66,7 @@
   Text startKey;
   Text endKey;
   boolean offLine;
+  boolean split;
   HTableDescriptor tableDesc;
   
   /** Default constructor - creates empty object */
@@ -76,6 +77,7 @@
     this.endKey = new Text();
     this.regionName = new Text();
     this.offLine = false;
+    this.split = false;
   }
   
   /**
@@ -88,6 +90,21 @@
     this();
     readFields(new DataInputStream(new ByteArrayInputStream(serializedBytes)));
   }
+  
+  /**
+   * Construct HRegionInfo with explicit parameters
+   * 
+   * @param regionId the region id
+   * @param tableDesc the table descriptor
+   * @param startKey first key in region
+   * @param endKey end of key range
+   * @throws IllegalArgumentException
+   */
+  public HRegionInfo(long regionId, HTableDescriptor tableDesc, Text startKey,
+      Text endKey)
+  throws IllegalArgumentException {
+    this(regionId, tableDesc, startKey, endKey, false);
+  }
 
   /**
    * Construct HRegionInfo with explicit parameters
@@ -96,10 +113,13 @@
    * @param tableDesc the table descriptor
    * @param startKey first key in region
    * @param endKey end of key range
+   * @param split true if this region has split and we have daughter regions
+   * regions that may or may not hold references to this region.
    * @throws IllegalArgumentException
    */
   public HRegionInfo(long regionId, HTableDescriptor tableDesc, Text startKey,
-      Text endKey) throws IllegalArgumentException {
+      Text endKey, final boolean split)
+  throws IllegalArgumentException {
     
     this.regionId = regionId;
     
@@ -124,6 +144,7 @@
       regionId);
     
     this.offLine = false;
+    this.split = split;
   }
   
   /** @return the endKey */
@@ -150,6 +171,20 @@
   public HTableDescriptor getTableDesc(){
     return tableDesc;
   }
+  
+  /**
+   * @return True if has been split and has daughters.
+   */
+  public boolean isSplit() {
+    return this.split;
+  }
+  
+  /**
+   * @return True if this region is offline.
+   */
+  public boolean isOffline() {
+    return this.offLine;
+  }
 
   /**
    * {@inheritDoc}
@@ -157,8 +192,10 @@
   @Override
   public String toString() {
     return "regionname: " + this.regionName.toString() + ", startKey: <" +
-      this.startKey.toString() + ">, tableDesc: {" +
-      this.tableDesc.toString() + "}";
+      this.startKey.toString() + ">," +
+      (isOffline()? " offline: true,": "") +
+      (isSplit()? " split: true,": "") +
+      " tableDesc: {" + this.tableDesc.toString() + "}";
   }
     
   /**
@@ -197,6 +234,7 @@
     endKey.write(out);
     regionName.write(out);
     out.writeBoolean(offLine);
+    out.writeBoolean(split);
   }
   
   /**
@@ -209,6 +247,7 @@
     this.endKey.readFields(in);
     this.regionName.readFields(in);
     this.offLine = in.readBoolean();
+    this.split = in.readBoolean();
   }
   
   //

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionLocation.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionLocation.java?view=diff&rev=564012&r1=564011&r2=564012
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionLocation.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionLocation.java Wed Aug  8 13:30:13 2007
@@ -91,4 +91,4 @@
     }
     return result;
   }
-}
+}
\ No newline at end of file

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java?view=diff&rev=564012&r1=564011&r2=564012
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java Wed Aug  8 13:30:13 2007
@@ -113,7 +113,7 @@
         // regions.
         retiringRegions.put(regionName, onlineRegions.remove(regionName));
         if (LOG.isDebugEnabled()) {
-          LOG.debug(regionName.toString() + "closing (" +
+          LOG.debug(regionName.toString() + " closing (" +
             "Adding to retiringRegions)");
         }
       } finally {
@@ -199,15 +199,16 @@
       // splitting a 'normal' region, and the ROOT table needs to be
       // updated if we are splitting a META region.
       final Text tableToUpdate =
-        region.getRegionInfo().tableDesc.getName().equals(META_TABLE_NAME) ?
-            ROOT_TABLE_NAME : META_TABLE_NAME;
+        region.getRegionInfo().tableDesc.getName().equals(META_TABLE_NAME)?
+          ROOT_TABLE_NAME : META_TABLE_NAME;
       LOG.info("Updating " + tableToUpdate + " with region split info");
 
       // Remove old region from META
       for (int tries = 0; tries < numRetries; tries++) {
         try {
-          HRegion.removeRegionFromMETA(conf, tableToUpdate,
-              region.getRegionName());
+          HRegion.writeSplitToMETA(conf, tableToUpdate,
+            region.getRegionName(), newRegions[0].getRegionInfo(),
+            newRegions[1].getRegionInfo());
           break;
         } catch (IOException e) {
           if(tries == numRetries - 1) {
@@ -242,18 +243,17 @@
         LOG.debug("Reporting region split to master");
       }
       reportSplit(oldRegionInfo, newRegions[0].getRegionInfo(),
-          newRegions[1].getRegionInfo());
+        newRegions[1].getRegionInfo());
       LOG.info("region split, META update, and report to master all" +
-          " successful. Old region=" + oldRegionInfo.getRegionName() +
-          ", new regions: " + newRegions[0].getRegionName() + ", " +
-          newRegions[1].getRegionName());
+        " successful. Old region=" + oldRegionInfo.getRegionName() +
+        ", new regions: " + newRegions[0].getRegionName() + ", " +
+        newRegions[1].getRegionName());
       
       // Finally, start serving the new regions
       lock.writeLock().lock();
       try {
         onlineRegions.put(newRegions[0].getRegionName(), newRegions[0]);
         onlineRegions.put(newRegions[1].getRegionName(), newRegions[1]);
-        
       } finally {
         lock.writeLock().unlock();
       }
@@ -307,7 +307,7 @@
                   iex = x;
                 }
               }
-              LOG.error(iex);
+              LOG.error("", iex);
             }
           }
         }
@@ -347,10 +347,10 @@
      * {@inheritDoc}
      */
     public void run() {
-      while(! stopRequested) {
+      while(!stopRequested) {
         synchronized(logRollerLock) {
-          // If the number of log entries is high enough, roll the log.  This is a
-          // very fast operation, but should not be done too frequently.
+          // If the number of log entries is high enough, roll the log.  This
+          // is a very fast operation, but should not be done too frequently.
           int nEntries = log.getNumEntries();
           if(nEntries > this.maxLogEntries) {
             try {
@@ -359,17 +359,16 @@
             } catch (IOException iex) {
               if (iex instanceof RemoteException) {
                 try {
-                  iex = RemoteExceptionHandler.decodeRemoteException((RemoteException) iex);
-                  
+                  iex = RemoteExceptionHandler.
+                    decodeRemoteException((RemoteException) iex);
                 } catch (IOException x) {
                   iex = x;
                 }
               }
-              LOG.warn(iex);
+              LOG.warn("", iex);
             }
           }
         }
-        
         if(!stopRequested) {
           try {
             Thread.sleep(threadWakeFrequency);
@@ -586,7 +585,7 @@
           e = ex;
         }
       }
-      LOG.error(e);
+      LOG.error("", e);
     }
 
     while(! stopRequested) {
@@ -653,9 +652,6 @@
                 break;
 
               default:
-                if (LOG.isDebugEnabled()) {
-                  LOG.debug("Got default message");
-                }
                 try {
                   toDo.put(new ToDoEntry(msgs[i]));
                 } catch (InterruptedException e) {
@@ -678,7 +674,7 @@
                 e = ex;
               }
             }
-            LOG.error(e);
+            LOG.error("", e);
           }
         }
 
@@ -716,16 +712,16 @@
     if (abortRequested) {
       try {
         log.close();
+        LOG.info("On abort, closed hlog");
       } catch (IOException e) {
         if (e instanceof RemoteException) {
           try {
             e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
-            
           } catch (IOException ex) {
             e = ex;
           }
         }
-        LOG.warn(e);
+        LOG.warn("Abort close of log", e);
       }
       closeAllRegions(); // Don't leave any open file handles
       LOG.info("aborting server at: " +
@@ -743,7 +739,7 @@
             e = ex;
           }
         }
-        LOG.error(e);
+        LOG.error("", e);
       }
       try {
         HMsg[] exitMsg = new HMsg[closedRegions.size() + 1];
@@ -767,7 +763,7 @@
             e = ex;
           }
         }
-        LOG.warn(e);
+        LOG.warn("", e);
       }
       LOG.info("stopping server at: " +
         serverInfo.getServerAddress().toString());
@@ -1113,7 +1109,7 @@
             iex = x;
           }
         }
-        LOG.error(iex);
+        LOG.error("", iex);
       }
     }
   }
@@ -1267,12 +1263,11 @@
       if (e instanceof RemoteException) {
         try {
           e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
-          
         } catch (IOException x) {
           e = x;
         }
       }
-      LOG.error(e);
+      LOG.error("", e);
       throw e;
     }
     return scannerId;

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java?view=diff&rev=564012&r1=564011&r2=564012
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java Wed Aug  8 13:30:13 2007
@@ -33,21 +33,21 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.MapFile;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.util.StringUtils;
-
-import org.onelab.filter.*;
+import org.onelab.filter.BloomFilter;
+import org.onelab.filter.CountingBloomFilter;
+import org.onelab.filter.Filter;
+import org.onelab.filter.RetouchedBloomFilter;
 
 /**
  * HStore maintains a bunch of data files.  It is responsible for maintaining 
@@ -86,8 +86,8 @@
 
   final HLocking lock = new HLocking();
 
-  TreeMap<Long, MapFile.Reader> maps = new TreeMap<Long, MapFile.Reader>();
-  TreeMap<Long, HStoreFile> mapFiles = new TreeMap<Long, HStoreFile>();
+  TreeMap<Long, HStoreFile> storefiles = new TreeMap<Long, HStoreFile>();
+  TreeMap<Long, MapFile.Reader> readers = new TreeMap<Long, MapFile.Reader>();
 
   Random rand = new Random();
 
@@ -137,10 +137,9 @@
     if(family.getCompression() != HColumnDescriptor.CompressionType.NONE) {
       if(family.getCompression() == HColumnDescriptor.CompressionType.BLOCK) {
         this.compression = SequenceFile.CompressionType.BLOCK;
-        
-      } else if(family.getCompression() == HColumnDescriptor.CompressionType.RECORD) {
+      } else if(family.getCompression() ==
+          HColumnDescriptor.CompressionType.RECORD) {
         this.compression = SequenceFile.CompressionType.RECORD;
-        
       } else {
         assert(false);
       }
@@ -148,16 +147,13 @@
     
     this.fs = fs;
     this.conf = conf;
-
     this.mapdir = HStoreFile.getMapDir(dir, regionName, familyName);
     fs.mkdirs(mapdir);
     this.loginfodir = HStoreFile.getInfoDir(dir, regionName, familyName);
     fs.mkdirs(loginfodir);
-    
     if(family.bloomFilter == null) {
       this.filterDir = null;
       this.bloomFilter = null;
-      
     } else {
       this.filterDir = HStoreFile.getFilterDir(dir, regionName, familyName);
       fs.mkdirs(filterDir);
@@ -165,13 +161,15 @@
     }
 
     if(LOG.isDebugEnabled()) {
-      LOG.debug("Starting HStore for " + this.storeName);
+      LOG.debug("starting " + this.storeName +
+        ((reconstructionLog == null)?
+          " (no reconstruction log)": " with reconstruction log: " +
+          reconstructionLog.toString()));
     }
     
     // Either restart or get rid of any leftover compaction work.  Either way, 
     // by the time processReadyCompaction() returns, we can get rid of the 
     // existing compaction-dir.
-
     this.compactdir = new Path(dir, COMPACTION_DIR);
     Path curCompactStore =
       HStoreFile.getHStoreDir(compactdir, regionName, familyName);
@@ -187,7 +185,7 @@
     Vector<HStoreFile> hstoreFiles 
       = HStoreFile.loadHStoreFiles(conf, dir, regionName, familyName, fs);
     for(HStoreFile hsf: hstoreFiles) {
-      mapFiles.put(Long.valueOf(hsf.loadInfo(fs)), hsf);
+      this.storefiles.put(Long.valueOf(hsf.loadInfo(fs)), hsf);
     }
 
     // Now go through all the HSTORE_LOGINFOFILEs and figure out the
@@ -209,21 +207,24 @@
     }
 
     doReconstructionLog(reconstructionLog, maxSeqID);
-    
+
     // Compact all the MapFiles into a single file.  The resulting MapFile 
     // should be "timeless"; that is, it should not have an associated seq-ID, 
     // because all log messages have been reflected in the TreeMaps at this
-    // point.
-    if(mapFiles.size() >= 1) {
+    // point.  
+    //
+    // TODO: Only do the compaction if we are over a threshold, not
+    // every time. Not necessary if only two or three store files.  Fix after
+    // revamp of compaction.
+    if(storefiles.size() > 1) {
       compactHelper(true);
     }
-
+    
     // Finally, start up all the map readers! (There should be just one at this 
     // point, as we've compacted them all.)
-    for(Map.Entry<Long, HStoreFile> e: mapFiles.entrySet()) {
-      // TODO - is this really necessary?  Don't I do this inside compact()?
-      maps.put(e.getKey(),
-        getMapFileReader(e.getValue().getMapFilePath().toString()));
+    for(Map.Entry<Long, HStoreFile> e: this.storefiles.entrySet()) {
+      this.readers.put(e.getKey(),
+        e.getValue().getReader(this.fs, this.bloomFilter));
     }
   }
   
@@ -239,6 +240,11 @@
       final long maxSeqID)
   throws UnsupportedEncodingException, IOException {
     if (reconstructionLog == null || !fs.exists(reconstructionLog)) {
+      if (reconstructionLog != null && !fs.exists(reconstructionLog)) {
+        LOG.warn("Passed reconstruction log " + reconstructionLog +
+          " does not exist");
+      }
+      // Nothing to do.
       return;
     }
     long maxSeqIdInLog = -1;
@@ -271,8 +277,7 @@
         }
         HStoreKey k = new HStoreKey(key.getRow(), column, val.getTimestamp());
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Applying edit " + k.toString() + "=" +
-            new String(val.getVal(), UTF8_ENCODING));
+          LOG.debug("Applying edit " + k.toString());
         }
         reconstructedCache.put(k, val.getVal());
       }
@@ -364,144 +369,24 @@
       LOG.debug("flushed bloom filter for " + this.storeName);
     }
   }
-
-  /** Generates a bloom filter key from the row and column keys */
-  Key getBloomFilterKey(HStoreKey k) {
-    StringBuilder s = new StringBuilder(k.getRow().toString());
-    s.append(k.getColumn().toString());
-    
-    byte[] bytes = null;
-    try {
-      bytes = s.toString().getBytes(HConstants.UTF8_ENCODING);
-      
-    } catch (UnsupportedEncodingException e) {
-      e.printStackTrace();
-      assert(false);
-    }
-    return new Key(bytes);
-  }
-
-  /** 
-   * Extends MapFile.Reader and overrides get and getClosest to consult the
-   * bloom filter before attempting to read from disk.
-   */
-  private class BloomFilterReader extends MapFile.Reader {
-    
-    BloomFilterReader(FileSystem fs, String dirName, Configuration conf)
-    throws IOException {
-      super(fs, dirName, conf);
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public Writable get(WritableComparable key, Writable val) throws IOException {
-      // Note - the key being passed to us is always a HStoreKey
-      
-      if(bloomFilter.membershipTest(getBloomFilterKey((HStoreKey)key))) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("bloom filter reported that key exists");
-        }
-        return super.get(key, val);
-      }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("bloom filter reported that key does not exist");
-      }
-      return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public WritableComparable getClosest(WritableComparable key, Writable val)
-    throws IOException {
-      // Note - the key being passed to us is always a HStoreKey
-      
-      if(bloomFilter.membershipTest(getBloomFilterKey((HStoreKey)key))) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("bloom filter reported that key exists");
-        }
-        return super.getClosest(key, val);
-      }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("bloom filter reported that key does not exist");
-      }
-      return null;
-    }
-  }
-
-  /**
-   * Extends MapFile.Writer and overrides append, so that whenever a MapFile
-   * is written to, the key is added to the bloom filter.
-   */
-  private class BloomFilterWriter extends MapFile.Writer {
-
-    @SuppressWarnings("unchecked")
-    BloomFilterWriter(Configuration conf, FileSystem fs, String dirName,
-        Class keyClass, Class valClass, SequenceFile.CompressionType compression)
-    throws IOException {
-      super(conf, fs, dirName, keyClass, valClass, compression);
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public void append(WritableComparable key, Writable val) throws IOException {
-      // Note - the key being passed to us is always a HStoreKey
-
-      bloomFilter.add(getBloomFilterKey((HStoreKey)key));
-      super.append(key, val);
-    }
-  }
-  
-  /**
-   * Get a MapFile reader
-   * This allows us to substitute a BloomFilterReader if a bloom filter is enabled
-   */
-  MapFile.Reader getMapFileReader(String dirName) throws IOException {
-    if(bloomFilter != null) {
-      return new BloomFilterReader(fs, dirName, conf);
-    }
-    return new MapFile.Reader(fs, dirName, conf);
-  }
-  
-  /**
-   * Get a MapFile writer
-   * This allows us to substitute a BloomFilterWriter if a bloom filter is
-   * enabled
-   * 
-   * @param dirName Directory with store files.
-   * @return Map file.
-   * @throws IOException
-   */
-  MapFile.Writer getMapFileWriter(String dirName) throws IOException {
-    if (bloomFilter != null) {
-      return new BloomFilterWriter(conf, fs, dirName, HStoreKey.class,
-        ImmutableBytesWritable.class, compression);
-    }
-    return new MapFile.Writer(conf, fs, dirName, HStoreKey.class,
-        ImmutableBytesWritable.class, compression);
-  }
   
   //////////////////////////////////////////////////////////////////////////////
   // End bloom filters
   //////////////////////////////////////////////////////////////////////////////
 
   /**
-   * Turn off all the MapFile readers
-   * 
+   * Close all the MapFile readers
    * @throws IOException
    */
   void close() throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.info("closing HStore for " + this.storeName);
-    }
     this.lock.obtainWriteLock();
     try {
-      for (MapFile.Reader map: maps.values()) {
-        map.close();
+      for (MapFile.Reader reader: this.readers.values()) {
+        reader.close();
       }
-      maps.clear();
-      mapFiles.clear();
-      
-      LOG.info("HStore closed for " + this.storeName);
+      this.readers.clear();
+      this.storefiles.clear();
+      LOG.info("closed " + this.storeName);
     } finally {
       this.lock.releaseWriteLock();
     }
@@ -540,17 +425,19 @@
       // A. Write the TreeMap out to the disk
       HStoreFile flushedFile = HStoreFile.obtainNewHStoreFile(conf, dir,
         regionName, familyName, fs);
-      Path mapfile = flushedFile.getMapFilePath();
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("Flushing to " + mapfile.toString());
-      }
-      MapFile.Writer out = getMapFileWriter(mapfile.toString());
+      String name = flushedFile.toString();
+      MapFile.Writer out = flushedFile.getWriter(this.fs, this.compression,
+        this.bloomFilter);
+      int count = 0;
+      int total = 0;
       try {
         for (Map.Entry<HStoreKey, byte []> es: inputCache.entrySet()) {
           HStoreKey curkey = es.getKey();
+          total++;
           if (this.familyName.
               equals(HStoreKey.extractFamily(curkey.getColumn()))) {
             out.append(curkey, new ImmutableBytesWritable(es.getValue()));
+            count++;
           }
         }
       } finally {
@@ -571,13 +458,14 @@
         this.lock.obtainWriteLock();
         try {
           Long flushid = Long.valueOf(logCacheFlushId);
-          maps.put(flushid, getMapFileReader(mapfile.toString()));
-          mapFiles.put(flushid, flushedFile);
+          // Open the map file reader.
+          this.readers.put(flushid,
+            flushedFile.getReader(this.fs, this.bloomFilter));
+          this.storefiles.put(flushid, flushedFile);
           if(LOG.isDebugEnabled()) {
-            LOG.debug("Added " + mapfile.toString() +
-                " with flush id " + logCacheFlushId + " and size " +
-              StringUtils.humanReadableInt(mapfile.getFileSystem(this.conf).
-                getContentLength(mapfile)));
+            LOG.debug("Added " + name +
+                " with sequence id " + logCacheFlushId + " and size " +
+              StringUtils.humanReadableInt(flushedFile.length()));
           }
         } finally {
           this.lock.releaseWriteLock();
@@ -593,8 +481,7 @@
   Vector<HStoreFile> getAllMapFiles() {
     this.lock.obtainReadLock();
     try {
-      return new Vector<HStoreFile>(mapFiles.values());
-      
+      return new Vector<HStoreFile>(storefiles.values());
     } finally {
       this.lock.releaseReadLock();
     }
@@ -632,7 +519,7 @@
         HStoreFile.getHStoreDir(compactdir, regionName, familyName);
       fs.mkdirs(curCompactStore);
       if(LOG.isDebugEnabled()) {
-        LOG.debug("started compaction of " + mapFiles.size() + " files in " +
+        LOG.debug("started compaction of " + storefiles.size() + " files in " +
           curCompactStore.toString());
       }
       try {
@@ -640,7 +527,7 @@
         Vector<HStoreFile> toCompactFiles = null;
         this.lock.obtainWriteLock();
         try {
-          toCompactFiles = new Vector<HStoreFile>(mapFiles.values());
+          toCompactFiles = new Vector<HStoreFile>(storefiles.values());
         } finally {
           this.lock.releaseWriteLock();
         }
@@ -660,6 +547,7 @@
         HStoreFile compactedOutputFile 
           = new HStoreFile(conf, compactdir, regionName, familyName, -1);
         if(toCompactFiles.size() == 1) {
+          // TODO: Only rewrite if NOT a HSF reference file.
           if(LOG.isDebugEnabled()) {
             LOG.debug("nothing to compact for " + this.storeName);
           }
@@ -671,7 +559,8 @@
 
         // Step through them, writing to the brand-new TreeMap
         MapFile.Writer compactedOut =
-          getMapFileWriter(compactedOutputFile.getMapFilePath().toString());
+          compactedOutputFile.getWriter(this.fs, this.compression,
+            this.bloomFilter);
         try {
           // We create a new set of MapFile.Reader objects so we don't screw up 
           // the caching associated with the currently-loaded ones.
@@ -684,14 +573,14 @@
           // lowest-ranked one.  Updates to a single row/column will appear 
           // ranked by timestamp.  This allows us to throw out deleted values or
           // obsolete versions.
-          MapFile.Reader[] readers = new MapFile.Reader[toCompactFiles.size()];
+          MapFile.Reader[] rdrs = new MapFile.Reader[toCompactFiles.size()];
           HStoreKey[] keys = new HStoreKey[toCompactFiles.size()];
           ImmutableBytesWritable[] vals =
             new ImmutableBytesWritable[toCompactFiles.size()];
           boolean[] done = new boolean[toCompactFiles.size()];
           int pos = 0;
           for(HStoreFile hsf: toCompactFiles) {
-            readers[pos] = getMapFileReader(hsf.getMapFilePath().toString());
+            rdrs[pos] = hsf.getReader(this.fs, this.bloomFilter);
             keys[pos] = new HStoreKey();
             vals[pos] = new ImmutableBytesWritable();
             done[pos] = false;
@@ -701,9 +590,9 @@
           // Now, advance through the readers in order.  This will have the
           // effect of a run-time sort of the entire dataset.
           int numDone = 0;
-          for(int i = 0; i < readers.length; i++) {
-            readers[i].reset();
-            done[i] = ! readers[i].next(keys[i], vals[i]);
+          for(int i = 0; i < rdrs.length; i++) {
+            rdrs[i].reset();
+            done[i] = ! rdrs[i].next(keys[i], vals[i]);
             if(done[i]) {
               numDone++;
             }
@@ -715,7 +604,7 @@
           while(numDone < done.length) {
             // Find the reader with the smallest key
             int smallestKey = -1;
-            for(int i = 0; i < readers.length; i++) {
+            for(int i = 0; i < rdrs.length; i++) {
               if(done[i]) {
                 continue;
               }
@@ -760,10 +649,10 @@
 
             // Advance the smallest key.  If that reader's all finished, then 
             // mark it as done.
-            if(! readers[smallestKey].next(keys[smallestKey],
+            if(! rdrs[smallestKey].next(keys[smallestKey],
                 vals[smallestKey])) {
               done[smallestKey] = true;
-              readers[smallestKey].close();
+              rdrs[smallestKey].close();
               numDone++;
             }
           }
@@ -772,8 +661,7 @@
         }
 
         if(LOG.isDebugEnabled()) {
-          LOG.debug("writing new compacted HStore to " +
-            compactedOutputFile.getMapFilePath().toString());
+          LOG.debug("writing new compacted HStore " + compactedOutputFile);
         }
 
         // Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap.
@@ -818,7 +706,6 @@
    * invoked at HStore startup, if the prior execution died midway through.
    */
   void processReadyCompaction() throws IOException {
-
     // Move the compacted TreeMap into place.
     // That means:
     // 1) Acquiring the write-lock
@@ -830,29 +717,20 @@
     // 7) Releasing the write-lock
 
     // 1. Acquiring the write-lock
-
-
     Path curCompactStore =
       HStoreFile.getHStoreDir(compactdir, regionName, familyName);
     this.lock.obtainWriteLock();
     try {
       Path doneFile = new Path(curCompactStore, COMPACTION_DONE);
       if(!fs.exists(doneFile)) {
-        
         // The last execution didn't finish the compaction, so there's nothing 
         // we can do.  We'll just have to redo it. Abandon it and return.
         LOG.warn("Redoing a failed compaction");
         return;
       }
 
-      // OK, there's actually compaction work that needs to be put into place.
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("Process ready compaction starting");
-      }
-      
       // 2. Load in the files to be deleted.
       //    (Figuring out what MapFiles are going to be replaced)
-      
       Vector<HStoreFile> toCompactFiles = new Vector<HStoreFile>();
       Path filesToReplace = new Path(curCompactStore, COMPACTION_TO_REPLACE);
       DataInputStream in = new DataInputStream(fs.open(filesToReplace));
@@ -868,41 +746,29 @@
         in.close();
       }
 
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("loaded " + toCompactFiles.size() +
-          " file(s) to be deleted");
-      }
-      
-      // 3. Unload all the replaced MapFiles.
-      Iterator<HStoreFile> it2 = mapFiles.values().iterator();
-      for(Iterator<MapFile.Reader> it = maps.values().iterator();
-          it.hasNext(); ) {
-        MapFile.Reader curReader = it.next();
-        HStoreFile curMapFile = it2.next();
-        if(toCompactFiles.contains(curMapFile)) {
-          curReader.close();
-          it.remove();
-        }
-      }
-      
-      for(Iterator<HStoreFile> it = mapFiles.values().iterator(); it.hasNext(); ) {
-        HStoreFile curMapFile = it.next();
-        if(toCompactFiles.contains(curMapFile)) {
-          it.remove();
-        }
-      }
+      // 3. Unload all the replaced MapFiles.  Do it by getting keys of all
+      // to remove.  Then cycling on keys, removing, closing and deleting.
       
       // What if we crash at this point?  No big deal; we will restart
       // processReadyCompaction(), and nothing has been lost.
-
-      // 4. Delete all the old files, no longer needed
-      for(HStoreFile hsf: toCompactFiles) {
-        fs.delete(hsf.getMapFilePath());
-        fs.delete(hsf.getInfoFilePath());
+      Vector<Long> keys = new Vector<Long>(toCompactFiles.size());
+      for(Map.Entry<Long, HStoreFile> e: storefiles.entrySet()) {
+        if(toCompactFiles.contains(e.getValue())) {
+          keys.add(e.getKey());
+        }
       }
 
+      for (Long key: keys) {
+        MapFile.Reader reader = this.readers.remove(key);
+        if (reader != null) {
+          reader.close();
+        }
+        HStoreFile hsf = this.storefiles.remove(key);
+        // 4. Delete all old files, no longer needed
+        hsf.delete();
+      }
       if(LOG.isDebugEnabled()) {
-        LOG.debug("old file(s) deleted");
+        LOG.debug("deleted " + toCompactFiles.size() + " old file(s)");
       }
       
       // What if we fail now?  The above deletes will fail silently. We'd better
@@ -915,24 +781,19 @@
       HStoreFile finalCompactedFile 
         = HStoreFile.obtainNewHStoreFile(conf, dir, regionName, familyName, fs);
       if(LOG.isDebugEnabled()) {
-        LOG.debug("moving " + compactedFile.getMapFilePath().toString() +
-          " to " + finalCompactedFile.getMapFilePath().toString());
+        LOG.debug("moving " + compactedFile.toString() + " in " +
+          compactdir.toString() +
+          " to " + finalCompactedFile.toString() + " in " + dir.toString());
       }
-      
-      fs.rename(compactedFile.getMapFilePath(),
-        finalCompactedFile.getMapFilePath());
-      
-      // Fail here?  No problem.
-      fs.rename(compactedFile.getInfoFilePath(),
-        finalCompactedFile.getInfoFilePath());
+      compactedFile.rename(this.fs, finalCompactedFile);
 
       // Fail here?  No worries.
       Long orderVal = Long.valueOf(finalCompactedFile.loadInfo(fs));
 
       // 6. Loading the new TreeMap.
-      mapFiles.put(orderVal, finalCompactedFile);
-      maps.put(orderVal, getMapFileReader(
-        finalCompactedFile.getMapFilePath().toString()));
+      this.readers.put(orderVal,
+        finalCompactedFile.getReader(this.fs, this.bloomFilter));
+      this.storefiles.put(orderVal, finalCompactedFile);
     } finally {
       
       // 7. Releasing the write-lock
@@ -955,8 +816,7 @@
   throws IOException {
     this.lock.obtainReadLock();
     try {
-      MapFile.Reader[] maparray 
-        = maps.values().toArray(new MapFile.Reader[maps.size()]);
+      MapFile.Reader[] maparray = getReaders();
       for (int i = maparray.length - 1; i >= 0; i--) {
         MapFile.Reader map = maparray[i];
         synchronized(map) {
@@ -984,6 +844,11 @@
       this.lock.releaseReadLock();
     }
   }
+  
+  private MapFile.Reader [] getReaders() {
+    return this.readers.values().
+      toArray(new MapFile.Reader[this.readers.size()]);
+  }
 
   /**
    * Get the value for the indicated HStoreKey.  Grab the target value and the 
@@ -999,10 +864,8 @@
     List<byte []> results = new ArrayList<byte []>();
     this.lock.obtainReadLock();
     try {
-      MapFile.Reader[] maparray 
-        = maps.values().toArray(new MapFile.Reader[maps.size()]);
-      
-      for(int i = maparray.length-1; i >= 0; i--) {
+      MapFile.Reader[] maparray = getReaders();
+      for(int i = maparray.length - 1; i >= 0; i--) {
         MapFile.Reader map = maparray[i];
 
         synchronized(map) {
@@ -1044,40 +907,83 @@
     }
   }
   
+  /*
+   * Data structure to hold result of a look at store file sizes.
+   */
+  class HStoreSize {
+    final long aggregate;
+    final long largest;
+    boolean splitable;
+    
+    HStoreSize(final long a, final long l, final boolean s) {
+      this.aggregate = a;
+      this.largest = l;
+      this.splitable = s;
+    }
+    
+    long getAggregate() {
+      return this.aggregate;
+    }
+    
+    long getLargest() {
+      return this.largest;
+    }
+    
+    boolean isSplitable() {
+      return this.splitable;
+    }
+    
+    void setSplitable(final boolean s) {
+      this.splitable = s;
+    }
+  }
+  
   /**
-   * Gets the size of the largest MapFile and its mid key.
+   * Gets size for the store.
    * 
-   * @param midKey      - the middle key for the largest MapFile
-   * @return            - size of the largest MapFile
+   * @param midKey Gets set to the middle key of the largest splitable store
+   * file or its set to empty if largest is not splitable.
+   * @return Sizes for the store and the passed <code>midKey</code> is
+   * set to midKey of largest splitable.  Otherwise, its set to empty
+   * to indicate we couldn't find a midkey to split on
    */
-  long getLargestFileSize(Text midKey) {
+  HStoreSize size(Text midKey) {
     long maxSize = 0L;
-    if (this.mapFiles.size() <= 0) {
-      return maxSize;
+    long aggregateSize = 0L;
+    // Not splitable if we find a reference store file present in the store.
+    boolean splitable = true;
+    if (this.storefiles.size() <= 0) {
+      return new HStoreSize(0, 0, splitable);
     }
     
     this.lock.obtainReadLock();
     try {
       Long mapIndex = Long.valueOf(0L);
       // Iterate through all the MapFiles
-      for(Map.Entry<Long, HStoreFile> e: mapFiles.entrySet()) {
+      for(Map.Entry<Long, HStoreFile> e: storefiles.entrySet()) {
         HStoreFile curHSF = e.getValue();
-        long size = fs.getFileStatus(
-          new Path(curHSF.getMapFilePath(), MapFile.DATA_FILE_NAME)).getLen();
-        if(size > maxSize) {              // This is the largest one so far
+        long size = curHSF.length();
+        aggregateSize += size;
+        if (maxSize == 0L || size > maxSize) {
+          // This is the largest one so far
           maxSize = size;
           mapIndex = e.getKey();
         }
+        if (splitable) {
+          splitable = !curHSF.isReference();
+        }
+      }
+      MapFile.Reader r = this.readers.get(mapIndex);
+      WritableComparable midkey = r.midKey();
+      if (midkey != null) {
+        midKey.set(((HStoreKey)midkey).getRow());
       }
-
-      MapFile.Reader r = maps.get(mapIndex);
-      midKey.set(((HStoreKey)r.midKey()).getRow());
     } catch(IOException e) {
-      LOG.warn(e);
+      LOG.warn("", e);
     } finally {
       this.lock.releaseReadLock();
     }
-    return maxSize;
+    return new HStoreSize(aggregateSize, maxSize, splitable);
   }
   
   /**
@@ -1086,7 +992,7 @@
   int getNMaps() {
     this.lock.obtainReadLock();
     try {
-      return maps.size();
+      return storefiles.size();
       
     } finally {
       this.lock.releaseReadLock();
@@ -1127,20 +1033,17 @@
     private MapFile.Reader[] readers;
     
     HStoreScanner(long timestamp, Text[] targetCols, Text firstRow)
-        throws IOException {
-      
+    throws IOException {
       super(timestamp, targetCols);
-
       lock.obtainReadLock();
       try {
-        this.readers = new MapFile.Reader[mapFiles.size()];
+        this.readers = new MapFile.Reader[storefiles.size()];
         
         // Most recent map file should be first
         
         int i = readers.length - 1;
-        for(Iterator<HStoreFile> it = mapFiles.values().iterator(); it.hasNext(); ) {
-          HStoreFile curHSF = it.next();
-          readers[i--] = getMapFileReader(curHSF.getMapFilePath().toString());
+        for(HStoreFile curHSF: storefiles.values()) {
+          readers[i--] = curHSF.getReader(fs, bloomFilter);
         }
         
         this.keys = new HStoreKey[readers.length];
@@ -1164,7 +1067,7 @@
         }
         
       } catch (Exception ex) {
-        LOG.error(ex);
+        LOG.error("Failed construction", ex);
         close();
       }
     }
@@ -1218,9 +1121,8 @@
         if(readers[i] != null) {
           try {
             readers[i].close();
-            
           } catch(IOException e) {
-            LOG.error(e);
+            LOG.error("Sub-scanner close", e);
           }
         }
         
@@ -1240,9 +1142,8 @@
             if(readers[i] != null) {
               try {
                 readers[i].close();
-                
               } catch(IOException e) {
-                LOG.error(e);
+                LOG.error("Scanner close", e);
               }
             }
           }



Mime
View raw message