hadoop-common-commits mailing list archives

From cutt...@apache.org
Subject svn commit: r525267 [3/5] - in /lucene/hadoop/trunk: ./ src/contrib/hbase/ src/contrib/hbase/src/ src/contrib/hbase/src/java/ src/contrib/hbase/src/java/org/ src/contrib/hbase/src/java/org/apache/ src/contrib/hbase/src/java/org/apache/hadoop/ src/contr...
Date Tue, 03 Apr 2007 20:34:30 GMT
Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java?view=auto&rev=525267
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java Tue Apr  3 13:34:28 2007
@@ -0,0 +1,1240 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.conf.*;
+
+import java.io.*;
+import java.util.*;
+
+/*******************************************************************************
+ * HRegion stores data for a certain region of a table.  It stores all columns 
+ * for each row. A given table consists of one or more HRegions.
+ *
+ * We maintain multiple HStores for a single HRegion.
+ * 
+ * An HStore is a set of rows with some column data; together, the HStores
+ * make up all the data for the rows.
+ *
+ * Each HRegion has a 'startKey' and 'endKey'.
+ *
+ * The first is inclusive, the second is exclusive (except for the final region).
+ * The endKey of region 0 is the same as the startKey of region 1 (if it exists).
+ * The startKey for the first region is null.
+ * The endKey for the final region is null.
+ *
+ * The HStores have no locking built-in.  All row-level locking and row-level 
+ * atomicity is provided by the HRegion.
+ ******************************************************************************/
+public class HRegion implements HConstants {
+  static String SPLITDIR = "splits";
+  static String MERGEDIR = "merges";
+  static String TMPREGION_PREFIX = "tmpregion_";
+  static int MIN_COMMITS_FOR_COMPACTION = 10;
+  static Random rand = new Random();
+
+  private static final Log LOG = LogFactory.getLog(HRegion.class);
+
+  /**
+   * Merge two HRegions.  They must be available on the current HRegionServer.
+   * Returns a brand-new active HRegion, also running on the current HRegionServer.
+   */
+  public static HRegion closeAndMerge(HRegion srcA, HRegion srcB) throws IOException {
+
+    // Make sure that srcA comes first; important for key-ordering during
+    // write of the merged file.
+    
+    if(srcA.getStartKey() == null) {
+      if(srcB.getStartKey() == null) {
+        throw new IOException("Cannot merge two regions with null start key");
+      }
+      // A's start key is null but B's isn't. Assume A comes before B
+      
+    } else if((srcB.getStartKey() == null)         // A is not null but B is
+        || (srcA.getStartKey().compareTo(srcB.getStartKey()) > 0)) { // A > B
+      
+      HRegion tmp = srcA;
+      srcA = srcB;
+      srcB = tmp;
+    }
+    
+    if (! srcA.getEndKey().equals(srcB.getStartKey())) {
+      throw new IOException("Cannot merge non-adjacent regions");
+    }
+
+    FileSystem fs = srcA.getFilesystem();
+    Configuration conf = srcA.getConf();
+    HTableDescriptor tabledesc = srcA.getTableDesc();
+    HLog log = srcA.getLog();
+    Path dir = srcA.getDir();
+
+    Text startKey = srcA.getStartKey();
+    Text endKey = srcB.getEndKey();
+
+    Path merges = new Path(srcA.getRegionDir(), MERGEDIR);
+    if(! fs.exists(merges)) {
+      fs.mkdirs(merges);
+    }
+    
+    HRegionInfo newRegionInfo
+      = new HRegionInfo(Math.abs(rand.nextLong()), tabledesc, startKey, endKey);
+    
+    Path newRegionDir = HStoreFile.getHRegionDir(merges, newRegionInfo.regionName);
+
+    if(fs.exists(newRegionDir)) {
+      throw new IOException("Cannot merge; target file collision at " + newRegionDir);
+    }
+
+    LOG.info("starting merge of regions: " + srcA.getRegionName() + " and " 
+        + srcB.getRegionName() + " new region start key is '" 
+        + (startKey == null ? "" : startKey) + "', end key is '" 
+        + (endKey == null ? "" : endKey) + "'");
+    
+    // Flush each of the sources, and merge their files into a single 
+    // target for each column family.
+
+    LOG.debug("flushing and getting file names for region " + srcA.getRegionName());
+    
+    TreeSet<HStoreFile> alreadyMerged = new TreeSet<HStoreFile>();
+    TreeMap<Text, Vector<HStoreFile>> filesToMerge = new TreeMap<Text, Vector<HStoreFile>>();
+    for(Iterator<HStoreFile> it = srcA.flushcache(true).iterator(); it.hasNext(); ) {
+      HStoreFile src = it.next();
+      Vector<HStoreFile> v = filesToMerge.get(src.getColFamily());
+      if(v == null) {
+        v = new Vector<HStoreFile>();
+        filesToMerge.put(src.getColFamily(), v);
+      }
+      v.add(src);
+    }
+    
+    LOG.debug("flushing and getting file names for region " + srcB.getRegionName());
+    
+    for(Iterator<HStoreFile> it = srcB.flushcache(true).iterator(); it.hasNext(); ) {
+      HStoreFile src = it.next();
+      Vector<HStoreFile> v = filesToMerge.get(src.getColFamily());
+      if(v == null) {
+        v = new Vector<HStoreFile>();
+        filesToMerge.put(src.getColFamily(), v);
+      }
+      v.add(src);
+    }
+    
+    LOG.debug("merging stores");
+    
+    for(Iterator<Text> it = filesToMerge.keySet().iterator(); it.hasNext(); ) {
+      Text colFamily = it.next();
+      Vector<HStoreFile> srcFiles = filesToMerge.get(colFamily);
+      HStoreFile dst = new HStoreFile(conf, merges, newRegionInfo.regionName, 
+          colFamily, Math.abs(rand.nextLong()));
+      
+      dst.mergeStoreFiles(srcFiles, fs, conf);
+      alreadyMerged.addAll(srcFiles);
+    }
+
+    // That should have taken care of the bulk of the data.
+    // Now close the source HRegions for good, and repeat the above to take care
+    // of any last-minute inserts
+
+    LOG.debug("flushing changes since start of merge for region " 
+        + srcA.getRegionName());
+
+    filesToMerge.clear();
+    for(Iterator<HStoreFile> it = srcA.close().iterator(); it.hasNext(); ) {
+      HStoreFile src = it.next();
+      
+      if(! alreadyMerged.contains(src)) {
+        Vector<HStoreFile> v = filesToMerge.get(src.getColFamily());
+        if(v == null) {
+          v = new Vector<HStoreFile>();
+          filesToMerge.put(src.getColFamily(), v);
+        }
+        v.add(src);
+      }
+    }
+    
+    LOG.debug("flushing changes since start of merge for region " 
+        + srcB.getRegionName());
+    
+    for(Iterator<HStoreFile> it = srcB.close().iterator(); it.hasNext(); ) {
+      HStoreFile src = it.next();
+      
+      if(! alreadyMerged.contains(src)) {
+        Vector<HStoreFile> v = filesToMerge.get(src.getColFamily());
+        if(v == null) {
+          v = new Vector<HStoreFile>();
+          filesToMerge.put(src.getColFamily(), v);
+        }
+        v.add(src);
+      }
+    }
+    
+    LOG.debug("merging changes since start of merge");
+    
+    for(Iterator<Text> it = filesToMerge.keySet().iterator(); it.hasNext(); ) {
+      Text colFamily = it.next();
+      Vector<HStoreFile> srcFiles = filesToMerge.get(colFamily);
+      HStoreFile dst = new HStoreFile(conf, merges, newRegionInfo.regionName,
+          colFamily, Math.abs(rand.nextLong()));
+      
+      dst.mergeStoreFiles(srcFiles, fs, conf);
+    }
+
+    // Done
+    
+    HRegion dstRegion = new HRegion(dir, log, fs, conf, newRegionInfo,
+        newRegionDir, null);
+
+    // Get rid of merges directory
+    
+    fs.delete(merges);
+
+    LOG.info("merge completed. New region is " + dstRegion.getRegionName());
+    
+    return dstRegion;
+  }
+
+  //////////////////////////////////////////////////////////////////////////////
+  // Members
+  //////////////////////////////////////////////////////////////////////////////
+
+  TreeMap<Text, Long> rowsToLocks = new TreeMap<Text, Long>();
+  TreeMap<Long, Text> locksToRows = new TreeMap<Long, Text>();
+  TreeMap<Text, HStore> stores = new TreeMap<Text, HStore>();
+  TreeMap<Long, TreeMap<Text, byte[]>> targetColumns 
+      = new TreeMap<Long, TreeMap<Text, byte[]>>();
+  
+  HMemcache memcache = new HMemcache();
+
+  Path dir;
+  HLog log;
+  FileSystem fs;
+  Configuration conf;
+  HRegionInfo regionInfo;
+  Path regiondir;
+
+  class WriteState {
+    public boolean writesOngoing;
+    public boolean writesEnabled;
+    public boolean closed;
+    public WriteState() {
+      this.writesOngoing = true;
+      this.writesEnabled = true;
+      this.closed = false;
+    }
+  }
+  
+  WriteState writestate = new WriteState();
+  int recentCommits = 0;
+  int commitsSinceFlush = 0;
+
+  int maxUnflushedEntries = 0;
+
+  //////////////////////////////////////////////////////////////////////////////
+  // Constructor
+  //////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * An HRegion is defined by its table and its key extent.
+   * 
+   * It consists of at least one HStore.  The number of HStores should be 
+   * configurable, so that data which is accessed together is stored in the same
+   * HStore.  Right now, we approximate that by building a single HStore for 
+   * each column family.  (This config info will be communicated via the 
+   * tabledesc.)
+   *
+   * The HLog is the outbound log for any updates to the HRegion.  (There's a 
+   * single HLog for all the HRegions on a single HRegionServer.)
+   *
+   * The HTableDescriptor contains metainfo about the HRegion's table.  
+   *
+   * regionName is a unique identifier for this HRegion.
+   *
+   * [startKey, endKey) defines the keyspace for this HRegion.  NULL values
+   * indicate we're at the start or end of the table.
+   *
+   * fs is the filesystem.  regiondir is where the HRegion is stored.
+   *
+   * logfile is a logfile from the previous execution that's custom-computed for
+   * this HRegion.  The HRegionServer computes and sorts the appropriate log
+   * info for this HRegion.
+   *
+   * conf is global configuration settings.
+   *
+   * If there are initial files (implying that the HRegion is new), then read 
+   * them from the supplied path.
+   *
+   * If there is a previous log file (implying that the HRegion has been 
+   * written-to before), then read it from the supplied path.
+   */
+  public HRegion(Path dir, HLog log, FileSystem fs, Configuration conf, 
+      HRegionInfo regionInfo, Path initialFiles, Path oldLogFile) throws IOException {
+    
+    this.dir = dir;
+    this.log = log;
+    this.fs = fs;
+    this.conf = conf;
+    this.regionInfo = regionInfo;
+
+    this.writestate.writesOngoing = true;
+    this.writestate.writesEnabled = true;
+    this.writestate.closed = false;
+
+    // Declare the regionName.  This is a unique string for the region, used to 
+    // build a unique filename.
+    
+    this.regiondir = HStoreFile.getHRegionDir(dir, this.regionInfo.regionName);
+
+    // Move prefab HStore files into place (if any)
+    
+    if(initialFiles != null && fs.exists(initialFiles)) {
+      fs.rename(initialFiles, regiondir);
+    }
+
+    // Load in all the HStores.
+    
+    for(Iterator<Text> it = this.regionInfo.tableDesc.families().iterator();
+        it.hasNext(); ) {
+      
+      Text colFamily = it.next();
+      stores.put(colFamily, new HStore(dir, this.regionInfo.regionName, colFamily, 
+          this.regionInfo.tableDesc.getMaxVersions(), fs, oldLogFile, conf));
+    }
+
+    // Get rid of any splits or merges that were lost in-progress
+    
+    Path splits = new Path(regiondir, SPLITDIR);
+    if(fs.exists(splits)) {
+      fs.delete(splits);
+    }
+    
+    Path merges = new Path(regiondir, MERGEDIR);
+    if(fs.exists(merges)) {
+      fs.delete(merges);
+    }
+
+    this.maxUnflushedEntries = conf.getInt("hbase.hregion.maxunflushed", 10000);
+
+    // HRegion is ready to go!
+    
+    this.writestate.writesOngoing = false;
+    
+    LOG.info("region " + this.regionInfo.regionName + " available");
+  }
+
+  /** Returns an HRegionInfo object for this region */
+  public HRegionInfo getRegionInfo() {
+    return this.regionInfo;
+  }
+  
+  /** Closes and deletes this HRegion. Called when doing a table deletion, for example */
+  public void closeAndDelete() throws IOException {
+    close();
+    fs.delete(regiondir);
+  }
+  
+  /**
+   * Close down this HRegion.  Flush the cache, shut down each HStore, don't 
+   * service any more calls.
+   *
+   * The returned Vector is a list of all the storage files that the HRegion's 
+   * component HStores make use of.  It's a list of HStoreFile objects.
+   *
+   * This method could take some time to execute, so don't call it from a 
+   * time-sensitive thread.
+   */
+  public Vector<HStoreFile> close() throws IOException {
+    boolean shouldClose = false;
+    synchronized(writestate) {
+      if(writestate.closed) {
+        LOG.info("region " + this.regionInfo.regionName + " closed");
+        return new Vector<HStoreFile>();
+      }
+      while(writestate.writesOngoing) {
+        try {
+          writestate.wait();
+        } catch (InterruptedException iex) {
+        }
+      }
+      writestate.writesOngoing = true;
+      shouldClose = true;
+    }
+
+    if(! shouldClose) {
+      return null;
+      
+    } else {
+      LOG.info("closing region " + this.regionInfo.regionName);
+      Vector<HStoreFile> allHStoreFiles = internalFlushcache();
+      for(Iterator<HStore> it = stores.values().iterator(); it.hasNext(); ) {
+        HStore store = it.next();
+        store.close();
+      }
+      try {
+        return allHStoreFiles;
+        
+      } finally {
+        synchronized(writestate) {
+          writestate.closed = true;
+          writestate.writesOngoing = false;
+        }
+        LOG.info("region " + this.regionInfo.regionName + " closed");
+      }
+    }
+  }
+
+  /**
+   * Split the HRegion to create two brand-new ones.  This will also close the 
+   * current HRegion.
+   *
+   * Returns two brand-new (and open) HRegions
+   */
+  public HRegion[] closeAndSplit(Text midKey) throws IOException {
+    if(((regionInfo.startKey.getLength() != 0)
+        && (regionInfo.startKey.compareTo(midKey) > 0))
+        || ((regionInfo.endKey.getLength() != 0)
+            && (regionInfo.endKey.compareTo(midKey) < 0))) {
+      throw new IOException("Region splitkey must lie within region boundaries.");
+    }
+
+    LOG.info("splitting region " + this.regionInfo.regionName);
+
+    // Flush this HRegion out to storage, and turn off flushes
+    // or compactions until close() is called.
+    
+    Path splits = new Path(regiondir, SPLITDIR);
+    if(! fs.exists(splits)) {
+      fs.mkdirs(splits);
+    }
+    
+    long regionAId = Math.abs(rand.nextLong());
+    HRegionInfo regionAInfo = new HRegionInfo(regionAId, regionInfo.tableDesc, 
+        regionInfo.startKey, midKey);
+        
+    long regionBId = Math.abs(rand.nextLong());
+    HRegionInfo regionBInfo
+      = new HRegionInfo(regionBId, regionInfo.tableDesc, midKey, null);
+    
+    Path dirA = HStoreFile.getHRegionDir(splits, regionAInfo.regionName);
+    Path dirB = HStoreFile.getHRegionDir(splits, regionBInfo.regionName);
+
+    if(fs.exists(dirA) || fs.exists(dirB)) {
+      throw new IOException("Cannot split; target file collision at " + dirA 
+          + " or " + dirB);
+    }
+    
+    TreeSet<HStoreFile> alreadySplit = new TreeSet<HStoreFile>();
+    Vector<HStoreFile> hstoreFilesToSplit = flushcache(true);
+    for(Iterator<HStoreFile> it = hstoreFilesToSplit.iterator(); it.hasNext(); ) {
+      HStoreFile hsf = it.next();
+      
+      LOG.debug("splitting HStore " + hsf.getRegionName() + "/" + hsf.getColFamily()
+          + "/" + hsf.fileId());
+
+      HStoreFile dstA = new HStoreFile(conf, splits, regionAInfo.regionName, 
+          hsf.getColFamily(), Math.abs(rand.nextLong()));
+      
+      HStoreFile dstB = new HStoreFile(conf, splits, regionBInfo.regionName, 
+          hsf.getColFamily(), Math.abs(rand.nextLong()));
+      
+      hsf.splitStoreFile(midKey, dstA, dstB, fs, conf);
+      alreadySplit.add(hsf);
+    }
+
+    // We just copied most of the data.  Now close the HRegion
+    // and copy the small remainder
+    
+    hstoreFilesToSplit = close();
+    for(Iterator<HStoreFile> it = hstoreFilesToSplit.iterator(); it.hasNext(); ) {
+      HStoreFile hsf = it.next();
+      
+      if(! alreadySplit.contains(hsf)) {
+        LOG.debug("splitting HStore " + hsf.getRegionName() + "/" + hsf.getColFamily()
+            + "/" + hsf.fileId());
+
+        HStoreFile dstA = new HStoreFile(conf, splits, regionAInfo.regionName, 
+            hsf.getColFamily(), Math.abs(rand.nextLong()));
+        
+        HStoreFile dstB = new HStoreFile(conf, splits, regionBInfo.regionName, 
+            hsf.getColFamily(), Math.abs(rand.nextLong()));
+        
+        hsf.splitStoreFile(midKey, dstA, dstB, fs, conf);
+      }
+    }
+
+    // Done
+
+    HRegion regionA = new HRegion(dir, log, fs, conf, regionAInfo, dirA, null);
+    
+    HRegion regionB = new HRegion(dir, log, fs, conf, regionBInfo, dirB, null);
+
+    // Cleanup
+
+    fs.delete(splits);                          // Get rid of splits directory
+    fs.delete(regiondir);                       // and the directory for the old region
+    
+    HRegion regions[] = new HRegion[2];
+    regions[0] = regionA;
+    regions[1] = regionB;
+    
+    LOG.info("region split complete. new regions are: " + regions[0].getRegionName()
+        + ", " + regions[1].getRegionName());
+    
+    return regions;
+  }
+
+  //////////////////////////////////////////////////////////////////////////////
+  // HRegion accessors
+  //////////////////////////////////////////////////////////////////////////////
+
+  public Text getStartKey() {
+    return regionInfo.startKey;
+  }
+  
+  public Text getEndKey() {
+    return regionInfo.endKey;
+  }
+  
+  public long getRegionId() {
+    return regionInfo.regionId;
+  }
+
+  public Text getRegionName() {
+    return regionInfo.regionName;
+  }
+  
+  public Path getDir() {
+    return dir;
+  }
+ 
+  public HTableDescriptor getTableDesc() {
+    return regionInfo.tableDesc;
+  }
+  
+  public HLog getLog() {
+    return log;
+  }
+  
+  public Configuration getConf() {
+    return conf;
+  }
+  
+  public Path getRegionDir() {
+    return regiondir;
+  }
+  
+  public FileSystem getFilesystem() {
+    return fs;
+  }
+
+  //////////////////////////////////////////////////////////////////////////////
+  // HRegion maintenance.  
+  //
+  // These methods are meant to be called periodically by the HRegionServer for 
+  // upkeep.
+  //////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Iterates through all the HStores and finds the one with the largest MapFile
+   * size. If the size is greater than the (currently hard-coded) threshold of
+   * 1.5 x DESIRED_MAX_FILE_SIZE, returns true indicating that the region should
+   * be split. The midKey for the largest MapFile is returned through the midKey
+   * parameter.
+   * 
+   * @param midKey      - (return value) midKey of the largest MapFile
+   * @return            - true if the region should be split
+   * 
+   * @throws IOException
+   */
+  public boolean needsSplit(Text midKey) throws IOException {
+    Text key = new Text();
+    long maxSize = 0;
+
+    for(Iterator<HStore> i = stores.values().iterator(); i.hasNext(); ) {
+      long size = i.next().getLargestFileSize(key);
+      
+      if(size > maxSize) {                      // Largest so far
+        maxSize = size;
+        midKey.set(key);
+      }
+    }
+
+    return (maxSize > (DESIRED_MAX_FILE_SIZE + (DESIRED_MAX_FILE_SIZE / 2)));
+  }
+
+  /**
+   * Compact all the stores.  This should be called periodically to make sure 
+   * the stores are kept manageable.  
+   *
+   * This operation could block for a long time, so don't call it from a 
+   * time-sensitive thread.
+   *
+   * If it returns TRUE, the compaction has completed.
+   * 
+   * If it returns FALSE, the compaction was not carried out, because the 
+   * HRegion is busy doing something else storage-intensive (like flushing the 
+   * cache).  The caller should check back later.
+   */
+  public boolean compactStores() throws IOException {
+    boolean shouldCompact = false;
+    synchronized(writestate) {
+      if((! writestate.writesOngoing)
+          && writestate.writesEnabled
+          && (! writestate.closed)
+          && recentCommits > MIN_COMMITS_FOR_COMPACTION) {
+        
+        writestate.writesOngoing = true;
+        shouldCompact = true;
+      }
+    }
+
+    if(! shouldCompact) {
+      LOG.info("not compacting region " + this.regionInfo.regionName);
+      return false;
+      
+    } else {
+      try {
+        LOG.info("starting compaction on region " + this.regionInfo.regionName);
+        for(Iterator<HStore> it = stores.values().iterator(); it.hasNext(); ) {
+          HStore store = it.next();
+          store.compact();
+        }
+        LOG.info("compaction completed on region " + this.regionInfo.regionName);
+        return true;
+        
+      } finally {
+        synchronized(writestate) {
+          writestate.writesOngoing = false;
+          recentCommits = 0;
+          writestate.notifyAll();
+        }
+      }
+    }
+  }
+
+  /**
+   * Each HRegion is given a periodic chance to flush the cache, which it should
+   * only take if there have been a lot of uncommitted writes.
+   */
+  public void optionallyFlush() throws IOException {
+    if(commitsSinceFlush > maxUnflushedEntries) {
+      flushcache(false);
+    }
+  }
+
+  /**
+   * Flush the cache.  This is called periodically to minimize the amount of log
+   * processing needed upon startup.
+   * 
+   * The returned Vector is a list of all the files used by the component HStores.
+   * It is a list of HStoreFile objects.  If the returned value is NULL, then the
+   * flush could not be executed, because the HRegion is busy doing something
+   * else storage-intensive.  The caller should check back later.
+   *
+   * The 'disableFutureWrites' boolean indicates that the caller intends to 
+   * close() the HRegion shortly, so the HRegion should not take on any new and 
+   * potentially long-lasting disk operations.  This flush() should be the final
+   * pre-close() disk operation.
+   *
+   * This method may block for some time, so it should not be called from a 
+   * time-sensitive thread.
+   */
+  public Vector<HStoreFile> flushcache(boolean disableFutureWrites) throws IOException {
+    boolean shouldFlush = false;
+    synchronized(writestate) {
+      if((! writestate.writesOngoing)
+          && writestate.writesEnabled
+          && (! writestate.closed)) {
+        
+        writestate.writesOngoing = true;
+        shouldFlush = true;
+        
+        if(disableFutureWrites) {
+          writestate.writesEnabled = false;
+        }
+      }
+    }
+    
+    if(! shouldFlush) {
+      LOG.debug("not flushing cache for region " + this.regionInfo.regionName);
+      return null;
+      
+    } else {
+      try {
+        return internalFlushcache();
+        
+      } finally {
+        synchronized(writestate) {
+          writestate.writesOngoing = false;
+          writestate.notifyAll();
+        }
+      }
+    }
+  }
+
+  /**
+   * Flushing the cache is a little tricky.  We have a lot of updates in the 
+   * HMemcache, all of which have also been written to the log.  We need to write
+   * those updates in the HMemcache out to disk, while being able to process 
+   * reads/writes as much as possible during the flush operation.  Also, the log
+   * has to state clearly the point in time at which the HMemcache was flushed.
+   * (That way, during recovery, we know when we can rely on the on-disk flushed
+   * structures and when we have to recover the HMemcache from the log.)
+   *
+   * So, we have a three-step process:
+   *
+   * A. Flush the memcache to the on-disk stores, noting the current sequence ID
+   *    for the log.
+   *    
+   * B. Write a FLUSHCACHE-COMPLETE message to the log, using the sequence ID 
+   *    that was current at the time of memcache-flush.
+   *    
+   * C. Get rid of the memcache structures that are now redundant, as they've 
+   *    been flushed to the on-disk HStores.
+   *
+   * This method is package-private, but can be reached via several public routes.
+   *
+   * This method may block for some time.
+   */
+  Vector<HStoreFile> internalFlushcache() throws IOException {
+    Vector<HStoreFile> allHStoreFiles = new Vector<HStoreFile>();
+    
+    LOG.debug("flushing cache for region " + this.regionInfo.regionName);
+
+    // We pass the log to the HMemcache, so we can lock down 
+    // both simultaneously.  We only have to do this for a moment:
+    // we need the HMemcache state at the time of a known log sequence
+    // number.  Since multiple HRegions may write to a single HLog,
+    // the sequence numbers may zoom past unless we lock it.
+    //
+    // When execution returns from snapshotMemcacheForLog()
+    // with a non-NULL value, the HMemcache will have a snapshot 
+    // object stored that must be explicitly cleaned up using
+    // a call to deleteSnapshot().
+    
+    LOG.debug("starting memcache snapshot");
+    
+    HMemcache.Snapshot retval = memcache.snapshotMemcacheForLog(log);
+    TreeMap<HStoreKey, BytesWritable> memcacheSnapshot = retval.memcacheSnapshot;
+    if(memcacheSnapshot == null) {
+      for(Iterator<HStore> it = stores.values().iterator(); it.hasNext(); ) {
+        HStore hstore = it.next();
+        Vector<HStoreFile> hstoreFiles = hstore.getAllMapFiles();
+        allHStoreFiles.addAll(0, hstoreFiles);
+      }
+      return allHStoreFiles;
+    }
+    
+    long logCacheFlushId = retval.sequenceId;
+
+    // A.  Flush memcache to all the HStores.
+    
+    LOG.debug("flushing memcache to HStores");
+    
+    for(Iterator<HStore> it = stores.values().iterator(); it.hasNext(); ) {
+      HStore hstore = it.next();
+      Vector<HStoreFile> hstoreFiles 
+        = hstore.flushCache(memcacheSnapshot, logCacheFlushId);
+      
+      allHStoreFiles.addAll(0, hstoreFiles);
+    }
+
+    // B.  Write a FLUSHCACHE-COMPLETE message to the log.
+    //     This tells future readers that the HStores were emitted correctly,
+    //     and that all updates to the log for this regionName that have lower 
+    //     log-sequence-ids can be safely ignored.
+    
+    LOG.debug("writing flush cache complete to log");
+    
+    log.completeCacheFlush(this.regionInfo.regionName,
+        regionInfo.tableDesc.getName(), logCacheFlushId);
+
+    // C. Delete the now-irrelevant memcache snapshot; its contents have been 
+    //    dumped to disk-based HStores.
+    
+    LOG.debug("deleting memcache snapshot");
+    
+    memcache.deleteSnapshot();
+
+    LOG.debug("cache flush complete for region " + this.regionInfo.regionName);
+    
+    this.commitsSinceFlush = 0;
+    return allHStoreFiles;
+  }
+
+  //////////////////////////////////////////////////////////////////////////////
+  // get() methods for client use.
+  //////////////////////////////////////////////////////////////////////////////
+
+  /** Fetch a single data item. */
+  public byte[] get(Text row, Text column) throws IOException {
+    byte results[][] = get(row, column, Long.MAX_VALUE, 1);
+    if(results == null) {
+      return null;
+      
+    } else {
+      return results[0];
+    }
+  }
+  
+  /** Fetch multiple versions of a single data item */
+  public byte[][] get(Text row, Text column, int numVersions) throws IOException {
+    return get(row, column, Long.MAX_VALUE, numVersions);
+  }
+
+  /** Fetch multiple versions of a single data item, with timestamp. */
+  public byte[][] get(Text row, Text column, long timestamp, int numVersions) 
+      throws IOException {
+    
+    if(writestate.closed) {
+      throw new IOException("HRegion is closed.");
+    }
+
+    // Make sure this is a valid row and valid column
+
+    checkRow(row);
+    Text colFamily = HStoreKey.extractFamily(column);
+    checkFamily(colFamily);
+
+    // Obtain the row-lock
+
+    obtainLock(row);
+    try {
+      // Obtain the row/column results
+      return get(new HStoreKey(row, column, timestamp), numVersions);
+    
+    } finally {
+      releaseLock(row);
+    }
+  }
+
+  // Private implementation: get the value for the indicated HStoreKey
+
+  private byte[][] get(HStoreKey key, int numVersions) throws IOException {
+
+    // Check the memcache
+
+    byte[][] result = memcache.get(key, numVersions);
+    if(result != null) {
+      return result;
+    }
+
+    // If unavailable in memcache, check the appropriate HStore
+
+    Text colFamily = HStoreKey.extractFamily(key.getColumn());
+    HStore targetStore = stores.get(colFamily);
+    if(targetStore == null) {
+      return null;
+    }
+    
+    return targetStore.get(key, numVersions);
+  }
+
+  /**
+   * Fetch all the columns for the indicated row.
+   * Returns a TreeMap that maps column names to values.
+   *
+   * We should eventually use Bloom filters here, to reduce running time.  If 
+   * the database has many column families and is very sparse, then we could be 
+   * checking many files needlessly.  A small Bloom for each row would help us 
+   * determine which column groups are useful for that row.  That would let us 
+   * avoid a bunch of disk activity.
+   */
+  public TreeMap<Text, byte[]> getFull(Text row) throws IOException {
+    HStoreKey key = new HStoreKey(row, System.currentTimeMillis());
+
+    TreeMap<Text, byte[]> memResult = memcache.getFull(key);
+    for(Iterator<Text> it = stores.keySet().iterator(); it.hasNext(); ) {
+      Text colFamily = it.next();
+      HStore targetStore = stores.get(colFamily);
+      targetStore.getFull(key, memResult);
+    }
+    return memResult;
+  }
+
+  /**
+   * Return an iterator that scans over the HRegion, returning the indicated 
+   * columns.  This Iterator must be closed by the caller.
+   */
+  public HScannerInterface getScanner(Text cols[], Text firstRow) throws IOException {
+    TreeSet<Text> families = new TreeSet<Text>();
+    for(int i = 0; i < cols.length; i++) {
+      families.add(HStoreKey.extractFamily(cols[i]));
+    }
+
+    HStore storelist[] = new HStore[families.size()];
+    int i = 0;
+    for(Iterator<Text> it = families.iterator(); it.hasNext(); ) {
+      Text family = it.next();
+      storelist[i++] = stores.get(family);
+    }
+    return new HScanner(cols, firstRow, memcache, storelist);
+  }
+
+  //////////////////////////////////////////////////////////////////////////////
+  // set() methods for client use.
+  //////////////////////////////////////////////////////////////////////////////
+  
+  /**
+   * The caller wants to apply a series of writes to a single row in the HRegion.
+   * The caller will invoke startUpdate(), followed by a series of calls to 
+   * put/delete, then finally either abort() or commit().
+   *
+   * Note that we rely on the external caller to properly abort() or commit() 
+   * every transaction.  If the caller is a network client, there should be a 
+   * lease-system in place that automatically aborts() transactions after a 
+   * specified quiet period.
+   */
+  public long startUpdate(Text row) throws IOException {
+
+    // We obtain a per-row lock, so other clients will
+    // block while one client performs an update.
+    
+    return obtainLock(row);
+  }
+
+  /**
+   * Put a cell value into the locked row.  The user indicates the row-lock, the
+   * target column, and the desired value.  This stuff is set into a temporary 
+   * memory area until the user commits the change, at which point it's logged 
+   * and placed into the memcache.
+   *
+   * This method really just tests the input, then calls an internal localput() 
+   * method.
+   */
+  public void put(long lockid, Text targetCol, byte[] val) throws IOException {
+    if(val.length == HStoreKey.DELETE_BYTES.length) {
+      boolean matches = true;
+      for(int i = 0; i < val.length; i++) {
+        if(val[i] != HStoreKey.DELETE_BYTES[i]) {
+          matches = false;
+          break;
+        }
+      }
+      
+      if(matches) {
+        throw new IOException("Cannot insert value: " + val);
+      }
+    }
+    
+    localput(lockid, targetCol, val);
+  }
+
+  /**
+   * Delete a value by writing the delete marker. This is just a convenience method for put().
+   */
+  public void delete(long lockid, Text targetCol) throws IOException {
+    localput(lockid, targetCol, HStoreKey.DELETE_BYTES);
+  }
+
+  /**
+   * Private implementation.
+   * 
+   * localput() is used for both puts and deletes. We just place the values into
+   * a per-row pending area, until a commit() or abort() call is received.
+   * (Or until the user's write-lock expires.)
+   */
+  void localput(long lockid, Text targetCol, byte[] val) throws IOException {
+    Text row = getRowFromLock(lockid);
+    if(row == null) {
+      throw new IOException("No write lock for lockid " + lockid);
+    }
+
+    // This sync block makes localput() thread-safe when multiple
+    // threads from the same client attempt an insert on the same 
+    // locked row (via lockid).
+
+    synchronized(row) {
+      
+      // This check makes sure that another thread from the client
+      // hasn't aborted/committed the write-operation.
+
+      if(row != getRowFromLock(lockid)) {
+        throw new IOException("Locking error: put operation on lock " + lockid 
+            + " unexpected aborted by another thread");
+      }
+      
+      TreeMap<Text, byte[]> targets = targetColumns.get(lockid);
+      if(targets == null) {
+        targets = new TreeMap<Text, byte[]>();
+        targetColumns.put(lockid, targets);
+      }
+      targets.put(targetCol, val);
+    }
+  }
+
+  /**
+   * Abort a pending set of writes. This dumps from memory all in-progress
+   * writes associated with the given row-lock.  These values have not yet
+   * been placed in memcache or written to the log.
+   */
+  public void abort(long lockid) throws IOException {
+    Text row = getRowFromLock(lockid);
+    if(row == null) {
+      throw new IOException("No write lock for lockid " + lockid);
+    }
+    
+    // This sync block makes abort() thread-safe when multiple
+    // threads from the same client attempt to operate on the same
+    // locked row (via lockid).
+    
+    synchronized(row) {
+      
+      // This check makes sure another thread from the client
+      // hasn't aborted/committed the write-operation.
+      
+      if(row != getRowFromLock(lockid)) {
+        throw new IOException("Locking error: abort() operation on lock " 
+            + lockid + " unexpected aborted by another thread");
+      }
+      
+      targetColumns.remove(lockid);
+      releaseLock(row);
+    }
+  }
+
+  /**
+   * Commit a pending set of writes to the memcache. This also results in writing
+   * to the change log.
+   *
+   * Once updates hit the change log, they are safe.  They will either be moved 
+   * into an HStore in the future, or they will be recovered from the log.
+   */
+  public void commit(long lockid) throws IOException {
+    
+    // Remove the row's pending writes from targetColumns so 
+    // that repeated executions won't apply them twice.
+    
+    Text row = getRowFromLock(lockid);
+    if(row == null) {
+      throw new IOException("No write lock for lockid " + lockid);
+    }
+    
+    // This sync block makes commit() thread-safe when multiple threads
+    // from the same client operate on the same locked row (via lockid).
+
+    synchronized(row) {
+      
+      // We can now commit the changes.
+      // Add updates to the log and add values to the memcache.
+
+      long commitTimestamp = System.currentTimeMillis();
+      log.append(regionInfo.regionName, regionInfo.tableDesc.getName(), row, 
+          targetColumns.get(lockid), commitTimestamp);
+      
+      memcache.add(row, targetColumns.get(lockid), commitTimestamp);
+
+      // OK, all done!
+
+      targetColumns.remove(lockid);
+      releaseLock(row);
+    }
+    recentCommits++;
+    this.commitsSinceFlush++;
+  }
+
+  //////////////////////////////////////////////////////////////////////////////
+  // Support code
+  //////////////////////////////////////////////////////////////////////////////
+
+  /** Make sure this is a valid row for the HRegion */
+  void checkRow(Text row) throws IOException {
+    if(((regionInfo.startKey.getLength() == 0)
+        || (regionInfo.startKey.compareTo(row) <= 0))
+        && ((regionInfo.endKey.getLength() == 0)
+            || (regionInfo.endKey.compareTo(row) > 0))) {
+      // all's well
+      
+    } else {
+      throw new IOException("Requested row out of range for HRegion "
+          + regionInfo.regionName + ", startKey='" + regionInfo.startKey
+          + "', endKey='" + regionInfo.endKey + "', row='" + row + "'");
+    }
+  }
+
+  /** Make sure this is a valid column for the current table */
+  void checkFamily(Text family) throws IOException {
+    if(! regionInfo.tableDesc.hasFamily(family)) {
+      throw new IOException("Requested column family " + family 
+          + " does not exist in HRegion " + regionInfo.regionName
+          + " for table " + regionInfo.tableDesc.getName());
+    }
+  }
+
+  /**
+   * Obtain a lock on the given row.  Blocks until success.
+   *
+   * I know it's strange to have two mappings:
+   *   ROWS  ==> LOCKS
+   * as well as
+   *   LOCKS ==> ROWS
+   *
+   * But it acts as a guard on the client; a miswritten client just can't submit
+   * the name of a row and start writing to it; it must know the correct lockid,
+   * which matches the lock list in memory.
+   * 
+   * It would be more memory-efficient to assume a correctly-written client, 
+   * which maybe we'll do in the future.
+   */
+  long obtainLock(Text row) throws IOException {
+    synchronized(rowsToLocks) {
+      while(rowsToLocks.get(row) != null) {
+        try {
+          rowsToLocks.wait();
+        } catch (InterruptedException ie) {
+        }
+      }
+      
+      long lockid = Math.abs(rand.nextLong());
+      rowsToLocks.put(row, lockid);
+      locksToRows.put(lockid, row);
+      rowsToLocks.notifyAll();
+      return lockid;
+    }
+  }
+  
+  Text getRowFromLock(long lockid) throws IOException {
+    synchronized(rowsToLocks) {
+      return locksToRows.get(lockid);
+    }
+  }
+  
+  /** Release the row lock! */
+  void releaseLock(Text row) throws IOException {
+    synchronized(rowsToLocks) {
+      long lockid = rowsToLocks.remove(row).longValue();
+      locksToRows.remove(lockid);
+      rowsToLocks.notifyAll();
+    }
+  }
+
+  /*******************************************************************************
+   * HScanner is an iterator through a bunch of rows in an HRegion.
+   ******************************************************************************/
+  private class HScanner implements HScannerInterface {
+    HScannerInterface scanners[] = null;
+    TreeMap<Text, byte[]> resultSets[] = null;
+    HStoreKey keys[] = null;
+
+    /** Create an HScanner with a handle on many HStores. */
+    @SuppressWarnings("unchecked")
+    public HScanner(Text cols[], Text firstRow, HMemcache memcache, HStore stores[]) throws IOException {
+      long scanTime = System.currentTimeMillis();
+      this.scanners = new HScannerInterface[stores.length + 1];
+      this.keys = new HStoreKey[scanners.length];
+      this.resultSets = new TreeMap[scanners.length];
+
+      // Advance to the first key in each store.
+      // All results will match the required column-set and scanTime.
+
+      for(int i = 0; i < stores.length; i++) {
+        scanners[i] = stores[i].getScanner(scanTime, cols, firstRow);
+      }
+      scanners[scanners.length-1] = memcache.getScanner(scanTime, cols, firstRow);
+
+      for(int i = 0; i < scanners.length; i++) {
+        keys[i] = new HStoreKey();
+        resultSets[i] = new TreeMap<Text, byte[]>();
+
+        if(! scanners[i].next(keys[i], resultSets[i])) {
+          closeScanner(i);
+        }
+      }
+    }
+
+    /**
+     * Grab the next row's worth of values.  The HScanner will return the most 
+     * recent data value for each row that is not newer than the target time.
+     */
+    public boolean next(HStoreKey key, TreeMap<Text, byte[]> results) throws IOException {
+      
+      // Find the lowest-possible key.
+      
+      Text chosenRow = null;
+      long chosenTimestamp = -1;
+      for(int i = 0; i < keys.length; i++) {
+        if(scanners[i] != null
+            && (chosenRow == null
+                || (keys[i].getRow().compareTo(chosenRow) < 0)
+                || ((keys[i].getRow().compareTo(chosenRow) == 0)
+                    && (keys[i].getTimestamp() > chosenTimestamp)))) {
+          
+          chosenRow = new Text(keys[i].getRow());
+          chosenTimestamp = keys[i].getTimestamp();
+        }
+      }
+
+      // Store the key and results for each sub-scanner. Merge them as appropriate.
+      
+      boolean insertedItem = false;
+      if(chosenTimestamp > 0) {
+        key.setRow(chosenRow);
+        key.setVersion(chosenTimestamp);
+        key.setColumn(new Text(""));
+
+        for(int i = 0; i < scanners.length; i++) {        
+          while((scanners[i] != null)
+              && (keys[i].getRow().compareTo(chosenRow) == 0)
+              && (keys[i].getTimestamp() == chosenTimestamp)) {
+            
+            results.putAll(resultSets[i]);
+            insertedItem = true;
+
+            resultSets[i].clear();
+            if(! scanners[i].next(keys[i], resultSets[i])) {
+              closeScanner(i);
+            }
+          }
+
+          // If the current scanner is non-null AND has a lower-or-equal
+          // row label, then its timestamp is bad.  We need to advance it.
+
+          while((scanners[i] != null)
+              && (keys[i].getRow().compareTo(chosenRow) <= 0)) {
+            
+            resultSets[i].clear();
+            if(! scanners[i].next(keys[i], resultSets[i])) {
+              closeScanner(i);
+            }
+          }
+        }
+      }
+      return insertedItem;
+    }
+
+    /** Shut down a single scanner */
+    void closeScanner(int i) throws IOException {
+      try {
+        scanners[i].close();
+        
+      } finally {
+        scanners[i] = null;
+        keys[i] = null;
+        resultSets[i] = null;
+      }
+    }
+
+    /** All done with the scanner. */
+    public void close() throws IOException {
+      for(int i = 0; i < scanners.length; i++) {
+        if(scanners[i] != null) {
+          closeScanner(i);
+        }
+      }
+    }
+  }
+}
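
The row-update protocol documented on startUpdate() above -- startUpdate(),
then a series of put()/delete() calls, then commit() or abort() -- looks
roughly like this from a caller's point of view. This is a minimal sketch,
assuming an already-open HRegion named 'region' and a hypothetical
'info:name' column in its table descriptor (neither appears in this commit):

    Text row = new Text("row_0001");
    long lockid = region.startUpdate(row);    // blocks until the row lock is free
    try {
      region.put(lockid, new Text("info:name"), "value".getBytes());
      region.delete(lockid, new Text("info:old"));
      region.commit(lockid);                  // append to the HLog, then the memcache
    } catch (IOException e) {
      region.abort(lockid);                   // discard pending writes, release the lock
      throw e;
    }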
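The scanner contract is similar: getScanner() hands back an HScannerInterface
that must be driven with next() until it returns false, and then closed by
the caller, as the getScanner() javadoc requires. A sketch under the same
assumptions:

    Text[] cols = { new Text("info:") };      // hypothetical column family
    HScannerInterface scanner = region.getScanner(cols, new Text(""));
    try {
      HStoreKey key = new HStoreKey();
      TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
      while (scanner.next(key, results)) {
        // key.getRow() identifies the row; results maps column -> value
        results.clear();                      // reuse the map for the next row
      }
    } finally {
      scanner.close();
    }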

Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java?view=auto&rev=525267
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java Tue Apr  3 13:34:28 2007
@@ -0,0 +1,84 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+public class HRegionInfo implements Writable {
+  public long regionId;
+  public HTableDescriptor tableDesc;
+  public Text startKey;
+  public Text endKey;
+  public Text regionName;
+  
+  public HRegionInfo() {
+    this.regionId = 0;
+    this.tableDesc = new HTableDescriptor();
+    this.startKey = new Text();
+    this.endKey = new Text();
+    this.regionName = new Text();
+  }
+
+  public HRegionInfo(long regionId, HTableDescriptor tableDesc, Text startKey, 
+      Text endKey) throws IllegalArgumentException {
+    
+    this.regionId = regionId;
+    
+    if(tableDesc == null) {
+      throw new IllegalArgumentException("tableDesc cannot be null");
+    }
+    
+    this.tableDesc = tableDesc;
+    
+    this.startKey = new Text();
+    if(startKey != null) {
+      this.startKey.set(startKey);
+    }
+    
+    this.endKey = new Text();
+    if(endKey != null) {
+      this.endKey.set(endKey);
+    }
+    
+    this.regionName = new Text(tableDesc.getName() + "_"
+        + (startKey == null ? "" : startKey.toString()) + "_" + regionId);
+  }
+    
+  //////////////////////////////////////////////////////////////////////////////
+  // Writable
+  //////////////////////////////////////////////////////////////////////////////
+
+  public void write(DataOutput out) throws IOException {
+    out.writeLong(regionId);
+    tableDesc.write(out);
+    startKey.write(out);
+    endKey.write(out);
+    regionName.write(out);
+  }
+  
+  public void readFields(DataInput in) throws IOException {
+    this.regionId = in.readLong();
+    this.tableDesc.readFields(in);
+    this.startKey.readFields(in);
+    this.endKey.readFields(in);
+    this.regionName.readFields(in);
+  }
+}
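
HRegionInfo's Writable implementation round-trips through the standard Hadoop
stream pair; the SplitChecker in HRegionServer.java below serializes it the
same way when rewriting META rows. A minimal sketch, assuming an existing
HRegionInfo named 'info':

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    info.write(out);        // regionId, tableDesc, startKey, endKey, regionName

    HRegionInfo copy = new HRegionInfo();
    copy.readFields(new DataInputStream(
        new ByteArrayInputStream(bytes.toByteArray())));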

Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java?view=auto&rev=525267
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java Tue Apr  3 13:34:28 2007
@@ -0,0 +1,61 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.io.*;
+
+import java.io.*;
+
+/*******************************************************************************
+ * Clients interact with HRegionServers using
+ * a handle to the HRegionInterface.
+ ******************************************************************************/
+public interface HRegionInterface {
+  public static final long versionID = 1L; // initial version
+
+  // Get metainfo about an HRegion
+
+  public HRegionInfo getRegionInfo(Text regionName);
+
+  // Start a scanner for a given HRegion.
+
+  public HScannerInterface openScanner(Text regionName, Text[] columns, Text startRow) throws IOException;
+
+  // GET methods for an HRegion.
+
+  public BytesWritable get(Text regionName, Text row, Text column) throws IOException;
+  public BytesWritable[] get(Text regionName, Text row, Text column, int numVersions) throws IOException;
+  public BytesWritable[] get(Text regionName, Text row, Text column, long timestamp, int numVersions) throws IOException;
+  public LabelledData[] getRow(Text regionName, Text row) throws IOException;
+
+  //////////////////////////////////////////////////////////////////////////////
+  // Start an atomic row insertion/update.  No changes are committed until the 
+  // call to commit() returns. A call to abort() will abandon any updates in progress.
+  //
+  // Callers to this method are given a lease for each unique lockid; before the
+  // lease expires, either abort() or commit() must be called. If it is not 
+  // called, the system will automatically call abort() on the client's behalf.
+  //
+  // The client can gain extra time with a call to renewLease().
+  //////////////////////////////////////////////////////////////////////////////
+
+  public long startUpdate(Text regionName, long clientid, Text row) throws IOException;
+  public void put(Text regionName, long clientid, long lockid, Text column, BytesWritable val) throws IOException;
+  public void delete(Text regionName, long clientid, long lockid, Text column) throws IOException;
+  public void abort(Text regionName, long clientid, long lockid) throws IOException;
+  public void commit(Text regionName, long clientid, long lockid) throws IOException;
+  public void renewLease(long lockid, long clientid) throws IOException;
+}
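
Clients would reach this interface through Hadoop's IPC layer. The getProxy()
call below reflects how org.apache.hadoop.ipc.RPC was typically used at the
time; the server address, port, and the surrounding variables (regionName,
row, column, val, conf) are assumptions, not something this commit shows:

    // Hypothetical region server address; this commit does not fix a port.
    InetSocketAddress addr = new InetSocketAddress("regionserver.example", 60010);
    HRegionInterface server = (HRegionInterface) RPC.getProxy(
        HRegionInterface.class, HRegionInterface.versionID, addr, conf);

    long clientid = new Random().nextLong();  // see the lease discussion above
    long lockid = server.startUpdate(regionName, clientid, row);
    server.put(regionName, clientid, lockid, column, val);
    server.commit(regionName, clientid, lockid);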

Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java?view=auto&rev=525267
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java Tue Apr  3 13:34:28 2007
@@ -0,0 +1,818 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.ipc.*;
+import org.apache.hadoop.conf.*;
+
+import java.io.*;
+import java.util.*;
+
+/*******************************************************************************
+ * HRegionServer makes a set of HRegions available to clients.  It checks in with
+ * the HMaster. There are many HRegionServers in a single HBase deployment.
+ ******************************************************************************/
+public class HRegionServer implements HConstants, HRegionInterface, Runnable {
+  private boolean stopRequested;
+  private Path regionDir;
+  private HServerAddress address;
+  private Configuration conf;
+  private Random rand;
+  private TreeMap<Text, HRegion> regions;               // region name -> HRegion
+  private HLocking locking;
+  private Vector<HMsg> outboundMsgs;
+
+  private long threadWakeFrequency;
+  private int maxLogEntries;
+  private long msgInterval;
+  
+  // Check to see if regions should be split
+  
+  private long splitCheckFrequency;
+  private SplitChecker splitChecker;
+  private Thread splitCheckerThread;
+  
+  private class SplitChecker implements Runnable {
+    private HClient client = new HClient(conf);
+  
+    private class SplitRegion {
+      public HRegion region;
+      public Text midKey;
+      
+      SplitRegion(HRegion region, Text midKey) {
+        this.region = region;
+        this.midKey = midKey;
+      }
+    }
+    
+    public void run() {
+      while(! stopRequested) {
+        long startTime = System.currentTimeMillis();
+
+        // Grab a list of regions to check
+
+        Vector<HRegion> checkSplit = new Vector<HRegion>();
+        locking.obtainReadLock();
+        try {
+          checkSplit.addAll(regions.values());
+          
+        } finally {
+          locking.releaseReadLock();
+        }
+
+        // Check to see if they need splitting
+
+        Vector<SplitRegion> toSplit = new Vector<SplitRegion>();
+        for(Iterator<HRegion> it = checkSplit.iterator(); it.hasNext(); ) {
+          HRegion cur = it.next();
+          Text midKey = new Text();
+          
+          try {
+            if(cur.needsSplit(midKey)) {
+              toSplit.add(new SplitRegion(cur, midKey));
+            }
+            
+          } catch(IOException iex) {
+            iex.printStackTrace();
+          }
+        }
+
+        for(Iterator<SplitRegion> it = toSplit.iterator(); it.hasNext(); ) {
+          SplitRegion r = it.next();
+          
+          locking.obtainWriteLock();
+          regions.remove(r.region.getRegionName());
+          locking.releaseWriteLock();
+ 
+          HRegion[] newRegions = null;
+          try {
+            Text oldRegion = r.region.getRegionName();
+            
+            newRegions = r.region.closeAndSplit(r.midKey);
+
+            // When a region is split, the META table needs to be updated if we're
+            // splitting a 'normal' region, and the ROOT table needs to be
+            // updated if we are splitting a META region.
+
+            Text tableToUpdate
+              = (oldRegion.find(META_TABLE_NAME.toString()) == 0)
+                ? ROOT_TABLE_NAME : META_TABLE_NAME;
+
+            client.openTable(tableToUpdate);
+            long lockid = client.startUpdate(oldRegion);
+            client.delete(lockid, META_COL_REGIONINFO);
+            client.delete(lockid, META_COL_SERVER);
+            client.delete(lockid, META_COL_STARTCODE);
+            client.commit(lockid);
+            
+            for(int i = 0; i < newRegions.length; i++) {
+              ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+              DataOutputStream out = new DataOutputStream(bytes);
+              newRegions[i].getRegionInfo().write(out);
+              
+              lockid = client.startUpdate(newRegions[i].getRegionName());
+              client.put(lockid, META_COL_REGIONINFO, bytes.toByteArray());
+              client.commit(lockid);
+            }
+            
+            // Now tell the master about the new regions
+            
+            reportSplit(newRegions[0].getRegionInfo(), newRegions[1].getRegionInfo());
+            newRegions[0].close();
+            newRegions[1].close();
+            
+          } catch(IOException e) {
+            //TODO: What happens if this fails? Are we toast?
+            e.printStackTrace();
+            continue;
+          }
+        }
+        
+        // Sleep
+
+        long endTime = System.currentTimeMillis();
+        try {
+          Thread.sleep(Math.max(0, splitCheckFrequency - (endTime - startTime)));
+          
+        } catch(InterruptedException iex) {
+        }
+      }
+    }
+  }
+  
+  // Cache flushing
+  
+  private Flusher cacheFlusher;
+  private Thread cacheFlusherThread;
+  private class Flusher implements Runnable {
+    public void run() {
+      while(! stopRequested) {
+        long startTime = System.currentTimeMillis();
+
+        // Grab a list of items to flush
+
+        Vector<HRegion> toFlush = new Vector<HRegion>();
+        locking.obtainReadLock();
+        try {
+          toFlush.addAll(regions.values());
+          
+        } finally {
+          locking.releaseReadLock();
+        }
+
+        // Flush them, if necessary
+
+        for(Iterator<HRegion> it = toFlush.iterator(); it.hasNext(); ) {
+          HRegion cur = it.next();
+          
+          try {
+            cur.optionallyFlush();
+            
+          } catch(IOException iex) {
+            iex.printStackTrace();
+          }
+        }
+
+        // Sleep
+
+        long endTime = System.currentTimeMillis();
+        long waitTime = threadWakeFrequency - (endTime - startTime);
+        if(waitTime > 0) {               // the flushes may have run long
+          try {
+            Thread.sleep(waitTime);
+            
+          } catch(InterruptedException iex) {
+          }
+        }
+      }
+    }
+  }
+  
+  // File paths
+  
+  private FileSystem fs;
+  private Path oldlogfile;
+  
+  // Logging
+  
+  private HLog log;
+  private LogRoller logRoller;
+  private Thread logRollerThread;
+  private class LogRoller implements Runnable {
+    public void run() {
+      while(! stopRequested) {
+
+        // If the number of log entries is high enough, roll the log.  This is a
+        // very fast operation, but should not be done too frequently.
+
+        if(log.getNumEntries() > maxLogEntries) {
+          try {
+            log.rollWriter();
+            
+          } catch(IOException iex) {
+            iex.printStackTrace();      // don't swallow roll failures silently
+          }
+        }
+        
+        try {
+          Thread.sleep(threadWakeFrequency);
+          
+        } catch(InterruptedException iex) {
+        }
+      }
+    }
+  }
+  
+  // Remote HMaster
+
+  private HMasterRegionInterface hbaseMaster;
+
+  // Server
+  
+  private Server server;
+  
+  // Leases
+  
+  private Leases leases;
+
+  /** Start an HRegionServer at the default location */
+  public HRegionServer(Configuration conf) throws IOException {
+    this(new Path(conf.get(HREGION_DIR, DEFAULT_HREGION_DIR)),
+        new HServerAddress(conf.get("hbase.regionserver.default.name")),
+        conf);
+  }
+  
+  /** Start an HRegionServer at an indicated location */
+  public HRegionServer(Path regionDir, HServerAddress address, Configuration conf) 
+      throws IOException {
+    
+    // Basic setup
+    
+    this.stopRequested = false;
+    this.regionDir = regionDir;
+    this.address = address;
+    this.conf = conf;
+    this.rand = new Random();
+    this.regions = new TreeMap<Text, HRegion>();
+    this.locking = new HLocking(); 
+    this.outboundMsgs = new Vector<HMsg>();
+
+    // Config'ed params
+    
+    this.threadWakeFrequency = conf.getLong("hbase.hregionserver.thread.wakefrequency", 10 * 1000);
+    this.maxLogEntries = conf.getInt("hbase.hregionserver.maxlogentries", 30 * 1000);
+    this.msgInterval = conf.getLong("hbase.hregionserver.msginterval", 15 * 1000);
+    this.splitCheckFrequency = conf.getLong("hbase.hregionserver.thread.splitcheckfrequency", 60 * 1000);
+    
+    // Cache flushing
+    
+    this.cacheFlusher = new Flusher();
+    this.cacheFlusherThread = new Thread(cacheFlusher);
+    
+    // Check regions to see if they need to be split
+    
+    this.splitChecker = new SplitChecker();
+    this.splitCheckerThread = new Thread(splitChecker);
+
+    try {
+      // Local file paths
+
+      this.fs = FileSystem.get(conf);
+      Path newlogdir = new Path(regionDir, "log" + "_" + address.toString());
+      this.oldlogfile = new Path(regionDir, "oldlogfile" + "_" + address.toString());
+
+      // Logging
+
+      HLog.consolidateOldLog(newlogdir, oldlogfile, fs, conf);
+      this.log = new HLog(fs, newlogdir, conf);
+      this.logRoller = new LogRoller();
+      this.logRollerThread = new Thread(logRoller);
+
+      // Remote HMaster
+
+      this.hbaseMaster = (HMasterRegionInterface)
+        RPC.waitForProxy(HMasterRegionInterface.class,
+            HMasterRegionInterface.versionId,
+            new HServerAddress(conf.get(MASTER_DEFAULT_NAME)).getInetSocketAddress(),
+            conf);
+
+      // Threads
+
+      this.cacheFlusherThread.start();
+      this.splitCheckerThread.start();
+      this.logRollerThread.start();
+      this.leases = new Leases(conf.getLong("hbase.hregionserver.lease.period", 
+          3 * 60 * 1000), threadWakeFrequency);
+      
+      // Server
+
+      this.server = RPC.getServer(this, address.getBindAddress().toString(), 
+          address.getPort(), conf.getInt("hbase.hregionserver.handler.count", 10), false, conf);
+      this.server.start();
+
+    } catch(IOException e) {
+      this.stopRequested = true;
+      throw e;
+    }
+  }
+
+  /**
+   * Stop all the HRegionServer threads and close everything down. All ongoing 
+   * transactions will be aborted and all threads will be shut down. This method
+   * returns immediately. The caller should call join() to wait for all 
+   * processing to cease.
+   */
+  public void stop() throws IOException {
+    if(! stopRequested) {
+      stopRequested = true;
+ 
+      closeAllRegions();
+      log.close();
+      fs.close();
+      server.stop();
+    }
+  }
+
+  /** Call join to wait for all the threads to finish */
+  public void join() {
+    try {
+      this.logRollerThread.join();
+      
+    } catch(InterruptedException iex) {
+    }
+    
+    try {
+      this.cacheFlusherThread.join();
+      
+    } catch(InterruptedException iex) {
+    }
+    
+    this.leases.close();
+  
+    try {
+      this.server.join();
+      
+    } catch(InterruptedException iex) {
+    }
+  }
+  
+  /**
+   * The HRegionServer sticks in this loop until it is asked to stop. It 
+   * repeatedly checks in with the HMaster, sending heartbeats & reports, and 
+   * receiving HRegion load/unload instructions.
+   */
+  public void run() {
+    while(! stopRequested) {
+      HServerInfo info = new HServerInfo(address, rand.nextLong());
+      long lastMsg = 0;
+      long waitTime;
+
+      // Let the master know we're here
+      
+      try {
+        hbaseMaster.regionServerStartup(info);
+        
+      } catch(IOException e) {
+        // Could not register with the master.  lastMsg is still 0 here, so
+        // deriving a wait from it would produce a negative sleep; just wait
+        // one message interval and retry.
+        try {
+          Thread.sleep(msgInterval);
+          
+        } catch(InterruptedException iex) {
+        }
+        continue;
+      }
+      
+      // Now ask the master what it wants us to do and tell it what we have done.
+      
+      while(! stopRequested) {
+        if((System.currentTimeMillis() - lastMsg) >= msgInterval) {
+
+          HMsg outboundArray[] = null;
+          synchronized(outboundMsgs) {
+            outboundArray = outboundMsgs.toArray(new HMsg[outboundMsgs.size()]);
+            outboundMsgs.clear();
+          }
+
+          try {
+            HMsg msgs[] = hbaseMaster.regionServerReport(info, outboundArray);
+            lastMsg = System.currentTimeMillis();
+
+            // Process the HMaster's instruction stream
+
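+            // processMessages() returns false when the master has told us to
+            // restart or shut down; break out of the reporting loop (and, if
+            // stop() was called, out of the enclosing registration loop too).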
+            if(! processMessages(msgs)) {
+              break;
+            }
+
+          } catch(IOException e) {
+            e.printStackTrace();
+          }
+        }
+
+        waitTime = msgInterval - (System.currentTimeMillis() - lastMsg);
+
+        if(waitTime > 0) {               // the report itself may have run long
+          try {
+            Thread.sleep(waitTime);
+          } catch(InterruptedException iex) {
+          }
+        }
+
+      }
+    }
+  }
+
+  private boolean processMessages(HMsg[] msgs) throws IOException {
+    for(int i = 0; i < msgs.length; i++) {
+      switch(msgs[i].getMsg()) {
+      
+      case HMsg.MSG_REGION_OPEN:                        // Open a region
+        openRegion(msgs[i].getRegionInfo());
+        break;
+        
+      case HMsg.MSG_REGION_CLOSE:                       // Close a region
+        closeRegion(msgs[i].getRegionInfo(), true);
+        break;
+        
+      case HMsg.MSG_REGION_MERGE:                       // Merge two regions
+        //TODO ???
+        throw new IOException("TODO: need to figure out merge");
+        //break;
+        
+      case HMsg.MSG_CALL_SERVER_STARTUP:                // Close regions, restart
+        closeAllRegions();
+        return false;
+        
+      case HMsg.MSG_REGIONSERVER_ALREADY_RUNNING:       // Go away
+        stop();
+        return false;
+        
+      case HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT:        // Close a region, don't reply
+        closeRegion(msgs[i].getRegionInfo(), false);
+        break;
+        
+      case HMsg.MSG_REGION_CLOSE_AND_DELETE:
+        closeAndDeleteRegion(msgs[i].getRegionInfo());
+        break;
+        
+      default:
+        throw new IOException("Impossible state during msg processing.  Instruction: " + msgs[i]);
+      }
+    }
+    return true;
+  }
+
+  /** Add to the outbound message buffer */
+  private void reportOpen(HRegion region) {
+    synchronized(outboundMsgs) {
+      outboundMsgs.add(new HMsg(HMsg.MSG_REPORT_OPEN, region.getRegionInfo()));
+    }
+  }
+
+  /** Add to the outbound message buffer */
+  private void reportClose(HRegion region) {
+    synchronized(outboundMsgs) {
+      outboundMsgs.add(new HMsg(HMsg.MSG_REPORT_CLOSE, region.getRegionInfo()));
+    }
+  }
+  
+  /** 
+   * Add to the outbound message buffer
+   * 
+   * When a region splits, we need to tell the master that there are two new 
+   * regions that need to be assigned.
+   * 
+   * We do not need to inform the master about the old region, because we've
+   * updated the meta or root regions, and the master will pick that up on its
+   * next rescan of the root or meta tables.
+   */
+  private void reportSplit(HRegionInfo newRegionA, HRegionInfo newRegionB) {
+    synchronized(outboundMsgs) {
+      outboundMsgs.add(new HMsg(HMsg.MSG_NEW_REGION, newRegionA));
+      outboundMsgs.add(new HMsg(HMsg.MSG_NEW_REGION, newRegionB));
+    }
+  }
+
+  //////////////////////////////////////////////////////////////////////////////
+  // HMaster-given operations
+  //////////////////////////////////////////////////////////////////////////////
+
+  private void openRegion(HRegionInfo regionInfo) throws IOException {
+    
+    locking.obtainWriteLock();
+    try {
+      HRegion region = new HRegion(regionDir, log, fs, conf, regionInfo, null, oldlogfile);
+      
+      regions.put(region.getRegionName(), region);
+      reportOpen(region);
+      
+    } finally {
+      locking.releaseWriteLock();
+    }
+  }
+
+  private void closeRegion(HRegionInfo info, boolean reportWhenCompleted)
+      throws IOException {
+    
+    locking.obtainWriteLock();
+    try {
+      HRegion region = regions.remove(info.regionName);
+      
+      if(region != null) {
+        region.close();
+        
+        if(reportWhenCompleted) {
+          reportClose(region);
+        }
+      }
+      
+    } finally {
+      locking.releaseWriteLock();
+    }
+  }
+
+  private void closeAndDeleteRegion(HRegionInfo info) throws IOException {
+
+    locking.obtainWriteLock();
+    try {
+      HRegion region = regions.remove(info.regionName);
+  
+      if(region != null) {
+        region.closeAndDelete();
+      }
+  
+    } finally {
+      locking.releaseWriteLock();
+    }
+  }
+
+  /** Called either when the master tells us to restart or from stop() */
+  private void closeAllRegions() throws IOException {
+    locking.obtainWriteLock();
+    try {
+      for(Iterator<HRegion> it = regions.values().iterator(); it.hasNext(); ) {
+        HRegion region = it.next();
+        region.close();
+      }
+      regions.clear();
+      
+    } finally {
+      locking.releaseWriteLock();
+    }
+  }
+
+  /*****************************************************************************
+   * TODO - Figure out how the master is to determine when regions should be
+   *        merged.  Once it makes this determination, it needs to ensure that
+   *        the regions to be merged are first being served by the same
+   *        HRegionServer, and if they are not, move them so that they are.
+   *        
+   *        For now, we do not do merging.  Splits are driven by the HRegionServer.
+   ****************************************************************************/
+/*
+  private void mergeRegions(Text regionNameA, Text regionNameB) throws IOException {
+    locking.obtainWriteLock();
+    try {
+      HRegion srcA = regions.remove(regionNameA);
+      HRegion srcB = regions.remove(regionNameB);
+      HRegion newRegion = HRegion.closeAndMerge(srcA, srcB);
+      regions.put(newRegion.getRegionName(), newRegion);
+
+      reportClose(srcA);
+      reportClose(srcB);
+      reportOpen(newRegion);
+      
+    } finally {
+      locking.releaseWriteLock();
+    }
+  }
+*/
+
+  //////////////////////////////////////////////////////////////////////////////
+  // HRegionInterface
+  //////////////////////////////////////////////////////////////////////////////
+
+  /** Obtain the HRegionInfo for the given region (or null if not served here) */
+  public HRegionInfo getRegionInfo(Text regionName) {
+    HRegion region = getRegion(regionName);
+    if(region == null) {
+      return null;
+    }
+    return region.getRegionInfo();
+  }
+
+  /** Start a scanner for a given HRegion. */
+  public HScannerInterface openScanner(Text regionName, Text[] cols, 
+      Text firstRow) throws IOException {
+
+    HRegion r = getRegion(regionName);
+    if(r == null) {
+      throw new IOException("Not serving region " + regionName);
+    }
+    return r.getScanner(cols, firstRow);
+  }
+  
+  /** Get the indicated row/column */
+  public BytesWritable get(Text regionName, Text row, Text column) throws IOException {
+    HRegion region = getRegion(regionName);
+    if(region == null) {
+      throw new IOException("Not serving region " + regionName);
+    }
+    
+    byte results[] = region.get(row, column);
+    if(results != null) {
+      return new BytesWritable(results);
+    }
+    return null;
+  }
+
+  /** Get multiple versions of the indicated row/col */
+  public BytesWritable[] get(Text regionName, Text row, Text column, 
+      int numVersions) throws IOException {
+    
+    HRegion region = getRegion(regionName);
+    if(region == null) {
+      throw new IOException("Not serving region " + regionName);
+    }
+    
+    byte results[][] = region.get(row, column, numVersions);
+    if(results != null) {
+      BytesWritable realResults[] = new BytesWritable[results.length];
+      for(int i = 0; i < realResults.length; i++) {
+        if(results[i] != null) {
+          realResults[i] = new BytesWritable(results[i]);
+        }
+      }
+      return realResults;
+    }
+    return null;
+  }
+
+  /** Get multiple timestamped versions of the indicated row/col */
+  public BytesWritable[] get(Text regionName, Text row, Text column, 
+      long timestamp, int numVersions) throws IOException {
+    
+    HRegion region = getRegion(regionName);
+    if(region == null) {
+      throw new IOException("Not serving region " + regionName);
+    }
+    
+    byte results[][] = region.get(row, column, timestamp, numVersions);
+    if(results != null) {
+      BytesWritable realResults[] = new BytesWritable[results.length];
+      for(int i = 0; i < realResults.length; i++) {
+        if(results[i] != null) {
+          realResults[i] = new BytesWritable(results[i]);
+        }
+      }
+      return realResults;
+    }
+    return null;
+  }
+
+  /** Get all the columns (along with their names) for a given row. */
+  public LabelledData[] getRow(Text regionName, Text row) throws IOException {
+    HRegion region = getRegion(regionName);
+    if(region == null) {
+      throw new IOException("Not serving region " + regionName);
+    }
+    
+    TreeMap<Text, byte[]> map = region.getFull(row);
+    LabelledData result[] = new LabelledData[map.size()];
+    int counter = 0;
+    for(Iterator<Map.Entry<Text, byte[]>> it = map.entrySet().iterator(); it.hasNext(); ) {
+      Map.Entry<Text, byte[]> e = it.next();
+      result[counter++] = new LabelledData(e.getKey(), e.getValue());
+    }
+    return result;
+  }
+
+  /** 
+   * LeaseListener that aborts an update if the client's lease expires before
+   * the update has been committed or aborted.
+   */
+  private class RegionListener extends LeaseListener {
+    private HRegion localRegion;
+    private long localLockId;
+    
+    public RegionListener(HRegion region, long lockId) {
+      this.localRegion = region;
+      this.localLockId = lockId;
+    }
+    
+    public void leaseExpired() {
+      try {
+        localRegion.abort(localLockId);
+        
+      } catch(IOException iex) {
+        iex.printStackTrace();
+      }
+    }
+  }
+  
+  /**
+   * Start an update to the HBase.  This also creates a lease associated with
+   * the caller.
+   */
+  public long startUpdate(Text regionName, long clientid, Text row) 
+      throws IOException {
+    
+    HRegion region = getRegion(regionName);
+    if(region == null) {
+      throw new IOException("Not serving region " + regionName);
+    }
+    
+    long lockid = region.startUpdate(row);
+    leases.createLease(new Text(String.valueOf(clientid)), 
+        new Text(String.valueOf(lockid)), 
+        new RegionListener(region, lockid));
+    
+    return lockid;
+  }
+
+  /** Add something to the HBase. */
+  public void put(Text regionName, long clientid, long lockid, Text column, 
+      BytesWritable val) throws IOException {
+    
+    HRegion region = getRegion(regionName);
+    if(region == null) {
+      throw new IOException("Not serving region " + regionName);
+    }
+    
+    leases.renewLease(new Text(String.valueOf(clientid)), 
+        new Text(String.valueOf(lockid)));
+    
+    region.put(lockid, column, val.get());
+  }
+
+  /** Remove a cell from the HBase. */
+  public void delete(Text regionName, long clientid, long lockid, Text column) 
+      throws IOException {
+    
+    HRegion region = getRegion(regionName);
+    if(region == null) {
+      throw new IOException("Not serving region " + regionName);
+    }
+    
+    leases.renewLease(new Text(String.valueOf(clientid)), 
+        new Text(String.valueOf(lockid)));
+    
+    region.delete(lockid, column);
+  }
+
+  /** Abandon the transaction */
+  public void abort(Text regionName, long clientid, long lockid) 
+      throws IOException {
+    
+    HRegion region = getRegion(regionName);
+    if(region == null) {
+      throw new IOException("Not serving region " + regionName);
+    }
+    
+    leases.cancelLease(new Text(String.valueOf(clientid)), 
+        new Text(String.valueOf(lockid)));
+    
+    region.abort(lockid);
+  }
+
+  /** Confirm the transaction */
+  public void commit(Text regionName, long clientid, long lockid) 
+      throws IOException {
+    
+    HRegion region = getRegion(regionName);
+    if(region == null) {
+      throw new IOException("Not serving region " + regionName);
+    }
+    
+    leases.cancelLease(new Text(String.valueOf(clientid)), 
+        new Text(String.valueOf(lockid)));
+    
+    region.commit(lockid);
+  }
+
+  /** Don't let the client's lease expire just yet...  */
+  public void renewLease(long lockid, long clientid) throws IOException {
+    leases.renewLease(new Text(String.valueOf(clientid)), 
+        new Text(String.valueOf(lockid)));
+  }
+
+  /** Private utility method for safely obtaining an HRegion handle. */
+  private HRegion getRegion(Text regionName) {
+    locking.obtainReadLock();
+    try {
+      return regions.get(regionName);
+      
+    } finally {
+      locking.releaseReadLock();
+    }
+  }
+
+}
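
For illustration only, not part of this commit: a minimal sketch of the
single-row update sequence the methods above imply, as seen from a client
holding an HRegionInterface proxy.  The proxy variable (server), region name,
column name, and client id below are all hypothetical, and IOException
handling is elided.

    Text regionName = new Text("mytable_somerow");      // hypothetical region name
    Text row = new Text("row1");
    long clientid = 42;                                 // hypothetical client id

    long lockid = server.startUpdate(regionName, clientid, row);
    server.put(regionName, clientid, lockid,            // renews the lease
        new Text("family:qualifier"), new BytesWritable("value".getBytes()));
    server.commit(regionName, clientid, lockid);        // cancels the lease
    // On error, server.abort(regionName, clientid, lockid) releases the row
    // lock and cancels the lease instead.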

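Also for illustration, not part of this commit: how the constructor, stop(),
and join() above fit together, assuming HRegionServer implements Runnable
(run() above is the report loop) and that the Configuration supplies the
hbase.* keys read in the constructor.  IOExceptions are again elided.

    Configuration conf = new Configuration();       // must carry the hbase.* keys
    HRegionServer regionServer = new HRegionServer(conf);
    Thread serverThread = new Thread(regionServer); // drives run(), the report loop
    serverThread.start();

    // ... later, at shutdown:
    regionServer.stop();    // returns at once; aborts open transactions
    regionServer.join();    // blocks until the worker threads have exited
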
Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HScannerInterface.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HScannerInterface.java?view=auto&rev=525267
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HScannerInterface.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HScannerInterface.java Tue Apr  3 13:34:28 2007
@@ -0,0 +1,29 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.io.*;
+
+import java.io.*;
+import java.util.*;
+
+/*******************************************************************************
+ * HScannerInterface iterates through a set of rows.  It's implemented by several classes.
+ ******************************************************************************/
+public interface HScannerInterface {
+  public boolean next(HStoreKey key, TreeMap<Text, byte[]> results) throws IOException;
+  public void close() throws IOException;
+}
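
For illustration, not part of this commit: the scan loop this interface
implies, reusing the hypothetical server proxy and region name from the
sketches above.  HStoreKey is assumed to offer a no-arg constructor and
getRow(), and the caller is assumed to clear the results map between rows.

    Text[] cols = { new Text("family:") };          // hypothetical column pattern
    HScannerInterface scanner =
        server.openScanner(regionName, cols, new Text(""));  // from the first row
    try {
      HStoreKey key = new HStoreKey();
      TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
      while(scanner.next(key, results)) {
        System.out.println(key.getRow() + ": " + results.size() + " column(s)");
        results.clear();
      }
    } finally {
      scanner.close();
    }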

Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HServerAddress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HServerAddress.java?view=auto&rev=525267
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HServerAddress.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HServerAddress.java Tue Apr  3 13:34:28 2007
@@ -0,0 +1,103 @@
+/**
+ * Copyright 2006-7 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.io.*;
+
+import java.io.*;
+import java.net.InetSocketAddress;
+
+/*******************************************************************************
+ * HServerAddress is a "label" for an HBase server that combines the host
+ * name and port number.
+ ******************************************************************************/
+public class HServerAddress implements Writable {
+  private InetSocketAddress address;
+  private String stringValue;
+
+  public HServerAddress() {
+    this.address = null;
+    this.stringValue = null;
+  }
+  
+  public HServerAddress(String hostAndPort) {
+    int colonIndex = hostAndPort.indexOf(':');
+    if(colonIndex < 0) {
+      throw new IllegalArgumentException("Not a host:port pair: " + hostAndPort);
+    }
+    String host = hostAndPort.substring(0, colonIndex);
+    int port = Integer.parseInt(hostAndPort.substring(colonIndex + 1));
+    this.address = new InetSocketAddress(host, port);
+    this.stringValue = hostAndPort;
+  }
+  
+  public HServerAddress(String bindAddress, int port) {
+    this.address = new InetSocketAddress(bindAddress, port);
+    this.stringValue = bindAddress + ":" + port;
+  }
+  
+  public HServerAddress(HServerAddress other) {
+    String bindAddress = other.getBindAddress();
+    int port = other.getPort();
+    address = new InetSocketAddress(bindAddress, port);
+    stringValue = bindAddress + ":" + port;
+  }
+  
+  public String getBindAddress() {
+    return address.getAddress().getHostAddress();
+  }
+  
+  public int getPort() {
+    return address.getPort();
+  }
+  
+  public InetSocketAddress getInetSocketAddress() {
+    return address;
+  }
+  
+  public String toString() {
+    return (stringValue == null ? "" : stringValue);
+  }
+
+  //////////////////////////////////////////////////////////////////////////////
+  // Writable
+  //////////////////////////////////////////////////////////////////////////////
+
+  public void readFields(DataInput in) throws IOException {
+    String bindAddress = in.readUTF();
+    int port = in.readInt();
+    
+    if(bindAddress == null || bindAddress.length() == 0) {
+      address = null;
+      stringValue = null;
+      
+    } else {
+      address = new InetSocketAddress(bindAddress, port);
+      stringValue = bindAddress + ":" + port;  // same host:port form as the constructors
+    }
+  }
+
+  public void write(DataOutput out) throws IOException {
+    if(address == null) {
+      out.writeUTF("");
+      out.writeInt(0);
+      
+    } else {
+      out.writeUTF(address.getAddress().getHostAddress());
+      out.writeInt(address.getPort());
+    }
+  }
+}
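
A small Writable round-trip sketch for the class above, for illustration only;
the address and port are arbitrary and IOExceptions are elided.  readFields()
rebuilds stringValue in the same host:port form the constructors use.

    HServerAddress addr = new HServerAddress("127.0.0.1", 60010);  // arbitrary

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    addr.write(new DataOutputStream(bytes));

    HServerAddress copy = new HServerAddress();
    copy.readFields(new DataInputStream(
        new ByteArrayInputStream(bytes.toByteArray())));
    // copy.getPort() == 60010; copy.toString() is "127.0.0.1:60010"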

Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HServerInfo.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HServerInfo.java?view=auto&rev=525267
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HServerInfo.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HServerInfo.java Tue Apr  3 13:34:28 2007
@@ -0,0 +1,66 @@
+/**
+ * Copyright 2006-7 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.io.*;
+
+import java.io.*;
+
+/*******************************************************************************
+ * HServerInfo contains metainfo about an HRegionServer, including details
+ * about the source machine and load statistics.
+ ******************************************************************************/
+public class HServerInfo implements Writable {
+  private HServerAddress serverAddress;
+  private long startCode;
+
+  public HServerInfo() {
+    this.serverAddress = new HServerAddress();
+    this.startCode = 0;
+  }
+  
+  public HServerInfo(HServerAddress serverAddress, long startCode) {
+    this.serverAddress = new HServerAddress(serverAddress);
+    this.startCode = startCode;
+  }
+  
+  public HServerInfo(HServerInfo other) {
+    this.serverAddress = new HServerAddress(other.getServerAddress());
+    this.startCode = other.getStartCode();
+  }
+  
+  public HServerAddress getServerAddress() {
+    return serverAddress;
+  }
+  
+  public long getStartCode() {
+    return startCode;
+  }
+
+  //////////////////////////////////////////////////////////////////////////////
+  // Writable
+  //////////////////////////////////////////////////////////////////////////////
+
+  public void readFields(DataInput in) throws IOException {
+    this.serverAddress.readFields(in);
+    this.startCode = in.readLong();
+  }
+
+  public void write(DataOutput out) throws IOException {
+    this.serverAddress.write(out);
+    out.writeLong(this.startCode);
+  }
+}
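
For illustration, not part of this commit: this is the record that
HRegionServer.run() above builds when registering with the master; the
address below is hypothetical.

    HServerAddress address = new HServerAddress("127.0.0.1", 60010);  // hypothetical
    HServerInfo info = new HServerInfo(address, new Random().nextLong());
    // The random start code is meant to distinguish this incarnation of the
    // server from an earlier one that registered from the same address.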


