hadoop-common-commits mailing list archives

From: cutt...@apache.org
Subject: svn commit: r532083 [3/4] - in /lucene/hadoop/trunk: ./ src/contrib/hbase/conf/ src/contrib/hbase/src/java/org/apache/hadoop/hbase/ src/contrib/hbase/src/test/org/apache/hadoop/hbase/
Date: Tue, 24 Apr 2007 21:13:10 GMT
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java?view=diff&rev=532083&r1=532082&r2=532083
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java Tue Apr 24 14:13:08 2007
@@ -61,21 +61,21 @@
     // Make sure that srcA comes first; important for key-ordering during
     // write of the merged file.
     
-    if (srcA.getStartKey() == null) {
-      if (srcB.getStartKey() == null) {
+    if(srcA.getStartKey() == null) {
+      if(srcB.getStartKey() == null) {
         throw new IOException("Cannot merge two regions with null start key");
       }
       // A's start key is null but B's isn't. Assume A comes before B
       
-    } else if ((srcB.getStartKey() == null)         // A is not null but B is
-               || (srcA.getStartKey().compareTo(srcB.getStartKey()) > 0)) { // A > B
+    } else if((srcB.getStartKey() == null)         // A is not null but B is
+        || (srcA.getStartKey().compareTo(srcB.getStartKey()) > 0)) { // A > B
       
       HRegion tmp = srcA;
       srcA = srcB;
       srcB = tmp;
     }
     
-    if (!srcA.getEndKey().equals(srcB.getStartKey())) {
+    if (! srcA.getEndKey().equals(srcB.getStartKey())) {
       throw new IOException("Cannot merge non-adjacent regions");
     }
 
@@ -89,7 +89,7 @@
     Text endKey = srcB.getEndKey();
 
     Path merges = new Path(srcA.getRegionDir(), MERGEDIR);
-    if (!fs.exists(merges)) {
+    if(! fs.exists(merges)) {
       fs.mkdirs(merges);
     }
     
@@ -98,14 +98,14 @@
     
     Path newRegionDir = HStoreFile.getHRegionDir(merges, newRegionInfo.regionName);
 
-    if (fs.exists(newRegionDir)) {
+    if(fs.exists(newRegionDir)) {
       throw new IOException("Cannot merge; target file collision at " + newRegionDir);
     }
 
     LOG.info("starting merge of regions: " + srcA.getRegionName() + " and " 
-             + srcB.getRegionName() + " new region start key is '" 
-             + (startKey == null ? "" : startKey) + "', end key is '" 
-             + (endKey == null ? "" : endKey) + "'");
+        + srcB.getRegionName() + " new region start key is '" 
+        + (startKey == null ? "" : startKey) + "', end key is '" 
+        + (endKey == null ? "" : endKey) + "'");
     
     // Flush each of the sources, and merge their files into a single 
     // target for each column family.
@@ -114,10 +114,10 @@
     
     TreeSet<HStoreFile> alreadyMerged = new TreeSet<HStoreFile>();
     TreeMap<Text, Vector<HStoreFile>> filesToMerge = new TreeMap<Text, Vector<HStoreFile>>();
-    for(Iterator<HStoreFile> it = srcA.flushcache(true).iterator(); it.hasNext();) {
+    for(Iterator<HStoreFile> it = srcA.flushcache(true).iterator(); it.hasNext(); ) {
       HStoreFile src = it.next();
       Vector<HStoreFile> v = filesToMerge.get(src.getColFamily());
-      if (v == null) {
+      if(v == null) {
         v = new Vector<HStoreFile>();
         filesToMerge.put(src.getColFamily(), v);
       }
@@ -126,10 +126,10 @@
     
     LOG.debug("flushing and getting file names for region " + srcB.getRegionName());
     
-    for(Iterator<HStoreFile> it = srcB.flushcache(true).iterator(); it.hasNext();) {
+    for(Iterator<HStoreFile> it = srcB.flushcache(true).iterator(); it.hasNext(); ) {
       HStoreFile src = it.next();
       Vector<HStoreFile> v = filesToMerge.get(src.getColFamily());
-      if (v == null) {
+      if(v == null) {
         v = new Vector<HStoreFile>();
         filesToMerge.put(src.getColFamily(), v);
       }
@@ -138,11 +138,11 @@
     
     LOG.debug("merging stores");
     
-    for(Iterator<Text> it = filesToMerge.keySet().iterator(); it.hasNext();) {
+    for(Iterator<Text> it = filesToMerge.keySet().iterator(); it.hasNext(); ) {
       Text colFamily = it.next();
       Vector<HStoreFile> srcFiles = filesToMerge.get(colFamily);
       HStoreFile dst = new HStoreFile(conf, merges, newRegionInfo.regionName, 
-                                      colFamily, Math.abs(rand.nextLong()));
+          colFamily, Math.abs(rand.nextLong()));
       
       dst.mergeStoreFiles(srcFiles, fs, conf);
       alreadyMerged.addAll(srcFiles);
@@ -153,15 +153,15 @@
     // of any last-minute inserts
 
     LOG.debug("flushing changes since start of merge for region " 
-              + srcA.getRegionName());
+        + srcA.getRegionName());
 
     filesToMerge.clear();
-    for(Iterator<HStoreFile> it = srcA.close().iterator(); it.hasNext();) {
+    for(Iterator<HStoreFile> it = srcA.close().iterator(); it.hasNext(); ) {
       HStoreFile src = it.next();
       
-      if (!alreadyMerged.contains(src)) {
+      if(! alreadyMerged.contains(src)) {
         Vector<HStoreFile> v = filesToMerge.get(src.getColFamily());
-        if (v == null) {
+        if(v == null) {
           v = new Vector<HStoreFile>();
           filesToMerge.put(src.getColFamily(), v);
         }
@@ -170,14 +170,14 @@
     }
     
     LOG.debug("flushing changes since start of merge for region " 
-              + srcB.getRegionName());
+        + srcB.getRegionName());
     
-    for(Iterator<HStoreFile> it = srcB.close().iterator(); it.hasNext();) {
+    for(Iterator<HStoreFile> it = srcB.close().iterator(); it.hasNext(); ) {
       HStoreFile src = it.next();
       
-      if (!alreadyMerged.contains(src)) {
+      if(! alreadyMerged.contains(src)) {
         Vector<HStoreFile> v = filesToMerge.get(src.getColFamily());
-        if (v == null) {
+        if(v == null) {
           v = new Vector<HStoreFile>();
           filesToMerge.put(src.getColFamily(), v);
         }
@@ -187,11 +187,11 @@
     
     LOG.debug("merging changes since start of merge");
     
-    for(Iterator<Text> it = filesToMerge.keySet().iterator(); it.hasNext();) {
+    for(Iterator<Text> it = filesToMerge.keySet().iterator(); it.hasNext(); ) {
       Text colFamily = it.next();
       Vector<HStoreFile> srcFiles = filesToMerge.get(colFamily);
       HStoreFile dst = new HStoreFile(conf, merges, newRegionInfo.regionName,
-                                      colFamily, Math.abs(rand.nextLong()));
+          colFamily, Math.abs(rand.nextLong()));
       
       dst.mergeStoreFiles(srcFiles, fs, conf);
     }
@@ -199,7 +199,7 @@
     // Done
     
     HRegion dstRegion = new HRegion(dir, log, fs, conf, newRegionInfo,
-                                    newRegionDir, null);
+        newRegionDir, null);
 
     // Get rid of merges directory
     
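The merge above runs in two passes: flush both caches and merge the resulting store files per column family, then close the source regions and merge whatever was flushed in the meantime (the alreadyMerged set keeps the second pass from re-merging phase-one files). A minimal, self-contained sketch of that two-phase shape, with plain Java collections standing in for the HStoreFile vectors and a placeholder mergeBatch() for mergeStoreFiles():

import java.util.*;

public class TwoPhaseMerge {
  static List<String> merged = new ArrayList<>();

  // Placeholder for the real per-family merge (mergeStoreFiles above).
  static void mergeBatch(Collection<String> batch) {
    merged.addAll(batch);
  }

  public static void main(String[] args) {
    List<String> source = new ArrayList<>(List.of("f1", "f2"));
    Set<String> alreadyMerged = new TreeSet<>();

    // Phase 1: merge everything visible after the initial flush.
    List<String> snapshot = new ArrayList<>(source);
    mergeBatch(snapshot);
    alreadyMerged.addAll(snapshot);

    source.add("f3");                      // a last-minute flush product

    // Phase 2: after close(), merge only what phase 1 missed.
    List<String> remainder = new ArrayList<>();
    for (String f : source) {
      if (!alreadyMerged.contains(f)) {
        remainder.add(f);
      }
    }
    mergeBatch(remainder);
    System.out.println(merged);            // [f1, f2, f3]
  }
}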
@@ -284,7 +284,7 @@
    * written-to before), then read it from the supplied path.
    */
   public HRegion(Path dir, HLog log, FileSystem fs, Configuration conf, 
-                 HRegionInfo regionInfo, Path initialFiles, Path oldLogFile) throws IOException {
+      HRegionInfo regionInfo, Path initialFiles, Path oldLogFile) throws IOException {
     
     this.dir = dir;
     this.log = log;
@@ -303,29 +303,29 @@
 
     // Move prefab HStore files into place (if any)
     
-    if (initialFiles != null && fs.exists(initialFiles)) {
+    if(initialFiles != null && fs.exists(initialFiles)) {
       fs.rename(initialFiles, regiondir);
     }
 
     // Load in all the HStores.
     
     for(Iterator<Text> it = this.regionInfo.tableDesc.families().iterator();
-        it.hasNext();) {
+        it.hasNext(); ) {
       
-      Text colFamily = it.next();
+      Text colFamily = HStoreKey.extractFamily(it.next());
       stores.put(colFamily, new HStore(dir, this.regionInfo.regionName, colFamily, 
-                                       this.regionInfo.tableDesc.getMaxVersions(), fs, oldLogFile, conf));
+          this.regionInfo.tableDesc.getMaxVersions(), fs, oldLogFile, conf));
     }
 
     // Get rid of any splits or merges that were lost in-progress
     
     Path splits = new Path(regiondir, SPLITDIR);
-    if (fs.exists(splits)) {
+    if(fs.exists(splits)) {
       fs.delete(splits);
     }
     
     Path merges = new Path(regiondir, MERGEDIR);
-    if (fs.exists(merges)) {
+    if(fs.exists(merges)) {
       fs.delete(merges);
     }
 
@@ -345,6 +345,7 @@
   
   /** Closes and deletes this HRegion. Called when doing a table deletion, for example */
   public void closeAndDelete() throws IOException {
+    LOG.info("deleting region: " + regionInfo.regionName);
     close();
     fs.delete(regiondir);
   }
@@ -362,7 +363,7 @@
   public Vector<HStoreFile> close() throws IOException {
     boolean shouldClose = false;
     synchronized(writestate) {
-      if (writestate.closed) {
+      if(writestate.closed) {
         LOG.info("region " + this.regionInfo.regionName + " closed");
         return new Vector<HStoreFile>();
       }
@@ -376,13 +377,13 @@
       shouldClose = true;
     }
 
-    if (!shouldClose) {
+    if(! shouldClose) {
       return null;
       
     } else {
       LOG.info("closing region " + this.regionInfo.regionName);
       Vector<HStoreFile> allHStoreFiles = internalFlushcache();
-      for(Iterator<HStore> it = stores.values().iterator(); it.hasNext();) {
+      for(Iterator<HStore> it = stores.values().iterator(); it.hasNext(); ) {
         HStore store = it.next();
         store.close();
       }
@@ -406,8 +407,8 @@
    * Returns two brand-new (and open) HRegions
    */
   public HRegion[] closeAndSplit(Text midKey) throws IOException {
-    if (((regionInfo.startKey.getLength() != 0)
-         && (regionInfo.startKey.compareTo(midKey) > 0))
+    if(((regionInfo.startKey.getLength() != 0)
+        && (regionInfo.startKey.compareTo(midKey) > 0))
         || ((regionInfo.endKey.getLength() != 0)
             && (regionInfo.endKey.compareTo(midKey) < 0))) {
       throw new IOException("Region splitkey must lie within region boundaries.");
@@ -419,13 +420,13 @@
     // or compactions until close() is called.
     
     Path splits = new Path(regiondir, SPLITDIR);
-    if (!fs.exists(splits)) {
+    if(! fs.exists(splits)) {
       fs.mkdirs(splits);
     }
     
     long regionAId = Math.abs(rand.nextLong());
     HRegionInfo regionAInfo = new HRegionInfo(regionAId, regionInfo.tableDesc, 
-                                              regionInfo.startKey, midKey);
+        regionInfo.startKey, midKey);
         
     long regionBId = Math.abs(rand.nextLong());
     HRegionInfo regionBInfo
@@ -434,24 +435,24 @@
     Path dirA = HStoreFile.getHRegionDir(splits, regionAInfo.regionName);
     Path dirB = HStoreFile.getHRegionDir(splits, regionBInfo.regionName);
 
-    if (fs.exists(dirA) || fs.exists(dirB)) {
+    if(fs.exists(dirA) || fs.exists(dirB)) {
       throw new IOException("Cannot split; target file collision at " + dirA 
-                            + " or " + dirB);
+          + " or " + dirB);
     }
     
     TreeSet<HStoreFile> alreadySplit = new TreeSet<HStoreFile>();
     Vector<HStoreFile> hstoreFilesToSplit = flushcache(true);
-    for(Iterator<HStoreFile> it = hstoreFilesToSplit.iterator(); it.hasNext();) {
+    for(Iterator<HStoreFile> it = hstoreFilesToSplit.iterator(); it.hasNext(); ) {
       HStoreFile hsf = it.next();
       
       LOG.debug("splitting HStore " + hsf.getRegionName() + "/" + hsf.getColFamily()
-                + "/" + hsf.fileId());
+          + "/" + hsf.fileId());
 
       HStoreFile dstA = new HStoreFile(conf, splits, regionAInfo.regionName, 
-                                       hsf.getColFamily(), Math.abs(rand.nextLong()));
+          hsf.getColFamily(), Math.abs(rand.nextLong()));
       
       HStoreFile dstB = new HStoreFile(conf, splits, regionBInfo.regionName, 
-                                       hsf.getColFamily(), Math.abs(rand.nextLong()));
+          hsf.getColFamily(), Math.abs(rand.nextLong()));
       
       hsf.splitStoreFile(midKey, dstA, dstB, fs, conf);
       alreadySplit.add(hsf);
@@ -461,18 +462,18 @@
     // and copy the small remainder
     
     hstoreFilesToSplit = close();
-    for(Iterator<HStoreFile> it = hstoreFilesToSplit.iterator(); it.hasNext();) {
+    for(Iterator<HStoreFile> it = hstoreFilesToSplit.iterator(); it.hasNext(); ) {
       HStoreFile hsf = it.next();
       
-      if (!alreadySplit.contains(hsf)) {
+      if(! alreadySplit.contains(hsf)) {
         LOG.debug("splitting HStore " + hsf.getRegionName() + "/" + hsf.getColFamily()
-                  + "/" + hsf.fileId());
+            + "/" + hsf.fileId());
 
         HStoreFile dstA = new HStoreFile(conf, splits, regionAInfo.regionName, 
-                                         hsf.getColFamily(), Math.abs(rand.nextLong()));
+            hsf.getColFamily(), Math.abs(rand.nextLong()));
         
         HStoreFile dstB = new HStoreFile(conf, splits, regionBInfo.regionName, 
-                                         hsf.getColFamily(), Math.abs(rand.nextLong()));
+            hsf.getColFamily(), Math.abs(rand.nextLong()));
         
         hsf.splitStoreFile(midKey, dstA, dstB, fs, conf);
       }
@@ -494,7 +495,7 @@
     regions[1] = regionB;
     
     LOG.info("region split complete. new regions are: " + regions[0].getRegionName()
-             + ", " + regions[1].getRegionName());
+        + ", " + regions[1].getRegionName());
     
     return regions;
   }
@@ -565,10 +566,10 @@
     Text key = new Text();
     long maxSize = 0;
 
-    for(Iterator<HStore> i = stores.values().iterator(); i.hasNext();) {
+    for(Iterator<HStore> i = stores.values().iterator(); i.hasNext(); ) {
       long size = i.next().getLargestFileSize(key);
       
-      if (size > maxSize) {                      // Largest so far
+      if(size > maxSize) {                      // Largest so far
         maxSize = size;
         midKey.set(key);
       }
@@ -593,9 +594,9 @@
   public boolean compactStores() throws IOException {
     boolean shouldCompact = false;
     synchronized(writestate) {
-      if ((!writestate.writesOngoing)
+      if((! writestate.writesOngoing)
           && writestate.writesEnabled
-          && (!writestate.closed)
+          && (! writestate.closed)
           && recentCommits > MIN_COMMITS_FOR_COMPACTION) {
         
         writestate.writesOngoing = true;
@@ -603,14 +604,14 @@
       }
     }
 
-    if (!shouldCompact) {
+    if(! shouldCompact) {
       LOG.info("not compacting region " + this.regionInfo.regionName);
       return false;
       
     } else {
       try {
         LOG.info("starting compaction on region " + this.regionInfo.regionName);
-        for(Iterator<HStore> it = stores.values().iterator(); it.hasNext();) {
+        for(Iterator<HStore> it = stores.values().iterator(); it.hasNext(); ) {
           HStore store = it.next();
           store.compact();
         }
@@ -632,7 +633,7 @@
    * only take if there have been a lot of uncommitted writes.
    */
   public void optionallyFlush() throws IOException {
-    if (commitsSinceFlush > maxUnflushedEntries) {
+    if(commitsSinceFlush > maxUnflushedEntries) {
       flushcache(false);
     }
   }
@@ -657,20 +658,20 @@
   public Vector<HStoreFile> flushcache(boolean disableFutureWrites) throws IOException {
     boolean shouldFlush = false;
     synchronized(writestate) {
-      if ((!writestate.writesOngoing)
+      if((! writestate.writesOngoing)
           && writestate.writesEnabled
-          && (!writestate.closed)) {
+          && (! writestate.closed)) {
         
         writestate.writesOngoing = true;
         shouldFlush = true;
         
-        if (disableFutureWrites) {
+        if(disableFutureWrites) {
           writestate.writesEnabled = false;
         }
       }
     }
     
-    if (!shouldFlush) {
+    if(! shouldFlush) {
       LOG.debug("not flushing cache for region " + this.regionInfo.regionName);
       return null;
       
@@ -731,8 +732,8 @@
     
     HMemcache.Snapshot retval = memcache.snapshotMemcacheForLog(log);
     TreeMap<HStoreKey, BytesWritable> memcacheSnapshot = retval.memcacheSnapshot;
-    if (memcacheSnapshot == null) {
-      for(Iterator<HStore> it = stores.values().iterator(); it.hasNext();) {
+    if(memcacheSnapshot == null) {
+      for(Iterator<HStore> it = stores.values().iterator(); it.hasNext(); ) {
         HStore hstore = it.next();
         Vector<HStoreFile> hstoreFiles = hstore.getAllMapFiles();
         allHStoreFiles.addAll(0, hstoreFiles);
@@ -746,7 +747,7 @@
     
     LOG.debug("flushing memcache to HStores");
     
-    for(Iterator<HStore> it = stores.values().iterator(); it.hasNext();) {
+    for(Iterator<HStore> it = stores.values().iterator(); it.hasNext(); ) {
       HStore hstore = it.next();
       Vector<HStoreFile> hstoreFiles 
         = hstore.flushCache(memcacheSnapshot, logCacheFlushId);
@@ -762,7 +763,7 @@
     LOG.debug("writing flush cache complete to log");
     
     log.completeCacheFlush(this.regionInfo.regionName,
-                           regionInfo.tableDesc.getName(), logCacheFlushId);
+        regionInfo.tableDesc.getName(), logCacheFlushId);
 
     // C. Delete the now-irrelevant memcache snapshot; its contents have been 
     //    dumped to disk-based HStores.
@@ -784,7 +785,7 @@
   /** Fetch a single data item. */
   public byte[] get(Text row, Text column) throws IOException {
     byte results[][] = get(row, column, Long.MAX_VALUE, 1);
-    if (results == null) {
+    if(results == null) {
       return null;
       
     } else {
@@ -799,17 +800,16 @@
 
   /** Fetch multiple versions of a single data item, with timestamp. */
   public byte[][] get(Text row, Text column, long timestamp, int numVersions) 
-    throws IOException {
+      throws IOException {
     
-    if (writestate.closed) {
+    if(writestate.closed) {
       throw new IOException("HRegion is closed.");
     }
 
     // Make sure this is a valid row and valid column
 
     checkRow(row);
-    Text colFamily = HStoreKey.extractFamily(column);
-    checkFamily(colFamily);
+    checkColumn(column);
 
     // Obtain the row-lock
 
@@ -830,7 +830,7 @@
     // Check the memcache
 
     byte[][] result = memcache.get(key, numVersions);
-    if (result != null) {
+    if(result != null) {
       return result;
     }
 
@@ -838,7 +838,7 @@
 
     Text colFamily = HStoreKey.extractFamily(key.getColumn());
     HStore targetStore = stores.get(colFamily);
-    if (targetStore == null) {
+    if(targetStore == null) {
       return null;
     }
     
@@ -859,7 +859,7 @@
     HStoreKey key = new HStoreKey(row, System.currentTimeMillis());
 
     TreeMap<Text, byte[]> memResult = memcache.getFull(key);
-    for(Iterator<Text> it = stores.keySet().iterator(); it.hasNext();) {
+    for(Iterator<Text> it = stores.keySet().iterator(); it.hasNext(); ) {
       Text colFamily = it.next();
       HStore targetStore = stores.get(colFamily);
       targetStore.getFull(key, memResult);
@@ -879,7 +879,7 @@
 
     HStore storelist[] = new HStore[families.size()];
     int i = 0;
-    for(Iterator<Text> it = families.iterator(); it.hasNext();) {
+    for(Iterator<Text> it = families.iterator(); it.hasNext(); ) {
       Text family = it.next();
       storelist[i++] = stores.get(family);
     }
@@ -911,23 +911,23 @@
   /**
    * Put a cell value into the locked row.  The user indicates the row-lock, the
    * target column, and the desired value.  This stuff is set into a temporary 
-   * memory area until the user commits the change, at which pointit's logged 
+   * memory area until the user commits the change, at which point it's logged 
    * and placed into the memcache.
    *
    * This method really just tests the input, then calls an internal localput() 
    * method.
    */
   public void put(long lockid, Text targetCol, byte[] val) throws IOException {
-    if (val.length == HStoreKey.DELETE_BYTES.length) {
+    if(val.length == HStoreKey.DELETE_BYTES.length) {
       boolean matches = true;
       for(int i = 0; i < val.length; i++) {
-        if (val[i] != HStoreKey.DELETE_BYTES[i]) {
+        if(val[i] != HStoreKey.DELETE_BYTES[i]) {
           matches = false;
           break;
         }
       }
       
-      if (matches) {
+      if(matches) {
         throw new IOException("Cannot insert value: " + val);
       }
     }
@@ -950,9 +950,11 @@
    * (Or until the user's write-lock expires.)
    */
   void localput(long lockid, Text targetCol, byte[] val) throws IOException {
+    checkColumn(targetCol);
+    
     Text row = getRowFromLock(lockid);
-    if (row == null) {
-      throw new IOException("No write lock for lockid " + lockid);
+    if(row == null) {
+      throw new LockException("No write lock for lockid " + lockid);
     }
 
     // This sync block makes localput() thread-safe when multiple
@@ -964,13 +966,13 @@
       // This check makes sure that another thread from the client
       // hasn't aborted/committed the write-operation.
 
-      if (row != getRowFromLock(lockid)) {
-        throw new IOException("Locking error: put operation on lock " + lockid 
-                              + " unexpected aborted by another thread");
+      if(row != getRowFromLock(lockid)) {
+        throw new LockException("Locking error: put operation on lock " + lockid 
+            + " unexpected aborted by another thread");
       }
       
       TreeMap<Text, byte[]> targets = targetColumns.get(lockid);
-      if (targets == null) {
+      if(targets == null) {
         targets = new TreeMap<Text, byte[]>();
         targetColumns.put(lockid, targets);
       }
@@ -985,8 +987,8 @@
    */
   public void abort(long lockid) throws IOException {
     Text row = getRowFromLock(lockid);
-    if (row == null) {
-      throw new IOException("No write lock for lockid " + lockid);
+    if(row == null) {
+      throw new LockException("No write lock for lockid " + lockid);
     }
     
     // This sync block makes abort() thread-safe when multiple
@@ -998,9 +1000,9 @@
       // This check makes sure another thread from the client
       // hasn't aborted/committed the write-operation.
       
-      if (row != getRowFromLock(lockid)) {
-        throw new IOException("Locking error: abort() operation on lock " 
-                              + lockid + " unexpected aborted by another thread");
+      if(row != getRowFromLock(lockid)) {
+        throw new LockException("Locking error: abort() operation on lock " 
+            + lockid + " unexpected aborted by another thread");
       }
       
       targetColumns.remove(lockid);
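The row-lock errors above are now raised as LockException rather than a bare IOException, so callers can tell a lost or expired lock apart from an I/O failure. The class itself is not in this part of the commit (this is [3/4]); presumably it is little more than a marker subclass along these lines:

import java.io.IOException;

/** Thrown when a row lock cannot be found or has been lost.
 *  Hypothetical reconstruction; the real class ships in another
 *  part of this commit. */
public class LockException extends IOException {
  public LockException(String message) {
    super(message);
  }
}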
@@ -1021,8 +1023,8 @@
     // that repeated executions won't screw this up.
     
     Text row = getRowFromLock(lockid);
-    if (row == null) {
-      throw new IOException("No write lock for lockid " + lockid);
+    if(row == null) {
+      throw new LockException("No write lock for lockid " + lockid);
     }
     
     // This check makes sure that another thread from the client
@@ -1035,7 +1037,7 @@
 
       long commitTimestamp = System.currentTimeMillis();
       log.append(regionInfo.regionName, regionInfo.tableDesc.getName(), row, 
-                 targetColumns.get(lockid), commitTimestamp);
+          targetColumns.get(lockid), commitTimestamp);
       
       memcache.add(row, targetColumns.get(lockid), commitTimestamp);
 
@@ -1054,25 +1056,26 @@
 
   /** Make sure this is a valid row for the HRegion */
   void checkRow(Text row) throws IOException {
-    if (((regionInfo.startKey.getLength() == 0)
-         || (regionInfo.startKey.compareTo(row) <= 0))
+    if(((regionInfo.startKey.getLength() == 0)
+        || (regionInfo.startKey.compareTo(row) <= 0))
         && ((regionInfo.endKey.getLength() == 0)
             || (regionInfo.endKey.compareTo(row) > 0))) {
       // all's well
       
     } else {
       throw new IOException("Requested row out of range for HRegion "
-                            + regionInfo.regionName + ", startKey='" + regionInfo.startKey
-                            + "', endKey='" + regionInfo.endKey + "', row='" + row + "'");
+          + regionInfo.regionName + ", startKey='" + regionInfo.startKey
+          + "', endKey='" + regionInfo.endKey + "', row='" + row + "'");
     }
   }
-
+  
   /** Make sure this is a valid column for the current table */
-  void checkFamily(Text family) throws IOException {
-    if (!regionInfo.tableDesc.hasFamily(family)) {
+  void checkColumn(Text columnName) throws IOException {
+    Text family = new Text(HStoreKey.extractFamily(columnName) + ":");
+    if(! regionInfo.tableDesc.hasFamily(family)) {
       throw new IOException("Requested column family " + family 
-                            + " does not exist in HRegion " + regionInfo.regionName
-                            + " for table " + regionInfo.tableDesc.getName());
+          + " does not exist in HRegion " + regionInfo.regionName
+          + " for table " + regionInfo.tableDesc.getName());
     }
   }
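checkColumn differs from the old checkFamily in that it takes the full column name and derives the family key itself, re-appending the ':' delimiter before the hasFamily lookup. A small sketch of that extraction, using String in place of Text and assuming HStoreKey.extractFamily returns the text before the colon:

public class CheckColumnSketch {
  // Assumed behavior of HStoreKey.extractFamily: text before ':'.
  static String extractFamily(String column) {
    int idx = column.indexOf(':');
    return idx < 0 ? column : column.substring(0, idx);
  }

  public static void main(String[] args) {
    String column = "anchor:href";
    // The family is stored with a trailing colon, so it must be
    // re-appended before the table-descriptor lookup.
    String familyKey = extractFamily(column) + ":";
    System.out.println(familyKey);   // anchor:
  }
}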
 
@@ -1092,6 +1095,8 @@
    * which maybe we'll do in the future.
    */
   long obtainLock(Text row) throws IOException {
+    checkRow(row);
+    
     synchronized(rowsToLocks) {
       while(rowsToLocks.get(row) != null) {
         try {
@@ -1109,6 +1114,8 @@
   }
   
   Text getRowFromLock(long lockid) throws IOException {
+    // Pattern is that all access to rowsToLocks and/or to
+    // locksToRows is via a lock on rowsToLocks.
     synchronized(rowsToLocks) {
       return locksToRows.get(lockid);
     }
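As the new comment notes, every access to rowsToLocks and locksToRows goes through the rowsToLocks monitor, and obtainLock blocks in wait() until a competing lock on the same row is released. A self-contained sketch of the pattern; the release path is an assumption, since it is not shown in this hunk:

import java.util.*;

public class RowLocks {
  private final Map<String, Long> rowsToLocks = new HashMap<>();
  private final Map<Long, String> locksToRows = new HashMap<>();
  private long nextLockId = 1;

  long obtainLock(String row) throws InterruptedException {
    synchronized (rowsToLocks) {
      while (rowsToLocks.get(row) != null) {
        rowsToLocks.wait();               // block until the row is free
      }
      long lockid = nextLockId++;
      rowsToLocks.put(row, lockid);
      locksToRows.put(lockid, row);
      return lockid;
    }
  }

  String getRowFromLock(long lockid) {
    synchronized (rowsToLocks) {          // same monitor guards both maps
      return locksToRows.get(lockid);
    }
  }

  void releaseLock(long lockid) {         // assumed release path
    synchronized (rowsToLocks) {
      String row = locksToRows.remove(lockid);
      rowsToLocks.remove(row);
      rowsToLocks.notifyAll();            // wake any waiting obtainLock
    }
  }
}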
@@ -1150,7 +1157,7 @@
         keys[i] = new HStoreKey();
         resultSets[i] = new TreeMap<Text, byte[]>();
 
-        if (!scanners[i].next(keys[i], resultSets[i])) {
+        if(! scanners[i].next(keys[i], resultSets[i])) {
           closeScanner(i);
         }
       }
@@ -1167,7 +1174,7 @@
       Text chosenRow = null;
       long chosenTimestamp = -1;
       for(int i = 0; i < keys.length; i++) {
-        if (scanners[i] != null
+        if(scanners[i] != null
             && (chosenRow == null
                 || (keys[i].getRow().compareTo(chosenRow) < 0)
                 || ((keys[i].getRow().compareTo(chosenRow) == 0)
@@ -1181,21 +1188,21 @@
       // Store the key and results for each sub-scanner. Merge them as appropriate.
       
       boolean insertedItem = false;
-      if (chosenTimestamp > 0) {
+      if(chosenTimestamp > 0) {
         key.setRow(chosenRow);
         key.setVersion(chosenTimestamp);
         key.setColumn(new Text(""));
 
         for(int i = 0; i < scanners.length; i++) {        
           while((scanners[i] != null)
-                && (keys[i].getRow().compareTo(chosenRow) == 0)
-                && (keys[i].getTimestamp() == chosenTimestamp)) {
+              && (keys[i].getRow().compareTo(chosenRow) == 0)
+              && (keys[i].getTimestamp() == chosenTimestamp)) {
             
             results.putAll(resultSets[i]);
             insertedItem = true;
 
             resultSets[i].clear();
-            if (!scanners[i].next(keys[i], resultSets[i])) {
+            if(! scanners[i].next(keys[i], resultSets[i])) {
               closeScanner(i);
             }
           }
@@ -1204,10 +1211,10 @@
           // row label, then its timestamp is bad.  We need to advance it.
 
           while((scanners[i] != null)
-                && (keys[i].getRow().compareTo(chosenRow) <= 0)) {
+              && (keys[i].getRow().compareTo(chosenRow) <= 0)) {
             
             resultSets[i].clear();
-            if (!scanners[i].next(keys[i], resultSets[i])) {
+            if(! scanners[i].next(keys[i], resultSets[i])) {
               closeScanner(i);
             }
           }
@@ -1231,7 +1238,7 @@
     /** All done with the scanner. */
     public void close() throws IOException {
       for(int i = 0; i < scanners.length; i++) {
-        if (scanners[i] != null) {
+        if(scanners[i] != null) {
           closeScanner(i);
         }
       }

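The HScanner code above performs a k-way merge over the per-family sub-scanners: pick the scanner positioned at the lowest row, emit everything for that row, then advance any scanner still at or behind it. The same merge over plain sorted iterators, as a compact runnable sketch:

import java.util.*;

public class KWayMerge {
  public static void main(String[] args) {
    // Each sub-scanner yields rows in sorted order, like the
    // per-family scanners above.
    List<Iterator<String>> scanners = List.of(
        List.of("row1", "row3").iterator(),
        List.of("row1", "row2").iterator());
    String[] current = new String[scanners.size()];
    for (int i = 0; i < current.length; i++) {
      current[i] = scanners.get(i).hasNext() ? scanners.get(i).next() : null;
    }

    while (true) {
      // Pick the lowest row among all live scanners.
      String chosenRow = null;
      for (String c : current) {
        if (c != null && (chosenRow == null || c.compareTo(chosenRow) < 0)) {
          chosenRow = c;
        }
      }
      if (chosenRow == null) {
        break;                             // every scanner is exhausted
      }
      System.out.println("emit " + chosenRow);
      // Advance every scanner sitting at (or behind) the chosen row.
      for (int i = 0; i < current.length; i++) {
        while (current[i] != null && current[i].compareTo(chosenRow) <= 0) {
          current[i] = scanners.get(i).hasNext() ? scanners.get(i).next() : null;
        }
      }
    }
  }
}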
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java?view=diff&rev=532083&r1=532082&r2=532083
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java Tue Apr 24 14:13:08 2007
@@ -38,28 +38,28 @@
   }
 
   public HRegionInfo(long regionId, HTableDescriptor tableDesc, Text startKey, 
-                     Text endKey) throws IllegalArgumentException {
+      Text endKey) throws IllegalArgumentException {
     
     this.regionId = regionId;
     
-    if (tableDesc == null) {
+    if(tableDesc == null) {
       throw new IllegalArgumentException("tableDesc cannot be null");
     }
     
     this.tableDesc = tableDesc;
     
     this.startKey = new Text();
-    if (startKey != null) {
+    if(startKey != null) {
       this.startKey.set(startKey);
     }
     
     this.endKey = new Text();
-    if (endKey != null) {
+    if(endKey != null) {
       this.endKey.set(endKey);
     }
     
     this.regionName = new Text(tableDesc.getName() + "_"
-                               + (startKey == null ? "" : startKey.toString()) + "_" + regionId);
+        + (startKey == null ? "" : startKey.toString()) + "_" + regionId);
   }
     
   //////////////////////////////////////////////////////////////////////////////

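The constructor composes regionName from the table name, the start key, and the region id. For illustration, with a hypothetical table name:

public class RegionNameSketch {
  public static void main(String[] args) {
    String tableName = "webtable";        // hypothetical table
    String startKey = "row100";
    long regionId = 42;
    // Mirrors: tableDesc.getName() + "_" + startKey + "_" + regionId
    String regionName = tableName + "_"
        + (startKey == null ? "" : startKey) + "_" + regionId;
    System.out.println(regionName);       // webtable_row100_42
  }
}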
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java?view=diff&rev=532083&r1=532082&r2=532083
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java Tue Apr 24 14:13:08 2007
@@ -15,7 +15,9 @@
  */
 package org.apache.hadoop.hbase;
 
-import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.VersionedProtocol;
 
 import java.io.*;
 
@@ -23,17 +25,13 @@
  * Clients interact with HRegionServers using
  * a handle to the HRegionInterface.
  ******************************************************************************/
-public interface HRegionInterface {
+public interface HRegionInterface extends VersionedProtocol {
   public static final long versionID = 1L; // initial version
 
   // Get metainfo about an HRegion
 
   public HRegionInfo getRegionInfo(Text regionName);
 
-  // Start a scanner for a given HRegion.
-
-  public HScannerInterface openScanner(Text regionName, Text[] columns, Text startRow) throws IOException;
-
   // GET methods for an HRegion.
 
   public BytesWritable get(Text regionName, Text row, Text column) throws IOException;
@@ -58,4 +56,41 @@
   public void abort(Text regionName, long clientid, long lockid) throws IOException;
   public void commit(Text regionName, long clientid, long lockid) throws IOException;
   public void renewLease(long lockid, long clientid) throws IOException;
+
+  //////////////////////////////////////////////////////////////////////////////
+  // remote scanner interface
+  //////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Opens a remote scanner.
+   * 
+   * @param regionName  - name of region to scan
+   * @param columns     - columns to scan
+   * @param startRow    - starting row to scan
+   *
+   * @return            - scanner identifier used in other calls
+   * @throws IOException
+   */
+  public long openScanner(Text regionName, Text[] columns, Text startRow) throws IOException;
+
+  /**
+   * Get the next set of values
+   * 
+   * @param scannerId   - the scanner id returned by openScanner
+   * @param key         - set to the next HStoreKey
+   * @return            - the next set of values, as column name/value pairs
+   * @throws IOException
+   */
+  public LabelledData[] next(long scannerId, HStoreKey key) throws IOException;
+  
+  /**
+   * Close a scanner
+   * 
+   * @param scannerId   - the scanner id returned by openScanner
+   * @throws IOException
+   */
+  public void close(long scannerId) throws IOException;
 }

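With openScanner now returning a scanner id instead of a remote HScannerInterface handle, a client drives the scan by calling next() repeatedly against that id and closing it when done. A hedged usage sketch that compiles against the hbase classes above (assume it lives in org.apache.hadoop.hbase so HStoreKey and LabelledData resolve); the end-of-scan convention, an empty array, is an assumption, since the server side is not shown in this part of the commit:

import java.io.IOException;
import org.apache.hadoop.io.Text;

public class ScanClientSketch {
  static void scan(HRegionInterface server, Text regionName,
      Text[] columns, Text startRow) throws IOException {
    long scannerId = server.openScanner(regionName, columns, startRow);
    try {
      HStoreKey key = new HStoreKey();
      LabelledData[] values = server.next(scannerId, key);
      // Assumption: an empty result array signals end-of-scan.
      while (values != null && values.length > 0) {
        System.out.println(key + ": " + values.length + " column(s)");
        values = server.next(scannerId, key);
      }
    } finally {
      server.close(scannerId);            // always release the scanner
    }
  }
}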
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java?view=diff&rev=532083&r1=532082&r2=532083
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java Tue Apr 24 14:13:08 2007
@@ -15,6 +15,8 @@
  */
 package org.apache.hadoop.hbase;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.ipc.*;
@@ -22,19 +24,34 @@
 
 import java.io.*;
 import java.util.*;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /*******************************************************************************
  * HRegionServer makes a set of HRegions available to clients.  It checks in with
  * the HMaster. There are many HRegionServers in a single HBase deployment.
  ******************************************************************************/
-public class HRegionServer implements HConstants, HRegionInterface, Runnable {
+public class HRegionServer
+    implements HConstants, HRegionInterface, Runnable {
+  
+  public long getProtocolVersion(String protocol, 
+      long clientVersion) throws IOException { 
+    if (protocol.equals(HRegionInterface.class.getName())) {
+      return HRegionInterface.versionID;
+    } else {
+      throw new IOException("Unknown protocol to name node: " + protocol);
+    }
+  }
+
+  private static final Log LOG = LogFactory.getLog(HRegionServer.class);
+  
   private boolean stopRequested;
   private Path regionDir;
   private HServerAddress address;
   private Configuration conf;
   private Random rand;
   private TreeMap<Text, HRegion> regions;               // region name -> HRegion
-  private HLocking locking;
+  private ReadWriteLock locker;
   private Vector<HMsg> outboundMsgs;
 
   private long threadWakeFrequency;
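Extending VersionedProtocol lets Hadoop RPC verify that client and server agree on the interface version: the client-side proxy calls getProtocolVersion(protocol, clientVersion) and compares the result against the versionID it was compiled with. A minimal sketch of that handshake, detached from the RPC machinery:

import java.io.IOException;

public class VersionCheckSketch {
  static final long CLIENT_VERSION = 1L;    // HRegionInterface.versionID

  // Server side, shaped like the implementation above.
  static long getProtocolVersion(String protocol, long clientVersion)
      throws IOException {
    if (protocol.equals("org.apache.hadoop.hbase.HRegionInterface")) {
      return 1L;
    }
    throw new IOException("Unknown protocol to region server: " + protocol);
  }

  public static void main(String[] args) throws IOException {
    long serverVersion = getProtocolVersion(
        "org.apache.hadoop.hbase.HRegionInterface", CLIENT_VERSION);
    if (serverVersion != CLIENT_VERSION) {
      throw new IOException("Version mismatch: client " + CLIENT_VERSION
          + " vs server " + serverVersion);
    }
    System.out.println("protocol version ok");
  }
}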
@@ -61,29 +78,29 @@
     }
     
     public void run() {
-      while(!stopRequested) {
+      while(! stopRequested) {
         long startTime = System.currentTimeMillis();
 
         // Grab a list of regions to check
 
         Vector<HRegion> checkSplit = new Vector<HRegion>();
-        locking.obtainReadLock();
+        locker.readLock().lock();
         try {
           checkSplit.addAll(regions.values());
           
         } finally {
-          locking.releaseReadLock();
+          locker.readLock().unlock();
         }
 
         // Check to see if they need splitting
 
         Vector<SplitRegion> toSplit = new Vector<SplitRegion>();
-        for(Iterator<HRegion> it = checkSplit.iterator(); it.hasNext();) {
+        for(Iterator<HRegion> it = checkSplit.iterator(); it.hasNext(); ) {
           HRegion cur = it.next();
           Text midKey = new Text();
           
           try {
-            if (cur.needsSplit(midKey)) {
+            if(cur.needsSplit(midKey)) {
               toSplit.add(new SplitRegion(cur, midKey));
             }
             
@@ -92,17 +109,19 @@
           }
         }
 
-        for(Iterator<SplitRegion> it = toSplit.iterator(); it.hasNext();) {
+        for(Iterator<SplitRegion> it = toSplit.iterator(); it.hasNext(); ) {
           SplitRegion r = it.next();
           
-          locking.obtainWriteLock();
+          locker.writeLock().lock();
           regions.remove(r.region.getRegionName());
-          locking.releaseWriteLock();
+          locker.writeLock().unlock();
  
           HRegion[] newRegions = null;
           try {
             Text oldRegion = r.region.getRegionName();
             
+            LOG.info("splitting region: " + oldRegion);
+            
             newRegions = r.region.closeAndSplit(r.midKey);
 
             // When a region is split, the META table needs to be updated if we're
@@ -111,8 +130,10 @@
 
             Text tableToUpdate
               = (oldRegion.find(META_TABLE_NAME.toString()) == 0)
-              ? ROOT_TABLE_NAME : META_TABLE_NAME;
+                ? ROOT_TABLE_NAME : META_TABLE_NAME;
 
+            LOG.debug("region split complete. updating meta");
+            
             client.openTable(tableToUpdate);
             long lockid = client.startUpdate(oldRegion);
             client.delete(lockid, META_COL_REGIONINFO);
@@ -132,7 +153,14 @@
             
             // Now tell the master about the new regions
             
+            LOG.debug("reporting region split to master");
+            
             reportSplit(newRegions[0].getRegionInfo(), newRegions[1].getRegionInfo());
+            
+            LOG.info("region split successful. old region=" + oldRegion
+                + ", new regions: " + newRegions[0].getRegionName() + ", "
+                + newRegions[1].getRegionName());
+            
             newRegions[0].close();
             newRegions[1].close();
             
@@ -145,11 +173,15 @@
         
         // Sleep
 
-        long endTime = System.currentTimeMillis();
-        try {
-          Thread.sleep(splitCheckFrequency - (endTime - startTime));
-          
-        } catch(InterruptedException iex) {
+        long waitTime =
+          splitCheckFrequency - (System.currentTimeMillis() - startTime);
+        
+        if(waitTime > 0) {
+          try {
+            Thread.sleep(waitTime);
+
+          } catch(InterruptedException iex) {
+          }
         }
       }
     }
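The rewritten sleep matters because Thread.sleep throws IllegalArgumentException for a negative argument: if a split pass takes longer than splitCheckFrequency, the old endTime arithmetic handed sleep a negative delay, and the unchecked exception would escape run() and kill the thread. The guarded form in isolation:

public class GuardedSleep {
  public static void main(String[] args) throws Exception {
    long intervalMillis = 1000;           // e.g. splitCheckFrequency
    long startTime = System.currentTimeMillis();

    Thread.sleep(1500);                   // simulate a slow work pass

    long waitTime = intervalMillis - (System.currentTimeMillis() - startTime);
    if (waitTime > 0) {                   // never pass a negative delay
      try {
        Thread.sleep(waitTime);
      } catch (InterruptedException iex) {
      }
    }
    System.out.println("an overdue pass starts again immediately");
  }
}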
@@ -161,23 +193,23 @@
   private Thread cacheFlusherThread;
   private class Flusher implements Runnable {
     public void run() {
-      while(!stopRequested) {
+      while(! stopRequested) {
         long startTime = System.currentTimeMillis();
 
         // Grab a list of items to flush
 
         Vector<HRegion> toFlush = new Vector<HRegion>();
-        locking.obtainReadLock();
+        locker.readLock().lock();
         try {
           toFlush.addAll(regions.values());
           
         } finally {
-          locking.releaseReadLock();
+          locker.readLock().unlock();
         }
 
         // Flush them, if necessary
 
-        for(Iterator<HRegion> it = toFlush.iterator(); it.hasNext();) {
+        for(Iterator<HRegion> it = toFlush.iterator(); it.hasNext(); ) {
           HRegion cur = it.next();
           
           try {
@@ -190,11 +222,15 @@
 
         // Sleep
 
-        long endTime = System.currentTimeMillis();
-        try {
-          Thread.sleep(threadWakeFrequency - (endTime - startTime));
-          
-        } catch(InterruptedException iex) {
+        long waitTime =
+          threadWakeFrequency - (System.currentTimeMillis() - startTime);
+        
+        if(waitTime > 0) {
+          try {
+            Thread.sleep(waitTime);
+
+          } catch(InterruptedException iex) {
+          }
         }
       }
     }
@@ -212,12 +248,12 @@
   private Thread logRollerThread;
   private class LogRoller implements Runnable {
     public void run() {
-      while(!stopRequested) {
+      while(! stopRequested) {
 
         // If the number of log entries is high enough, roll the log.  This is a
         // very fast operation, but should not be done too frequently.
 
-        if (log.getNumEntries() > maxLogEntries) {
+        if(log.getNumEntries() > maxLogEntries) {
           try {
             log.rollWriter();
             
@@ -249,24 +285,24 @@
   /** Start a HRegionServer at the default location */
   public HRegionServer(Configuration conf) throws IOException {
     this(new Path(conf.get(HREGION_DIR, DEFAULT_HREGION_DIR)),
-         new HServerAddress(conf.get("hbase.regionserver.default.name")),
-         conf);
+        new HServerAddress(conf.get(REGIONSERVER_ADDRESS, "localhost:0")),
+        conf);
   }
   
   /** Start a HRegionServer at an indicated location */
   public HRegionServer(Path regionDir, HServerAddress address, Configuration conf) 
-    throws IOException {
+      throws IOException {
     
     // Basic setup
     
     this.stopRequested = false;
     this.regionDir = regionDir;
-    this.address = address;
     this.conf = conf;
     this.rand = new Random();
     this.regions = new TreeMap<Text, HRegion>();
-    this.locking = new HLocking(); 
+    this.locker = new ReentrantReadWriteLock();
     this.outboundMsgs = new Vector<HMsg>();
+    this.scanners = Collections.synchronizedMap(new TreeMap<Text, HScannerInterface>());
 
     // Config'ed params
     
@@ -278,53 +314,69 @@
     // Cache flushing
     
     this.cacheFlusher = new Flusher();
-    this.cacheFlusherThread = new Thread(cacheFlusher);
+    this.cacheFlusherThread = new Thread(cacheFlusher, "HRegionServer.cacheFlusher");
     
     // Check regions to see if they need to be split
     
     this.splitChecker = new SplitChecker();
-    this.splitCheckerThread = new Thread(splitChecker);
+    this.splitCheckerThread = new Thread(splitChecker, "HRegionServer.splitChecker");
+    
+    // Process requests from Master
+    
+    this.toDo = new Vector<HMsg>();
+    this.worker = new Worker();
+    this.workerThread = new Thread(worker, "HRegionServer.worker");
 
     try {
+      
+      // Server to handle client requests
+      
+      this.server = RPC.getServer(this, address.getBindAddress().toString(), 
+          address.getPort(), conf.getInt("hbase.hregionserver.handler.count", 10), false, conf);
+
+      this.address = new HServerAddress(server.getListenerAddress());
+
       // Local file paths
 
-      this.fs = FileSystem.get(conf);
-      Path newlogdir = new Path(regionDir, "log" + "_" + address.toString());
-      this.oldlogfile = new Path(regionDir, "oldlogfile" + "_" + address.toString());
+      String serverName = this.address.getBindAddress() + "_" + this.address.getPort();
+      Path newlogdir = new Path(regionDir, "log" + "_" + serverName);
+      this.oldlogfile = new Path(regionDir, "oldlogfile" + "_" + serverName);
 
       // Logging
 
+      this.fs = FileSystem.get(conf);
       HLog.consolidateOldLog(newlogdir, oldlogfile, fs, conf);
       this.log = new HLog(fs, newlogdir, conf);
       this.logRoller = new LogRoller();
-      this.logRollerThread = new Thread(logRoller);
+      this.logRollerThread = new Thread(logRoller, "HRegionServer.logRoller");
 
       // Remote HMaster
 
       this.hbaseMaster = (HMasterRegionInterface)
-        RPC.waitForProxy(HMasterRegionInterface.class,
-                         HMasterRegionInterface.versionId,
-                         new HServerAddress(conf.get(MASTER_DEFAULT_NAME)).getInetSocketAddress(),
-                         conf);
+      RPC.waitForProxy(HMasterRegionInterface.class,
+          HMasterRegionInterface.versionID,
+          new HServerAddress(conf.get(MASTER_ADDRESS)).getInetSocketAddress(),
+          conf);
 
       // Threads
 
+      this.workerThread.start();
       this.cacheFlusherThread.start();
       this.splitCheckerThread.start();
       this.logRollerThread.start();
       this.leases = new Leases(conf.getLong("hbase.hregionserver.lease.period", 
-                                            3 * 60 * 1000), threadWakeFrequency);
+          3 * 60 * 1000), threadWakeFrequency);
       
       // Server
 
-      this.server = RPC.getServer(this, address.getBindAddress().toString(), 
-                                  address.getPort(), conf.getInt("hbase.hregionserver.handler.count", 10), false, conf);
       this.server.start();
 
     } catch(IOException e) {
       this.stopRequested = true;
       throw e;
     }
+    
+    LOG.info("HRegionServer started at: " + address.toString());
   }
 
   /**
@@ -334,7 +386,7 @@
    * processing to cease.
    */
   public void stop() throws IOException {
-    if (!stopRequested) {
+    if(! stopRequested) {
       stopRequested = true;
  
       closeAllRegions();
@@ -342,12 +394,18 @@
       fs.close();
       server.stop();
     }
-  
+    LOG.info("stopping server at: " + address.toString());
   }
 
   /** Call join to wait for all the threads to finish */
   public void join() {
     try {
+      this.workerThread.join();
+      
+    } catch(InterruptedException iex) {
+    }
+
+    try {
       this.logRollerThread.join();
       
     } catch(InterruptedException iex) {
@@ -366,7 +424,7 @@
       
     } catch(InterruptedException iex) {
     }
-    
+    LOG.info("server stopped at: " + address.toString());
   }
   
   /**
@@ -375,7 +433,7 @@
    * load/unload instructions.
    */
   public void run() {
-    while(!stopRequested) {
+    while(! stopRequested) {
       HServerInfo info = new HServerInfo(address, rand.nextLong());
       long lastMsg = 0;
       long waitTime;
@@ -388,18 +446,20 @@
       } catch(IOException e) {
         waitTime = msgInterval - (System.currentTimeMillis() - lastMsg);
 
-        try {
-          Thread.sleep(waitTime);
-          
-        } catch(InterruptedException iex) {
+        if(waitTime > 0) {
+          try {
+            Thread.sleep(waitTime);
+
+          } catch(InterruptedException iex) {
+          }
         }
         continue;
       }
       
       // Now ask the master what it wants us to do and tell it what we have done.
       
-      while(!stopRequested) {
-        if ((System.currentTimeMillis() - lastMsg) >= msgInterval) {
+      while(! stopRequested) {
+        if((System.currentTimeMillis() - lastMsg) >= msgInterval) {
 
           HMsg outboundArray[] = null;
           synchronized(outboundMsgs) {
@@ -411,10 +471,33 @@
             HMsg msgs[] = hbaseMaster.regionServerReport(info, outboundArray);
             lastMsg = System.currentTimeMillis();
 
-            // Process the HMaster's instruction stream
+            // Queue up the HMaster's instruction stream for processing
 
-            if (!processMessages(msgs)) {
-              break;
+            synchronized(toDo) {
+              boolean restartOrStop = false;
+              for(int i = 0; i < msgs.length; i++) {
+                switch(msgs[i].getMsg()) {
+                
+                case HMsg.MSG_CALL_SERVER_STARTUP:
+                  closeAllRegions();
+                  restartOrStop = true;
+                  break;
+                
+                case HMsg.MSG_REGIONSERVER_ALREADY_RUNNING:
+                  stop();
+                  restartOrStop = true;
+                  break;
+                  
+                default:
+                  toDo.add(msgs[i]);
+                }
+              }
+              if(toDo.size() > 0) {
+                toDo.notifyAll();
+              }
+              if(restartOrStop) {
+                break;
+              }
             }
 
           } catch(IOException e) {
@@ -424,55 +507,16 @@
 
         waitTime = msgInterval - (System.currentTimeMillis() - lastMsg);
 
-        try {
-          Thread.sleep(waitTime);
-        } catch(InterruptedException iex) {
+        if(waitTime > 0) {
+          try {
+            Thread.sleep(waitTime);
+          } catch(InterruptedException iex) {
+          }
         }
-
       }
     }
   }
 
-  private boolean processMessages(HMsg[] msgs) throws IOException {
-    for(int i = 0; i < msgs.length; i++) {
-      switch(msgs[i].getMsg()) {
-      
-      case HMsg.MSG_REGION_OPEN:                        // Open a region
-        openRegion(msgs[i].getRegionInfo());
-        break;
-        
-      case HMsg.MSG_REGION_CLOSE:                       // Close a region
-        closeRegion(msgs[i].getRegionInfo(), true);
-        break;
-        
-      case HMsg.MSG_REGION_MERGE:                       // Merge two regions
-        //TODO ???
-        throw new IOException("TODO: need to figure out merge");
-        //break;
-        
-      case HMsg.MSG_CALL_SERVER_STARTUP:                // Close regions, restart
-        closeAllRegions();
-        return false;
-        
-      case HMsg.MSG_REGIONSERVER_ALREADY_RUNNING:       // Go away
-        stop();
-        return false;
-        
-      case HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT:        // Close a region, don't reply
-        closeRegion(msgs[i].getRegionInfo(), false);
-        break;
-        
-      case HMsg.MSG_REGION_CLOSE_AND_DELETE:
-        closeAndDeleteRegion(msgs[i].getRegionInfo());
-        break;
-        
-      default:
-        throw new IOException("Impossible state during msg processing.  Instruction: " + msgs[i]);
-      }
-    }
-    return true;
-  }
-
   /** Add to the outbound message buffer */
   private void reportOpen(HRegion region) {
     synchronized(outboundMsgs) {
@@ -508,9 +552,68 @@
   // HMaster-given operations
   //////////////////////////////////////////////////////////////////////////////
 
+  private Vector<HMsg> toDo;
+  private Worker worker;
+  private Thread workerThread;
+  private class Worker implements Runnable {
+    public void run() {
+      while(!stopRequested) {
+        HMsg msg = null;
+        synchronized(toDo) {
+          while(toDo.size() == 0) {
+            try {
+              toDo.wait();
+              
+            } catch(InterruptedException e) {
+            }
+          }
+          msg = toDo.remove(0);
+        }
+        try {
+          switch(msg.getMsg()) {
+
+          case HMsg.MSG_REGION_OPEN:                    // Open a region
+            openRegion(msg.getRegionInfo());
+            break;
+
+          case HMsg.MSG_REGION_CLOSE:                   // Close a region
+            closeRegion(msg.getRegionInfo(), true);
+            break;
+
+          case HMsg.MSG_REGION_MERGE:                   // Merge two regions
+            //TODO ???
+            throw new IOException("TODO: need to figure out merge");
+            //break;
+
+          case HMsg.MSG_CALL_SERVER_STARTUP:            // Close regions, restart
+            closeAllRegions();
+            continue;
+
+          case HMsg.MSG_REGIONSERVER_ALREADY_RUNNING:   // Go away
+            stop();
+            continue;
+
+          case HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT:    // Close a region, don't reply
+            closeRegion(msg.getRegionInfo(), false);
+            break;
+
+          case HMsg.MSG_REGION_CLOSE_AND_DELETE:
+            closeAndDeleteRegion(msg.getRegionInfo());
+            break;
+
+          default:
+            throw new IOException("Impossible state during msg processing.  Instruction: " + msg);
+          }
+        } catch(IOException e) {
+          e.printStackTrace();
+        }
+      }
+    }
+  }
+  
   private void openRegion(HRegionInfo regionInfo) throws IOException {
     
-    locking.obtainWriteLock();
+    this.locker.writeLock().lock();
     try {
       HRegion region = new HRegion(regionDir, log, fs, conf, regionInfo, null, oldlogfile);
       
@@ -518,57 +621,57 @@
       reportOpen(region);
       
     } finally {
-      locking.releaseWriteLock();
+      this.locker.writeLock().unlock();
     }
   }
 
   private void closeRegion(HRegionInfo info, boolean reportWhenCompleted)
-    throws IOException {
+      throws IOException {
     
-    locking.obtainWriteLock();
+    this.locker.writeLock().lock();
     try {
       HRegion region = regions.remove(info.regionName);
       
-      if (region != null) {
+      if(region != null) {
         region.close();
         
-        if (reportWhenCompleted) {
+        if(reportWhenCompleted) {
           reportClose(region);
         }
       }
       
     } finally {
-      locking.releaseWriteLock();
+      this.locker.writeLock().unlock();
     }
   }
 
   private void closeAndDeleteRegion(HRegionInfo info) throws IOException {
 
-    locking.obtainWriteLock();
+    this.locker.writeLock().lock();
     try {
       HRegion region = regions.remove(info.regionName);
   
-      if (region != null) {
+      if(region != null) {
         region.closeAndDelete();
       }
   
     } finally {
-      locking.releaseWriteLock();
+      this.locker.writeLock().unlock();
     }
   }
 
   /** Called either when the master tells us to restart or from stop() */
   private void closeAllRegions() throws IOException {
-    locking.obtainWriteLock();
+    this.locker.writeLock().lock();
     try {
-      for(Iterator<HRegion> it = regions.values().iterator(); it.hasNext();) {
+      for(Iterator<HRegion> it = regions.values().iterator(); it.hasNext(); ) {
         HRegion region = it.next();
         region.close();
       }
       regions.clear();
       
     } finally {
-      locking.releaseWriteLock();
+      this.locker.writeLock().unlock();
     }
   }
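This commit also replaces the homegrown HLocking with java.util.concurrent's ReentrantReadWriteLock: any number of readers can hold readLock() at once, writeLock() is exclusive, and the unlock is normally placed in a finally block so an exception cannot strand the lock. The idiom in isolation:

import java.util.TreeMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class RegionMapLocking {
  private final ReadWriteLock locker = new ReentrantReadWriteLock();
  private final TreeMap<String, String> regions = new TreeMap<>();

  String lookup(String name) {
    locker.readLock().lock();             // shared: many readers at once
    try {
      return regions.get(name);
    } finally {
      locker.readLock().unlock();         // always release, even on error
    }
  }

  void open(String name, String region) {
    locker.writeLock().lock();            // exclusive: blocks all readers
    try {
      regions.put(name, region);
    } finally {
      locker.writeLock().unlock();
    }
  }
}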
 
@@ -580,24 +683,24 @@
    *        
    *        For now, we do not do merging. Splits are driven by the HRegionServer.
    ****************************************************************************/
-  /*
-    private void mergeRegions(Text regionNameA, Text regionNameB) throws IOException {
+/*
+  private void mergeRegions(Text regionNameA, Text regionNameB) throws IOException {
     locking.obtainWriteLock();
     try {
-    HRegion srcA = regions.remove(regionNameA);
-    HRegion srcB = regions.remove(regionNameB);
-    HRegion newRegion = HRegion.closeAndMerge(srcA, srcB);
-    regions.put(newRegion.getRegionName(), newRegion);
-
-    reportClose(srcA);
-    reportClose(srcB);
-    reportOpen(newRegion);
+      HRegion srcA = regions.remove(regionNameA);
+      HRegion srcB = regions.remove(regionNameB);
+      HRegion newRegion = HRegion.closeAndMerge(srcA, srcB);
+      regions.put(newRegion.getRegionName(), newRegion);
+
+      reportClose(srcA);
+      reportClose(srcB);
+      reportOpen(newRegion);
       
     } finally {
-    locking.releaseWriteLock();
-    }
+      locking.releaseWriteLock();
     }
-  */
+  }
+*/
 
   //////////////////////////////////////////////////////////////////////////////
   // HRegionInterface
@@ -606,32 +709,21 @@
   /** Obtain a table descriptor for the given region */
   public HRegionInfo getRegionInfo(Text regionName) {
     HRegion region = getRegion(regionName);
-    if (region == null) {
+    if(region == null) {
       return null;
     }
     return region.getRegionInfo();
   }
 
-  /** Start a scanner for a given HRegion. */
-  public HScannerInterface openScanner(Text regionName, Text[] cols, 
-                                       Text firstRow) throws IOException {
-
-    HRegion r = getRegion(regionName);
-    if (r == null) {
-      throw new IOException("Not serving region " + regionName);
-    }
-    return r.getScanner(cols, firstRow);
-  }
-  
   /** Get the indicated row/column */
   public BytesWritable get(Text regionName, Text row, Text column) throws IOException {
     HRegion region = getRegion(regionName);
-    if (region == null) {
+    if(region == null) {
       throw new IOException("Not serving region " + regionName);
     }
     
     byte results[] = region.get(row, column);
-    if (results != null) {
+    if(results != null) {
       return new BytesWritable(results);
     }
     return null;
@@ -639,18 +731,18 @@
 
   /** Get multiple versions of the indicated row/col */
   public BytesWritable[] get(Text regionName, Text row, Text column, 
-                             int numVersions) throws IOException {
+      int numVersions) throws IOException {
     
     HRegion region = getRegion(regionName);
-    if (region == null) {
+    if(region == null) {
       throw new IOException("Not serving region " + regionName);
     }
     
     byte results[][] = region.get(row, column, numVersions);
-    if (results != null) {
+    if(results != null) {
       BytesWritable realResults[] = new BytesWritable[results.length];
       for(int i = 0; i < realResults.length; i++) {
-        if (results[i] != null) {
+        if(results[i] != null) {
           realResults[i] = new BytesWritable(results[i]);
         }
       }
@@ -661,18 +753,18 @@
 
   /** Get multiple timestamped versions of the indicated row/col */
   public BytesWritable[] get(Text regionName, Text row, Text column, 
-                             long timestamp, int numVersions) throws IOException {
+      long timestamp, int numVersions) throws IOException {
     
     HRegion region = getRegion(regionName);
-    if (region == null) {
+    if(region == null) {
       throw new IOException("Not serving region " + regionName);
     }
     
     byte results[][] = region.get(row, column, timestamp, numVersions);
-    if (results != null) {
+    if(results != null) {
       BytesWritable realResults[] = new BytesWritable[results.length];
       for(int i = 0; i < realResults.length; i++) {
-        if (results[i] != null) {
+        if(results[i] != null) {
           realResults[i] = new BytesWritable(results[i]);
         }
       }
@@ -684,14 +776,14 @@
   /** Get all the columns (along with their names) for a given row. */
   public LabelledData[] getRow(Text regionName, Text row) throws IOException {
     HRegion region = getRegion(regionName);
-    if (region == null) {
+    if(region == null) {
       throw new IOException("Not serving region " + regionName);
     }
     
     TreeMap<Text, byte[]> map = region.getFull(row);
     LabelledData result[] = new LabelledData[map.size()];
     int counter = 0;
-    for(Iterator<Text> it = map.keySet().iterator(); it.hasNext();) {
+    for(Iterator<Text> it = map.keySet().iterator(); it.hasNext(); ) {
       Text colname = it.next();
       byte val[] = map.get(colname);
       result[counter++] = new LabelledData(colname, val);
@@ -723,77 +815,77 @@
   }
   
   public long startUpdate(Text regionName, long clientid, Text row) 
-    throws IOException {
+      throws IOException {
     
     HRegion region = getRegion(regionName);
-    if (region == null) {
+    if(region == null) {
       throw new IOException("Not serving region " + regionName);
     }
     
     long lockid = region.startUpdate(row);
     leases.createLease(new Text(String.valueOf(clientid)), 
-                       new Text(String.valueOf(lockid)), 
-                       new RegionListener(region, lockid));
+        new Text(String.valueOf(lockid)), 
+        new RegionListener(region, lockid));
     
     return lockid;
   }
 
   /** Add something to the HBase. */
   public void put(Text regionName, long clientid, long lockid, Text column, 
-                  BytesWritable val) throws IOException {
+      BytesWritable val) throws IOException {
     
     HRegion region = getRegion(regionName);
-    if (region == null) {
+    if(region == null) {
       throw new IOException("Not serving region " + regionName);
     }
     
     leases.renewLease(new Text(String.valueOf(clientid)), 
-                      new Text(String.valueOf(lockid)));
+        new Text(String.valueOf(lockid)));
     
     region.put(lockid, column, val.get());
   }
 
   /** Remove a cell from the HBase. */
   public void delete(Text regionName, long clientid, long lockid, Text column) 
-    throws IOException {
+      throws IOException {
     
     HRegion region = getRegion(regionName);
-    if (region == null) {
+    if(region == null) {
       throw new IOException("Not serving region " + regionName);
     }
     
     leases.renewLease(new Text(String.valueOf(clientid)), 
-                      new Text(String.valueOf(lockid)));
+        new Text(String.valueOf(lockid)));
     
     region.delete(lockid, column);
   }
 
   /** Abandon the transaction */
   public void abort(Text regionName, long clientid, long lockid) 
-    throws IOException {
+      throws IOException {
     
     HRegion region = getRegion(regionName);
-    if (region == null) {
+    if(region == null) {
       throw new IOException("Not serving region " + regionName);
     }
     
     leases.cancelLease(new Text(String.valueOf(clientid)), 
-                       new Text(String.valueOf(lockid)));
+        new Text(String.valueOf(lockid)));
     
     region.abort(lockid);
   }
 
   /** Confirm the transaction */
   public void commit(Text regionName, long clientid, long lockid) 
-    throws IOException {
+      throws IOException {
     
     HRegion region = getRegion(regionName);
-    if (region == null) {
+    if(region == null) {
       throw new IOException("Not serving region " + regionName);
     }
     
     leases.cancelLease(new Text(String.valueOf(clientid)), 
-                       new Text(String.valueOf(lockid)));
+        new Text(String.valueOf(lockid)));
     
     region.commit(lockid);
   }
@@ -801,18 +893,131 @@
   /** Don't let the client's lease expire just yet...  */
   public void renewLease(long lockid, long clientid) throws IOException {
     leases.renewLease(new Text(String.valueOf(clientid)), 
-                      new Text(String.valueOf(lockid)));
+        new Text(String.valueOf(lockid)));
   }
 
   /** Private utility method for safely obtaining an HRegion handle. */
   private HRegion getRegion(Text regionName) {
-    locking.obtainReadLock();
+    this.locker.readLock().lock();
     try {
       return regions.get(regionName);
       
     } finally {
-      locking.releaseReadLock();
+      this.locker.readLock().unlock();
+    }
+  }
+
+  //////////////////////////////////////////////////////////////////////////////
+  // remote scanner interface
+  //////////////////////////////////////////////////////////////////////////////
+
+  private Map<Text, HScannerInterface> scanners;
+  private class ScannerListener extends LeaseListener {
+    private Text scannerName;
+    
+    public ScannerListener(Text scannerName) {
+      this.scannerName = scannerName;
+    }
+    
+    public void leaseExpired() {
+      HScannerInterface s = scanners.remove(scannerName);
+      if(s != null) {
+        try {
+          s.close();
+          
+        } catch(IOException e) {
+          e.printStackTrace();
+        }
+      }
     }
   }
+  
+  /** Start a scanner for a given HRegion. */
+  public long openScanner(Text regionName, Text[] cols, Text firstRow)
+      throws IOException {
+
+    HRegion r = getRegion(regionName);
+    if(r == null) {
+      throw new IOException("Not serving region " + regionName);
+    }
 
+    long scannerId = -1L;
+    try {
+      HScannerInterface s = r.getScanner(cols, firstRow);
+      scannerId = rand.nextLong();
+      Text scannerName = new Text(String.valueOf(scannerId));
+      scanners.put(scannerName, s);
+      leases.createLease(scannerName, scannerName, new ScannerListener(scannerName));
+    
+    } catch(IOException e) {
+      e.printStackTrace();
+      throw e;
+    }
+    return scannerId;
+  }
+  
+  public LabelledData[] next(long scannerId, HStoreKey key) throws IOException {
+    
+    Text scannerName = new Text(String.valueOf(scannerId));
+    HScannerInterface s = scanners.get(scannerName);
+    if(s == null) {
+      throw new IOException("unknown scanner");
+    }
+    leases.renewLease(scannerName, scannerName);
+    TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
+    ArrayList<LabelledData> values = new ArrayList<LabelledData>();
+    if(s.next(key, results)) {
+      for(Iterator<Map.Entry<Text, byte[]>> it = results.entrySet().iterator();
+          it.hasNext(); ) {
+        Map.Entry<Text, byte[]> e = it.next();
+        values.add(new LabelledData(e.getKey(), e.getValue()));
+      }
+    }
+    return values.toArray(new LabelledData[values.size()]);
+  }
+  
+  public void close(long scannerId) throws IOException {
+    Text scannerName = new Text(String.valueOf(scannerId));
+    HScannerInterface s = scanners.remove(scannerName);
+    if(s == null) {
+      throw new IOException("unknown scanner");
+    }
+    try {
+      s.close();
+        
+    } catch(IOException ex) {
+      ex.printStackTrace();
+    }
+    leases.cancelLease(scannerName, scannerName);
+  }
+
+  //////////////////////////////////////////////////////////////////////////////
+  // Main program
+  //////////////////////////////////////////////////////////////////////////////
+
+  private static void printUsage() {
+    System.err.println("Usage: java " +
+        "org.apache.hbase.HRegionServer [--bind=hostname:port]");
+  }
+  
+  public static void main(String [] args) throws IOException {
+    Configuration conf = new HBaseConfiguration();
+    
+    // Process command-line args. TODO: Better cmd-line processing
+    // (but hopefully something not as painful as cli options).
+    for (String cmd: args) {
+      if (cmd.equals("-h") || cmd.startsWith("--h")) {
+        printUsage();
+        return;
+      }
+      
+      final String addressArgKey = "--bind=";
+      if (cmd.startsWith(addressArgKey)) {
+        conf.set(REGIONSERVER_ADDRESS,
+            cmd.substring(addressArgKey.length()));
+      }
+    }
+    
+    new HRegionServer(conf);
+  }
 }

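A note on the HRegionServer changes above: openScanner no longer returns an
HScannerInterface to the client. It now returns an opaque scanner id; the
server parks the live scanner in a map keyed by that id, and a lease
(ScannerListener.leaseExpired) closes any scanner whose client stops renewing.
Below is a minimal sketch of the id-registry half of that pattern in plain
Java; the class and method names are illustrative, not taken from the patch,
and the lease machinery is omitted:

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.Random;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    public class ScannerRegistrySketch {
      private final ConcurrentMap<String, Closeable> scanners =
          new ConcurrentHashMap<String, Closeable>();
      private final Random rand = new Random();

      /** Hand the client an opaque id instead of a remote scanner object. */
      public long open(Closeable scanner) {
        long id = rand.nextLong();
        scanners.put(String.valueOf(id), scanner);
        return id;
      }

      /** Look the scanner up by id on every call; unknown ids are an error. */
      public void close(long id) throws IOException {
        Closeable s = scanners.remove(String.valueOf(id));
        if (s == null) {
          throw new IOException("unknown scanner");
        }
        s.close();
      }
    }

As in the patch, a random long serves as the id, so collisions are improbable
but not impossible; the lease, omitted here, is what reclaims scanners whose
clients never call close.
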
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HServerAddress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HServerAddress.java?view=diff&rev=532083&r1=532082&r2=532083
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HServerAddress.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HServerAddress.java Tue Apr 24 14:13:08 2007
@@ -33,9 +33,15 @@
     this.stringValue = null;
   }
   
+  public HServerAddress(InetSocketAddress address) {
+    this.address = address;
+    this.stringValue = address.getAddress().getHostAddress()
+        + ":" + address.getPort();
+  }
+  
   public HServerAddress(String hostAndPort) {
     int colonIndex = hostAndPort.indexOf(':');
-    if (colonIndex < 0) {
+    if(colonIndex < 0) {
       throw new IllegalArgumentException("Not a host:port pair: " + hostAndPort);
     }
     String host = hostAndPort.substring(0, colonIndex);
@@ -80,7 +86,7 @@
     String bindAddress = in.readUTF();
     int port = in.readInt();
     
-    if (bindAddress == null || bindAddress.length() == 0) {
+    if(bindAddress == null || bindAddress.length() == 0) {
       address = null;
       stringValue = null;
       
@@ -91,7 +97,7 @@
   }
 
   public void write(DataOutput out) throws IOException {
-    if (address == null) {
+    if(address == null) {
       out.writeUTF("");
       out.writeInt(0);
       

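The new HServerAddress(InetSocketAddress) constructor above precomputes the
"host:port" string form of the socket address. A standalone sketch of the same
computation (the port number is arbitrary); note that
InetSocketAddress.getAddress() returns null for an unresolved address, so a
resolved address is an implicit precondition, as it is in the constructor:

    import java.net.InetSocketAddress;

    public class AddressStringSketch {
      public static void main(String[] args) {
        // A literal IP is always resolved, so getAddress() is non-null here.
        InetSocketAddress address = new InetSocketAddress("127.0.0.1", 60010);
        String stringValue = address.getAddress().getHostAddress()
            + ":" + address.getPort();
        System.out.println(stringValue);   // prints 127.0.0.1:60010
      }
    }
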
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java?view=diff&rev=532083&r1=532082&r2=532083
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java Tue Apr 24 14:13:08 2007
@@ -15,14 +15,26 @@
  */
 package org.apache.hadoop.hbase;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+import java.util.Vector;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.fs.*;
-
-import java.io.*;
-import java.util.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
 
 /*******************************************************************************
  * HStore maintains a bunch of data files.  It is responsible for maintaining 
@@ -53,7 +65,7 @@
   Integer compactLock = new Integer(0);
   Integer flushLock = new Integer(0);
 
-  HLocking locking = new HLocking();
+  ReadWriteLock locker = new ReentrantReadWriteLock();
 
   TreeMap<Long, MapFile.Reader> maps = new TreeMap<Long, MapFile.Reader>();
   TreeMap<Long, HStoreFile> mapFiles = new TreeMap<Long, HStoreFile>();
@@ -88,7 +100,7 @@
    * will be deleted (by whoever has instantiated the HStore).
    */
   public HStore(Path dir, Text regionName, Text colFamily, int maxVersions, 
-                FileSystem fs, Path reconstructionLog, Configuration conf) throws IOException {
+      FileSystem fs, Path reconstructionLog, Configuration conf) throws IOException {
     
     this.dir = dir;
     this.regionName = regionName;
@@ -110,7 +122,7 @@
 
     this.compactdir = new Path(dir, COMPACTION_DIR);
     Path curCompactStore = HStoreFile.getHStoreDir(compactdir, regionName, colFamily);
-    if (fs.exists(curCompactStore)) {
+    if(fs.exists(curCompactStore)) {
       processReadyCompaction();
       fs.delete(curCompactStore);
     }
@@ -123,7 +135,7 @@
     Vector<HStoreFile> hstoreFiles 
       = HStoreFile.loadHStoreFiles(conf, dir, regionName, colFamily, fs);
     
-    for(Iterator<HStoreFile> it = hstoreFiles.iterator(); it.hasNext();) {
+    for(Iterator<HStoreFile> it = hstoreFiles.iterator(); it.hasNext(); ) {
       HStoreFile hsf = it.next();
       mapFiles.put(hsf.loadInfo(fs), hsf);
     }
@@ -138,11 +150,11 @@
     // contain any updates also contained in the log.
 
     long maxSeqID = -1;
-    for(Iterator<HStoreFile> it = hstoreFiles.iterator(); it.hasNext();) {
+    for(Iterator<HStoreFile> it = hstoreFiles.iterator(); it.hasNext(); ) {
       HStoreFile hsf = it.next();
       long seqid = hsf.loadInfo(fs);
-      if (seqid > 0) {
-        if (seqid > maxSeqID) {
+      if(seqid > 0) {
+        if(seqid > maxSeqID) {
           maxSeqID = seqid;
         }
       }
@@ -157,7 +169,7 @@
 
     LOG.debug("reading reconstructionLog");
     
-    if (reconstructionLog != null && fs.exists(reconstructionLog)) {
+    if(reconstructionLog != null && fs.exists(reconstructionLog)) {
       long maxSeqIdInLog = -1;
       TreeMap<HStoreKey, BytesWritable> reconstructedCache 
         = new TreeMap<HStoreKey, BytesWritable>();
@@ -170,18 +182,18 @@
         HLogEdit val = new HLogEdit();
         while(login.next(key, val)) {
           maxSeqIdInLog = Math.max(maxSeqIdInLog, key.getLogSeqNum());
-          if (key.getLogSeqNum() <= maxSeqID) {
+          if(key.getLogSeqNum() <= maxSeqID) {
             continue;
           }
           reconstructedCache.put(new HStoreKey(key.getRow(), val.getColumn(), 
-                                               val.getTimestamp()), val.getVal());
+              val.getTimestamp()), val.getVal());
         }
         
       } finally {
         login.close();
       }
 
-      if (reconstructedCache.size() > 0) {
+      if(reconstructedCache.size() > 0) {
         
         // We create a "virtual flush" at maxSeqIdInLog+1.
         
@@ -195,7 +207,7 @@
     // should be "timeless"; that is, it should not have an associated seq-ID, 
     // because all log messages have been reflected in the TreeMaps at this point.
     
-    if (mapFiles.size() >= 1) {
+    if(mapFiles.size() >= 1) {
       compactHelper(true);
     }
 
@@ -204,7 +216,7 @@
 
     LOG.debug("starting map readers");
     
-    for(Iterator<Long> it = mapFiles.keySet().iterator(); it.hasNext();) {
+    for(Iterator<Long> it = mapFiles.keySet().iterator(); it.hasNext(); ) {
       Long key = it.next().longValue();
       HStoreFile hsf = mapFiles.get(key);
 
@@ -218,11 +230,11 @@
 
   /** Turn off all the MapFile readers */
   public void close() throws IOException {
-    locking.obtainWriteLock();
+    this.locker.writeLock().lock();
     LOG.info("closing HStore for " + this.regionName + "/" + this.colFamily);
     
     try {
-      for(Iterator<MapFile.Reader> it = maps.values().iterator(); it.hasNext();) {
+      for(Iterator<MapFile.Reader> it = maps.values().iterator(); it.hasNext(); ) {
         MapFile.Reader map = it.next();
         map.close();
       }
@@ -232,7 +244,7 @@
       LOG.info("HStore closed for " + this.regionName + "/" + this.colFamily);
       
     } finally {
-      locking.releaseWriteLock();
+      this.locker.writeLock().unlock();
     }
   }
 
@@ -252,13 +264,13 @@
    * Return the entire list of HStoreFiles currently used by the HStore.
    */
   public Vector<HStoreFile> flushCache(TreeMap<HStoreKey, BytesWritable> inputCache,
-                                       long logCacheFlushId) throws IOException {
+      long logCacheFlushId) throws IOException {
     
     return flushCacheHelper(inputCache, logCacheFlushId, true);
   }
   
   Vector<HStoreFile> flushCacheHelper(TreeMap<HStoreKey, BytesWritable> inputCache,
-                                      long logCacheFlushId, boolean addToAvailableMaps) throws IOException {
+      long logCacheFlushId, boolean addToAvailableMaps) throws IOException {
     
     synchronized(flushLock) {
       LOG.debug("flushing HStore " + this.regionName + "/" + this.colFamily);
@@ -270,12 +282,12 @@
       
       Path mapfile = flushedFile.getMapFilePath();
       MapFile.Writer out = new MapFile.Writer(conf, fs, mapfile.toString(), 
-                                              HStoreKey.class, BytesWritable.class);
+          HStoreKey.class, BytesWritable.class);
       
       try {
-        for(Iterator<HStoreKey> it = inputCache.keySet().iterator(); it.hasNext();) {
+        for(Iterator<HStoreKey> it = inputCache.keySet().iterator(); it.hasNext(); ) {
           HStoreKey curkey = it.next();
-          if (this.colFamily.equals(HStoreKey.extractFamily(curkey.getColumn()))) {
+          if(this.colFamily.equals(HStoreKey.extractFamily(curkey.getColumn()))) {
             BytesWritable val = inputCache.get(curkey);
             out.append(curkey, val);
           }
@@ -294,8 +306,8 @@
 
       // C. Finally, make the new MapFile available.
 
-      if (addToAvailableMaps) {
-        locking.obtainWriteLock();
+      if(addToAvailableMaps) {
+        this.locker.writeLock().lock();
         
         try {
           maps.put(logCacheFlushId, new MapFile.Reader(fs, mapfile.toString(), conf));
@@ -303,7 +315,7 @@
           LOG.debug("HStore available for " + this.regionName + "/" + this.colFamily);
         
         } finally {
-          locking.releaseWriteLock();
+          this.locker.writeLock().unlock();
         }
       }
       return getAllMapFiles();
@@ -312,7 +324,7 @@
 
   public Vector<HStoreFile> getAllMapFiles() {
     Vector<HStoreFile> flushedFiles = new Vector<HStoreFile>();
-    for(Iterator<HStoreFile> it = mapFiles.values().iterator(); it.hasNext();) {
+    for(Iterator<HStoreFile> it = mapFiles.values().iterator(); it.hasNext(); ) {
       HStoreFile hsf = it.next();
       flushedFiles.add(hsf);
     }
@@ -355,22 +367,22 @@
         // Grab a list of files to compact.
         
         Vector<HStoreFile> toCompactFiles = null;
-        locking.obtainWriteLock();
+        this.locker.writeLock().lock();
         try {
           toCompactFiles = new Vector<HStoreFile>(mapFiles.values());
           
         } finally {
-          locking.releaseWriteLock();
+          this.locker.writeLock().unlock();
         }
 
         // Compute the max-sequenceID seen in any of the to-be-compacted TreeMaps
 
         long maxSeenSeqID = -1;
-        for(Iterator<HStoreFile> it = toCompactFiles.iterator(); it.hasNext();) {
+        for(Iterator<HStoreFile> it = toCompactFiles.iterator(); it.hasNext(); ) {
           HStoreFile hsf = it.next();
           long seqid = hsf.loadInfo(fs);
-          if (seqid > 0) {
-            if (seqid > maxSeenSeqID) {
+          if(seqid > 0) {
+            if(seqid > maxSeenSeqID) {
               maxSeenSeqID = seqid;
             }
           }
@@ -380,11 +392,11 @@
         HStoreFile compactedOutputFile 
           = new HStoreFile(conf, compactdir, regionName, colFamily, -1);
         
-        if (toCompactFiles.size() == 1) {
+        if(toCompactFiles.size() == 1) {
           LOG.debug("nothing to compact for " + this.regionName + "/" + this.colFamily);
           
           HStoreFile hsf = toCompactFiles.elementAt(0);
-          if (hsf.loadInfo(fs) == -1) {
+          if(hsf.loadInfo(fs) == -1) {
             return;
           }
         }
@@ -392,8 +404,8 @@
         // Step through them, writing to the brand-new TreeMap
 
         MapFile.Writer compactedOut = new MapFile.Writer(conf, fs, 
-                                                         compactedOutputFile.getMapFilePath().toString(), HStoreKey.class, 
-                                                         BytesWritable.class);
+            compactedOutputFile.getMapFilePath().toString(), HStoreKey.class, 
+            BytesWritable.class);
         
         try {
 
@@ -414,7 +426,7 @@
           BytesWritable[] vals = new BytesWritable[toCompactFiles.size()];
           boolean[] done = new boolean[toCompactFiles.size()];
           int pos = 0;
-          for(Iterator<HStoreFile> it = toCompactFiles.iterator(); it.hasNext();) {
+          for(Iterator<HStoreFile> it = toCompactFiles.iterator(); it.hasNext(); ) {
             HStoreFile hsf = it.next();
             readers[pos] = new MapFile.Reader(fs, hsf.getMapFilePath().toString(), conf);
             keys[pos] = new HStoreKey();
@@ -431,8 +443,8 @@
           int numDone = 0;
           for(int i = 0; i < readers.length; i++) {
             readers[i].reset();
-            done[i] = !readers[i].next(keys[i], vals[i]);
-            if (done[i]) {
+            done[i] = ! readers[i].next(keys[i], vals[i]);
+            if(done[i]) {
               numDone++;
             }
           }
@@ -446,15 +458,15 @@
 
             int smallestKey = -1;
             for(int i = 0; i < readers.length; i++) {
-              if (done[i]) {
+              if(done[i]) {
                 continue;
               }
               
-              if (smallestKey < 0) {
+              if(smallestKey < 0) {
                 smallestKey = i;
               
               } else {
-                if (keys[i].compareTo(keys[smallestKey]) < 0) {
+                if(keys[i].compareTo(keys[smallestKey]) < 0) {
                   smallestKey = i;
                 }
               }
@@ -463,7 +475,7 @@
             // Reflect the current key/val in the output
 
             HStoreKey sk = keys[smallestKey];
-            if (lastRow.equals(sk.getRow())
+            if(lastRow.equals(sk.getRow())
                 && lastColumn.equals(sk.getColumn())) {
               
               timesSeen++;
@@ -472,12 +484,12 @@
               timesSeen = 1;
             }
             
-            if (timesSeen <= maxVersions) {
+            if(timesSeen <= maxVersions) {
 
               // Keep old versions until we have maxVersions worth.
               // Then just skip them.
 
-              if (sk.getRow().getLength() != 0
+              if(sk.getRow().getLength() != 0
                   && sk.getColumn().getLength() != 0) {
                 
                 // Only write out objects which have a non-zero length key and value
@@ -499,7 +511,7 @@
             // Advance the smallest key.  If that reader's all finished, then 
             // mark it as done.
 
-            if (!readers[smallestKey].next(keys[smallestKey], vals[smallestKey])) {
+            if(! readers[smallestKey].next(keys[smallestKey], vals[smallestKey])) {
               done[smallestKey] = true;
               readers[smallestKey].close();
               numDone++;
@@ -516,7 +528,7 @@
 
         // Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap.
 
-        if ((!deleteSequenceInfo) && maxSeenSeqID >= 0) {
+        if((! deleteSequenceInfo) && maxSeenSeqID >= 0) {
           compactedOutputFile.writeInfo(fs, maxSeenSeqID);
           
         } else {
@@ -529,7 +541,7 @@
         DataOutputStream out = new DataOutputStream(fs.create(filesToReplace));
         try {
           out.writeInt(toCompactFiles.size());
-          for(Iterator<HStoreFile> it = toCompactFiles.iterator(); it.hasNext();) {
+          for(Iterator<HStoreFile> it = toCompactFiles.iterator(); it.hasNext(); ) {
             HStoreFile hsf = it.next();
             hsf.write(out);
           }
@@ -583,11 +595,11 @@
 
     // 1. Acquiring the write-lock
 
-    locking.obtainWriteLock();
+    this.locker.writeLock().lock();
     Path curCompactStore = HStoreFile.getHStoreDir(compactdir, regionName, colFamily);
     try {
       Path doneFile = new Path(curCompactStore, COMPACTION_DONE);
-      if (!fs.exists(doneFile)) {
+      if(! fs.exists(doneFile)) {
         
         // The last execution didn't finish the compaction, so there's nothing 
         // we can do.  We'll just have to redo it. Abandon it and return.
@@ -622,18 +634,18 @@
       // 3. Unload all the replaced MapFiles.
       
       Iterator<HStoreFile> it2 = mapFiles.values().iterator();
-      for(Iterator<MapFile.Reader> it = maps.values().iterator(); it.hasNext();) {
+      for(Iterator<MapFile.Reader> it = maps.values().iterator(); it.hasNext(); ) {
         MapFile.Reader curReader = it.next();
         HStoreFile curMapFile = it2.next();
-        if (toCompactFiles.contains(curMapFile)) {
+        if(toCompactFiles.contains(curMapFile)) {
           curReader.close();
           it.remove();
         }
       }
       
-      for(Iterator<HStoreFile> it = mapFiles.values().iterator(); it.hasNext();) {
+      for(Iterator<HStoreFile> it = mapFiles.values().iterator(); it.hasNext(); ) {
         HStoreFile curMapFile = it.next();
-        if (toCompactFiles.contains(curMapFile)) {
+        if(toCompactFiles.contains(curMapFile)) {
           it.remove();
         }
       }
@@ -645,7 +657,7 @@
 
       // 4. Delete all the old files, no longer needed
       
-      for(Iterator<HStoreFile> it = toCompactFiles.iterator(); it.hasNext();) {
+      for(Iterator<HStoreFile> it = toCompactFiles.iterator(); it.hasNext(); ) {
         HStoreFile hsf = it.next();
         fs.delete(hsf.getMapFilePath());
         fs.delete(hsf.getInfoFilePath());
@@ -683,13 +695,13 @@
       
       mapFiles.put(orderVal, finalCompactedFile);
       maps.put(orderVal, new MapFile.Reader(fs, 
-                                            finalCompactedFile.getMapFilePath().toString(), conf));
+          finalCompactedFile.getMapFilePath().toString(), conf));
       
     } finally {
       
       // 7. Releasing the write-lock
       
-      locking.releaseWriteLock();
+      this.locker.writeLock().unlock();
     }
   }
 
@@ -705,7 +717,7 @@
   * The results map passed in is filled with column names mapped to byte arrays (byte[]).
    */
   public void getFull(HStoreKey key, TreeMap<Text, byte[]> results) throws IOException {
-    locking.obtainReadLock();
+    this.locker.readLock().lock();
     try {
       MapFile.Reader[] maparray 
         = maps.values().toArray(new MapFile.Reader[maps.size()]);
@@ -720,12 +732,12 @@
           
           do {
             Text readcol = readkey.getColumn();
-            if (results.get(readcol) == null
+            if(results.get(readcol) == null
                 && key.matchesWithoutColumn(readkey)) {
               results.put(new Text(readcol), readval.get());
               readval = new BytesWritable();
               
-            } else if (key.getRow().compareTo(readkey.getRow()) > 0) {
+            } else if(key.getRow().compareTo(readkey.getRow()) > 0) {
               break;
             }
             
@@ -734,7 +746,7 @@
       }
       
     } finally {
-      locking.releaseReadLock();
+      this.locker.readLock().unlock();
     }
   }
 
@@ -745,12 +757,12 @@
    * If 'numVersions' is negative, the method returns all available versions.
    */
   public byte[][] get(HStoreKey key, int numVersions) throws IOException {
-    if (numVersions == 0) {
+    if(numVersions == 0) {
       throw new IllegalArgumentException("Must request at least one value.");
     }
     
     Vector<byte[]> results = new Vector<byte[]>();
-    locking.obtainReadLock();
+    this.locker.readLock().lock();
     try {
       MapFile.Reader[] maparray 
         = maps.values().toArray(new MapFile.Reader[maps.size()]);
@@ -763,12 +775,12 @@
           map.reset();
           HStoreKey readkey = (HStoreKey)map.getClosest(key, readval);
           
-          if (readkey.matchesRowCol(key)) {
+          if(readkey.matchesRowCol(key)) {
             results.add(readval.get());
             readval = new BytesWritable();
 
             while(map.next(readkey, readval) && readkey.matchesRowCol(key)) {
-              if (numVersions > 0 && (results.size() >= numVersions)) {
+              if(numVersions > 0 && (results.size() >= numVersions)) {
                 break;
                 
               } else {
@@ -778,12 +790,12 @@
             }
           }
         }
-        if (results.size() >= numVersions) {
+        if(results.size() >= numVersions) {
           break;
         }
       }
 
-      if (results.size() == 0) {
+      if(results.size() == 0) {
         return null;
         
       } else {
@@ -791,7 +803,7 @@
       }
       
     } finally {
-      locking.releaseReadLock();
+      this.locker.readLock().unlock();
     }
   }
 
@@ -804,18 +816,23 @@
    */
   public long getLargestFileSize(Text midKey) throws IOException {
     long maxSize = 0L;
+    if(this.mapFiles.size() <= 0) {
+      return maxSize;
+    }
+    
+    
     long mapIndex = 0L;
 
     // Iterate through all the MapFiles
     
     for(Iterator<Map.Entry<Long, HStoreFile>> it = mapFiles.entrySet().iterator();
-        it.hasNext();) {
+        it.hasNext(); ) {
       
       Map.Entry<Long, HStoreFile> e = it.next();
       HStoreFile curHSF = e.getValue();
       long size = fs.getLength(new Path(curHSF.getMapFilePath(), MapFile.DATA_FILE_NAME));
       
-      if (size > maxSize) {              // This is the largest one so far
+      if(size > maxSize) {              // This is the largest one so far
         maxSize = size;
         mapIndex = e.getKey();
       }
@@ -850,7 +867,7 @@
    * These should be closed after the user is done with them.
    */
   public HScannerInterface getScanner(long timestamp, Text targetCols[],
-                                      Text firstRow) throws IOException {
+      Text firstRow) throws IOException {
     
     return new HStoreScanner(timestamp, targetCols, firstRow);
   }
@@ -867,11 +884,11 @@
     public HStoreScanner(long timestamp, Text targetCols[], Text firstRow) throws IOException {
       super(timestamp, targetCols);
 
-      locking.obtainReadLock();
+      locker.readLock().lock();
       try {
         this.readers = new MapFile.Reader[mapFiles.size()];
         int i = 0;
-        for(Iterator<HStoreFile> it = mapFiles.values().iterator(); it.hasNext();) {
+        for(Iterator<HStoreFile> it = mapFiles.values().iterator(); it.hasNext(); ) {
           HStoreFile curHSF = it.next();
           readers[i++] = new MapFile.Reader(fs, curHSF.getMapFilePath().toString(), conf);
         }
@@ -885,14 +902,14 @@
           keys[i] = new HStoreKey();
           vals[i] = new BytesWritable();
 
-          if (firstRow.getLength() != 0) {
-            if (findFirstRow(i, firstRow)) {
+          if(firstRow.getLength() != 0) {
+            if(findFirstRow(i, firstRow)) {
               continue;
             }
           }
           
           while(getNext(i)) {
-            if (columnMatch(i)) {
+            if(columnMatch(i)) {
               break;
             }
           }
@@ -915,7 +932,7 @@
       HStoreKey firstKey
         = (HStoreKey)readers[i].getClosest(new HStoreKey(firstRow), vals[i]);
       
-      if (firstKey == null) {
+      if(firstKey == null) {
         
         // Didn't find it. Close the scanner and return TRUE
         
@@ -935,7 +952,7 @@
      * @return - true if there is more data available
      */
     boolean getNext(int i) throws IOException {
-      if (!readers[i].next(keys[i], vals[i])) {
+      if(! readers[i].next(keys[i], vals[i])) {
         closeSubScanner(i);
         return false;
       }
@@ -945,7 +962,7 @@
     /** Close down the indicated reader. */
     void closeSubScanner(int i) throws IOException {
       try {
-        if (readers[i] != null) {
+        if(readers[i] != null) {
           readers[i].close();
         }
         
@@ -958,16 +975,16 @@
 
     /** Shut it down! */
     public void close() throws IOException {
-      if (!scannerClosed) {
+      if(! scannerClosed) {
         try {
           for(int i = 0; i < readers.length; i++) {
-            if (readers[i] != null) {
+            if(readers[i] != null) {
               readers[i].close();
             }
           }
           
         } finally {
-          locking.releaseReadLock();
+          locker.readLock().unlock();
           scannerClosed = true;
         }
       }

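A cross-cutting note on the changes above: this commit retires the hand-rolled
HLocking helper in favor of java.util.concurrent.locks.ReentrantReadWriteLock,
here in HStore and in HRegionServer alike, always taking the lock before the
try block and releasing it in finally. A self-contained sketch of the idiom
(the locker field name matches the patch; the rest is illustrative):

    import java.util.TreeMap;
    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class LockIdiomSketch {
      private final ReadWriteLock locker = new ReentrantReadWriteLock();
      private final TreeMap<String, String> values = new TreeMap<String, String>();

      /** Readers may run concurrently; lock() before try, unlock() in finally. */
      public String get(String name) {
        this.locker.readLock().lock();
        try {
          return values.get(name);
        } finally {
          this.locker.readLock().unlock();
        }
      }

      /** Writers take the exclusive lock with the same try/finally shape. */
      public void clear() {
        this.locker.writeLock().lock();
        try {
          values.clear();
        } finally {
          this.locker.writeLock().unlock();
        }
      }
    }

ReentrantReadWriteLock admits any number of concurrent readers but gives a
writer exclusive access, which matches how get and getFull take the read lock
while close and processReadyCompaction take the write lock above.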


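The core of compactHelper above is a k-way merge over the sorted MapFiles: the
loop keeps one cursor per reader, repeatedly picks the reader whose current
key is smallest, writes that entry out, and advances only that cursor. A
compact sketch of the same selection loop over plain sorted string arrays
(names are illustrative; the real code compares HStoreKey instances read from
MapFile.Reader and additionally skips versions beyond maxVersions):

    import java.util.ArrayList;
    import java.util.List;

    public class KWayMergeSketch {
      /** Merge several individually sorted arrays into one sorted list. */
      public static List<String> merge(String[][] sources) {
        int[] pos = new int[sources.length];        // one cursor per source
        List<String> out = new ArrayList<String>();
        while (true) {
          int smallest = -1;
          for (int i = 0; i < sources.length; i++) {
            if (pos[i] >= sources[i].length) {
              continue;                             // this source is exhausted
            }
            if (smallest < 0
                || sources[i][pos[i]].compareTo(sources[smallest][pos[smallest]]) < 0) {
              smallest = i;
            }
          }
          if (smallest < 0) {
            break;                                  // every source is done
          }
          out.add(sources[smallest][pos[smallest]]);
          pos[smallest]++;                          // advance only the winner
        }
        return out;
      }

      public static void main(String[] args) {
        String[][] runs = { { "a", "d" }, { "b", "c", "e" } };
        System.out.println(merge(runs));            // prints [a, b, c, d, e]
      }
    }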