hadoop-common-commits mailing list archives

From: j...@apache.org
Subject: svn commit: r539243 [1/2] - in /lucene/hadoop/trunk/src/contrib/hbase/src: java/org/apache/hadoop/hbase/ test/org/apache/hadoop/hbase/
Date: Fri, 18 May 2007 03:22:56 GMT
Author: jimk
Date: Thu May 17 20:22:54 2007
New Revision: 539243

URL: http://svn.apache.org/viewvc?view=rev&rev=539243
Log:
HADOOP-1384

HBase omnibus patch. Contributions by Vuk Ercegovac and Michael Stack.

Added:
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/NotServingRegionException.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/RegionUnavailableListener.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/StaticTestEnvironment.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHLog.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTable.java
Removed:
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/Environment.java
Modified:
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HAbstractScanner.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HInternalScannerInterface.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLocking.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestGet.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHBaseCluster.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner.java

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HAbstractScanner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HAbstractScanner.java?view=diff&rev=539243&r1=539242&r2=539243
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HAbstractScanner.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HAbstractScanner.java Thu May 17 20:22:54 2007
@@ -183,10 +183,10 @@
   abstract boolean getNext(int i) throws IOException;
   
   /** Mechanism used by concrete implementation to shut down a particular scanner */
-  abstract void closeSubScanner(int i) throws IOException;
+  abstract void closeSubScanner(int i);
   
   /** Mechanism used to shut down the whole scan */
-  public abstract void close() throws IOException;
+  public abstract void close();
 
   /* (non-Javadoc)
    * @see org.apache.hadoop.hbase.HInternalScannerInterface#isWildcardScanner()

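Dropping the checked IOException from close() here (and from HInternalScannerInterface and the scanner classes below) means callers can release a scanner from a finally block without a nested try/catch. A minimal sketch of the calling pattern this enables; Scanner and ScannerUser are illustrative names, not part of this patch:

    interface Scanner {
      boolean next() throws java.io.IOException;
      void close();                            // no checked exception anymore
    }

    class ScannerUser {
      static void scanAll(Scanner s) throws java.io.IOException {
        try {
          while (s.next()) {
            // process the current row here
          }
        } finally {
          s.close();                           // legal in finally: close() cannot throw IOException
        }
      }
    }
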
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java?view=diff&rev=539243&r1=539242&r2=539243
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java Thu May 17 20:22:54 2007
@@ -288,17 +288,35 @@
         throw new IOException("Timed out trying to locate root region");
       }
       
-      // Verify that this server still serves the root region
-      
       HRegionInterface rootRegion = getHRegionConnection(rootRegionLocation);
 
-      if(rootRegion.getRegionInfo(HGlobals.rootRegionInfo.regionName) != null) {
+      try {
+        rootRegion.getRegionInfo(HGlobals.rootRegionInfo.regionName);
         this.tableServers = new TreeMap<Text, TableInfo>();
         this.tableServers.put(EMPTY_START_ROW,
             new TableInfo(HGlobals.rootRegionInfo, rootRegionLocation));
         
         this.tablesToServers.put(ROOT_TABLE_NAME, this.tableServers);
         break;
+        
+      } catch(NotServingRegionException e) {
+        if(tries == numRetries - 1) {
+          // Don't bother sleeping. We've run out of retries.
+          break;
+        }
+        
+        // Sleep and retry finding root region.
+
+        try {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Root region location changed. Sleeping.");
+          }
+          Thread.sleep(this.clientTimeout);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Wake. Retry finding root region.");
+          }
+        } catch(InterruptedException iex) {
+        }
       }
       rootRegionLocation = null;
     }
@@ -453,7 +471,7 @@
    * Right now, it only exists as part of the META table's region info.
    */
   public synchronized HTableDescriptor[] listTables()
-  throws IOException {
+      throws IOException {
     TreeSet<HTableDescriptor> uniqueTables = new TreeSet<HTableDescriptor>();
     
     TreeMap<Text, TableInfo> metaTables =
@@ -523,24 +541,84 @@
     return this.tableServers.get(serverKey);
   }
   
+  private synchronized void findRegion(TableInfo info) throws IOException {
+    
+    // Wipe out everything we know about this table
+    
+    this.tablesToServers.remove(info.regionInfo.tableDesc.getName());
+    this.tableServers.clear();
+    
+    // Reload information for the whole table
+    
+    findTableInMeta(info.regionInfo.tableDesc.getName());
+    
+    if(this.tableServers.get(info.regionInfo.startKey) == null ) {
+      throw new IOException("region " + info.regionInfo.regionName + " does not exist");
+    }
+  }
+  
   /** Get a single value for the specified row and column */
   public byte[] get(Text row, Text column) throws IOException {
-    TableInfo info = getTableInfo(row);
-    return getHRegionConnection(info.serverAddress).get(
-        info.regionInfo.regionName, row, column).get();
+    TableInfo info = null;
+    BytesWritable value = null;
+
+    for(int tries = 0; tries < numRetries && info == null; tries++) {
+      info = getTableInfo(row);
+      
+      try {
+        value = getHRegionConnection(info.serverAddress).get(
+            info.regionInfo.regionName, row, column);
+        
+      } catch(NotServingRegionException e) {
+        if(tries == numRetries - 1) {
+          // No more tries
+          throw e;
+        }
+        findRegion(info);
+        info = null;
+      }
+    }
+
+    if(value != null) {
+      byte[] bytes = new byte[value.getSize()];
+      System.arraycopy(value.get(), 0, bytes, 0, bytes.length);
+      return bytes;
+    }
+    return null;
   }
  
   /** Get the specified number of versions of the specified row and column */
   public byte[][] get(Text row, Text column, int numVersions) throws IOException {
-    TableInfo info = getTableInfo(row);
-    BytesWritable[] values = getHRegionConnection(info.serverAddress).get(
-        info.regionInfo.regionName, row, column, numVersions);
-    
-    ArrayList<byte[]> bytes = new ArrayList<byte[]>();
-    for(int i = 0 ; i < values.length; i++) {
-      bytes.add(values[i].get());
+    TableInfo info = null;
+    BytesWritable[] values = null;
+
+    for(int tries = 0; tries < numRetries && info == null; tries++) {
+      info = getTableInfo(row);
+      
+      try {
+        values = getHRegionConnection(info.serverAddress).get(
+            info.regionInfo.regionName, row, column, numVersions);
+        
+      } catch(NotServingRegionException e) {
+        if(tries == numRetries - 1) {
+          // No more tries
+          throw e;
+        }
+        findRegion(info);
+        info = null;
+      }
+    }
+
+    if(values != null) {
+      ArrayList<byte[]> bytes = new ArrayList<byte[]>();
+      for(int i = 0 ; i < values.length; i++) {
+        byte[] value = new byte[values[i].getSize()];
+        System.arraycopy(values[i].get(), 0, value, 0, value.length);
+        bytes.add(value);
+      }
+      return bytes.toArray(new byte[values.length][]);
     }
-    return bytes.toArray(new byte[values.length][]);
+    return null;
   }
   
   /** 
@@ -548,22 +626,61 @@
    * the specified timestamp.
    */
   public byte[][] get(Text row, Text column, long timestamp, int numVersions) throws IOException {
-    TableInfo info = getTableInfo(row);
-    BytesWritable[] values = getHRegionConnection(info.serverAddress).get(
-        info.regionInfo.regionName, row, column, timestamp, numVersions);
-    
-    ArrayList<byte[]> bytes = new ArrayList<byte[]>();
-    for(int i = 0 ; i < values.length; i++) {
-      bytes.add(values[i].get());
+    TableInfo info = null;
+    BytesWritable[] values = null;
+
+    for(int tries = 0; tries < numRetries && info == null; tries++) {
+      info = getTableInfo(row);
+      
+      try {
+        values = getHRegionConnection(info.serverAddress).get(
+            info.regionInfo.regionName, row, column, timestamp, numVersions);
+    
+      } catch(NotServingRegionException e) {
+        if(tries == numRetries - 1) {
+          // No more tries
+          throw e;
+        }
+        findRegion(info);
+        info = null;
+      }
     }
-    return bytes.toArray(new byte[values.length][]);
-  }
 
+    if(values != null) {
+      ArrayList<byte[]> bytes = new ArrayList<byte[]>();
+      for(int i = 0 ; i < values.length; i++) {
+        byte[] value = new byte[values[i].getSize()];
+        System.arraycopy(values[i].get(), 0, value, 0, value.length);
+        bytes.add(value);
+      }
+      return bytes.toArray(new byte[values.length][]);
+    }
+    return null;
+  }
+    
   /** Get all the data for the specified row */
   public LabelledData[] getRow(Text row) throws IOException {
-    TableInfo info = getTableInfo(row);
-    return getHRegionConnection(info.serverAddress).getRow(
-        info.regionInfo.regionName, row);
+    TableInfo info = null;
+    LabelledData[] value = null;
+    
+    for(int tries = 0; tries < numRetries && info == null; tries++) {
+      info = getTableInfo(row);
+      
+      try {
+        value = getHRegionConnection(info.serverAddress).getRow(
+            info.regionInfo.regionName, row);
+        
+      } catch(NotServingRegionException e) {
+        if(tries == numRetries - 1) {
+          // No more tries
+          throw e;
+        }
+        findRegion(info);
+        info = null;
+      }
+    }
+    
+    return value;
   }
 
   /** 
@@ -579,19 +696,34 @@
 
   /** Start an atomic row insertion or update */
   public long startUpdate(Text row) throws IOException {
-    TableInfo info = getTableInfo(row);
-    long lockid;
-    try {
-      this.currentServer = getHRegionConnection(info.serverAddress);
-      this.currentRegion = info.regionInfo.regionName;
-      this.clientid = rand.nextLong();
-      lockid = currentServer.startUpdate(this.currentRegion, this.clientid, row);
+    TableInfo info = null;
+    long lockid = -1L;
+    
+    for(int tries = 0; tries < numRetries && info == null; tries++) {
+      info = getTableInfo(row);
+      
+      try {
+        this.currentServer = getHRegionConnection(info.serverAddress);
+        this.currentRegion = info.regionInfo.regionName;
+        this.clientid = rand.nextLong();
+        lockid = currentServer.startUpdate(this.currentRegion, this.clientid, row);
+
+      } catch(NotServingRegionException e) {
+        if(tries == numRetries - 1) {
+          // No more tries
+          throw e;
+        }
+        findRegion(info);
+        info = null;
+
+      } catch(IOException e) {
+        this.currentServer = null;
+        this.currentRegion = null;
+        throw e;
+      }
       
-    } catch(IOException e) {
-      this.currentServer = null;
-      this.currentRegion = null;
-      throw e;
     }
+    
     return lockid;
   }
   
@@ -666,24 +798,27 @@
     private HRegionInterface server;
     private long scannerId;
     
-    public ClientScanner(Text[] columns, Text startRow) throws IOException {
-      this.columns = columns;
-      this.startRow = startRow;
-      this.closed = false;
-      
+    private void loadRegions() {
       Text firstServer = null;
       if(this.startRow == null || this.startRow.getLength() == 0) {
         firstServer = tableServers.firstKey();
-        
+
       } else if(tableServers.containsKey(startRow)) {
         firstServer = startRow;
-        
+
       } else {
         firstServer = tableServers.headMap(startRow).lastKey();
       }
       Collection<TableInfo> info = tableServers.tailMap(firstServer).values();
-      
       this.regions = info.toArray(new TableInfo[info.size()]);
+    }
+    
+    public ClientScanner(Text[] columns, Text startRow) throws IOException {
+      this.columns = columns;
+      this.startRow = startRow;
+      this.closed = false;
+      
+      loadRegions();
       this.currentRegion = -1;
       this.server = null;
       this.scannerId = -1L;
@@ -706,10 +841,26 @@
       }
       try {
         this.server = getHRegionConnection(this.regions[currentRegion].serverAddress);
-        this.scannerId = this.server.openScanner(
-            this.regions[currentRegion].regionInfo.regionName, this.columns,
-            this.startRow);
         
+        for(int tries = 0; tries < numRetries; tries++) {
+          TableInfo info = this.regions[currentRegion];
+          
+          try {
+            this.scannerId = this.server.openScanner(info.regionInfo.regionName,
+                this.columns, currentRegion == 0 ? this.startRow : EMPTY_START_ROW);
+            
+            break;
+        
+          } catch(NotServingRegionException e) {
+            if(tries == numRetries - 1) {
+              // No more tries
+              throw e;
+            }
+            findRegion(info);
+            loadRegions();
+          }
+        }
+
       } catch(IOException e) {
         close();
         throw e;
@@ -743,6 +894,7 @@
     public void close() throws IOException {
       if(this.scannerId != -1L) {
         this.server.close(this.scannerId);
+        this.scannerId = -1L;
       }
       this.server = null;
       this.closed = true;

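Every HClient method above now follows the same retry shape: call the region server; on NotServingRegionException treat the cached region location as stale, wipe and reload it (findRegion), and try again up to numRetries times, rethrowing on the last attempt. A condensed sketch of that idiom, assuming illustrative names (withRetries, RegionCall, and relocateRegion are not the patch's API; the stand-in exception mirrors the new NotServingRegionException, which extends IOException):

    import java.io.IOException;

    class RetrySketch {
      // Stand-in for org.apache.hadoop.hbase.NotServingRegionException.
      static class NotServingRegionException extends IOException {}

      interface RegionCall<T> {
        T call() throws IOException;           // one RPC against a region server
      }

      static <T> T withRetries(int numRetries, Runnable relocateRegion,
          RegionCall<T> op) throws IOException {
        for (int tries = 0; tries < numRetries; tries++) {
          try {
            return op.call();
          } catch (NotServingRegionException e) {
            if (tries == numRetries - 1) {
              throw e;                         // out of retries; give up
            }
            relocateRegion.run();              // cached location was stale; reload it
          }
        }
        return null;                           // not reached when numRetries > 0
      }
    }
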
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HInternalScannerInterface.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HInternalScannerInterface.java?view=diff&rev=539243&r1=539242&r2=539243
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HInternalScannerInterface.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HInternalScannerInterface.java Thu May 17 20:22:54 2007
@@ -34,7 +34,7 @@
 public interface HInternalScannerInterface {
   
   public boolean next(HStoreKey key, TreeMap<Text, BytesWritable> results) throws IOException;
-  public void close() throws IOException;
+  public void close();
   /** Returns true if the scanner is matching a column family or regex */
   public boolean isWildcardScanner();
   

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLocking.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLocking.java?view=diff&rev=539243&r1=539242&r2=539243
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLocking.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLocking.java Thu May 17 20:22:54 2007
@@ -15,6 +15,8 @@
  */
 package org.apache.hadoop.hbase;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
 /*******************************************************************************
  * HLocking is a set of lock primitives that does not rely on a
  * particular thread holding the monitor for an object. This is
@@ -33,12 +35,12 @@
   // If lockers > 0, locked for read
   // If lockers == -1 locked for write
   
-  private int lockers;
-
+  private AtomicInteger lockers;
+  
   /** Constructor */
   public HLocking() {
     this.mutex = new Integer(0);
-    this.lockers = 0;
+    this.lockers = new AtomicInteger(0);
   }
 
   /**
@@ -46,13 +48,13 @@
    */
   public void obtainReadLock() {
     synchronized(mutex) {
-      while(lockers < 0) {
+      while(lockers.get() < 0) {
         try {
           mutex.wait();
         } catch(InterruptedException ie) {
         }
       }
-      lockers++;
+      lockers.incrementAndGet();
       mutex.notifyAll();
     }
   }
@@ -62,8 +64,7 @@
    */
   public void releaseReadLock() {
     synchronized(mutex) {
-      lockers--;
-      if(lockers < 0) {
+      if(lockers.decrementAndGet() < 0) {
         throw new IllegalStateException("lockers: " + lockers);
       }
       mutex.notifyAll();
@@ -75,13 +76,12 @@
    */
   public void obtainWriteLock() {
     synchronized(mutex) {
-      while(lockers != 0) {
+      while(!lockers.compareAndSet(0, -1)) {
         try {
           mutex.wait();
         } catch (InterruptedException ie) {
         }
       }
-      lockers = -1;
       mutex.notifyAll();
     }
   }
@@ -91,10 +91,9 @@
    */
   public void releaseWriteLock() {
     synchronized(mutex) {
-      if(lockers != -1) {
+      if(!lockers.compareAndSet(-1, 0)) {
         throw new IllegalStateException("lockers: " + lockers);
       }
-      lockers = 0;
       mutex.notifyAll();
     }
   }

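Net effect of the HLocking change: the reader/writer count moves into an AtomicInteger, but it is still only mutated under synchronized(mutex), with wait()/notifyAll() doing the blocking; lockers >= 0 counts active readers and -1 means write-locked, so compareAndSet(0, -1) is exactly "no readers, no writer". A condensed, compilable restatement of the class as patched (CountingRWLock is an illustrative name):

    import java.util.concurrent.atomic.AtomicInteger;

    class CountingRWLock {
      private final Object mutex = new Object();
      private final AtomicInteger lockers = new AtomicInteger(0);

      void obtainReadLock() {
        synchronized (mutex) {
          while (lockers.get() < 0) {          // a writer holds the lock
            try { mutex.wait(); } catch (InterruptedException ignored) {}
          }
          lockers.incrementAndGet();           // one more reader
          mutex.notifyAll();
        }
      }

      void releaseReadLock() {
        synchronized (mutex) {
          if (lockers.decrementAndGet() < 0) { // more releases than acquires
            throw new IllegalStateException("lockers: " + lockers);
          }
          mutex.notifyAll();
        }
      }

      void obtainWriteLock() {
        synchronized (mutex) {
          while (!lockers.compareAndSet(0, -1)) {  // wait for readers to drain
            try { mutex.wait(); } catch (InterruptedException ignored) {}
          }
          mutex.notifyAll();
        }
      }

      void releaseWriteLock() {
        synchronized (mutex) {
          if (!lockers.compareAndSet(-1, 0)) { // we did not hold the write lock
            throw new IllegalStateException("lockers: " + lockers);
          }
          mutex.notifyAll();
        }
      }
    }
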
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java?view=diff&rev=539243&r1=539242&r2=539243
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java Thu May 17 20:22:54 2007
@@ -270,8 +270,14 @@
 
   /** Shut down the log. */
   public synchronized void close() throws IOException {
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("closing log writer");
+    }
     this.writer.close();
     this.closed = true;
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("log writer closed");
+    }
   }
 
   /**

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java?view=diff&rev=539243&r1=539242&r2=539243
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java Thu May 17 20:22:54 2007
@@ -164,7 +164,7 @@
           HRegionInfo info = getRegionInfo(COL_REGIONINFO, results, inbuf);
           String serverName = getServerName(COL_SERVER, results);
           long startCode = getStartCode(COL_STARTCODE, results);
-          
+
           if(LOG.isDebugEnabled()) {
             LOG.debug("row: " + info.toString() + ", server: " + serverName
                 + ", startCode: " + startCode);
@@ -177,11 +177,11 @@
         }
       } finally {
         try {
-        if (scannerId != -1L) {
-          server.close(scannerId);
-        }
+          if (scannerId != -1L) {
+            server.close(scannerId);
+          }
         } catch (IOException e) {
-            e.printStackTrace();
+          e.printStackTrace();
         }
         scannerId = -1L;
       }
@@ -581,13 +581,13 @@
     // Main processing loop
     for(PendingOperation op = null; !closed; ) {
       synchronized(msgQueue) {
-        while(msgQueue.size() == 0 && serversToServerInfo.size() != 0) {
+        while(msgQueue.size() == 0 && !closed) {
           try {
             msgQueue.wait(threadWakeFrequency);
           } catch(InterruptedException iex) {
           }
         }
-        if(msgQueue.size() == 0 || closed) {
+        if(closed) {
           continue;
         }
         op = msgQueue.remove(msgQueue.size()-1);
@@ -616,14 +616,6 @@
     }
     server.stop();                              // Stop server
     serverLeases.close();                       // Turn off the lease monitor
-    try {
-      fs.close();
-      client.close();                           // Shut down the client
-    } catch(IOException iex) {
-      // Print if ever there is an interrupt (Just for kicks. Remove if it
-      // ever happens).
-      iex.printStackTrace();
-    }
     
     // Join up with all threads
     
@@ -652,6 +644,7 @@
       // ever happens).
       iex.printStackTrace();
     }
+    
     if(LOG.isDebugEnabled()) {
       LOG.debug("HMaster main thread exiting");
     }
@@ -774,19 +767,9 @@
   HMsg[] processMsgs(HServerInfo info, HMsg incomingMsgs[]) throws IOException {
     Vector<HMsg> returnMsgs = new Vector<HMsg>();
     
-    // Process the kill list
-    
     TreeMap<Text, HRegionInfo> regionsToKill =
       killList.remove(info.getServerAddress().toString());
     
-    if(regionsToKill != null) {
-      for(Iterator<HRegionInfo> i = regionsToKill.values().iterator();
-          i.hasNext(); ) {
-        
-        returnMsgs.add(new HMsg(HMsg.MSG_REGION_CLOSE_AND_DELETE, i.next()));
-      }
-    }
-
     // Get reports on what the RegionServer did.
     
     for(int i = 0; i < incomingMsgs.length; i++) {
@@ -872,18 +855,11 @@
         } else {
           boolean reassignRegion = true;
 
-          synchronized(regionsToKill) {
-            if(regionsToKill.containsKey(region.regionName)) {
-              regionsToKill.remove(region.regionName);
-
-              if(regionsToKill.size() > 0) {
-                killList.put(info.toString(), regionsToKill);
-
-              } else {
-                killList.remove(info.toString());
-              }
-              reassignRegion = false;
-            }
+          if(regionsToKill.containsKey(region.regionName)) {
+            regionsToKill.remove(region.regionName);
+            unassignedRegions.remove(region.regionName);
+            assignAttempts.remove(region.regionName);
+            reassignRegion = false;
           }
 
           synchronized(msgQueue) {
@@ -902,14 +878,15 @@
         if(LOG.isDebugEnabled()) {
           LOG.debug("new region " + region.regionName);
         }
+        
+        // A region has split and the old server is serving the two new regions.
 
         if(region.regionName.find(META_TABLE_NAME.toString()) == 0) {
           // A meta region has split.
 
           allMetaRegionsScanned = false;
         }
-        unassignedRegions.put(region.regionName, region);
-        assignAttempts.put(region.regionName, 0L);
+        
         break;
 
       default:
@@ -918,6 +895,16 @@
       }
     }
 
+    // Process the kill list
+    
+    if(regionsToKill != null) {
+      for(Iterator<HRegionInfo> i = regionsToKill.values().iterator();
+          i.hasNext(); ) {
+        
+        returnMsgs.add(new HMsg(HMsg.MSG_REGION_CLOSE_AND_DELETE, i.next()));
+      }
+    }
+
     // Figure out what the RegionServer ought to do, and write back.
 
     if(unassignedRegions.size() > 0) {
@@ -1460,108 +1447,167 @@
     } else {
       firstMetaRegion = knownMetaRegions.headMap(tableName).lastKey();
     }
-    for(Iterator<MetaRegion> it =
-      knownMetaRegions.tailMap(firstMetaRegion).values().iterator();
-        it.hasNext(); ) {
 
-      // Find all the regions that make up this table
-      
-      MetaRegion m = it.next();
-      HRegionInterface server = client.getHRegionConnection(m.server);
-      Vector<Text> rowsToDelete = new Vector<Text>();
+    synchronized(metaScannerLock) {     // Prevent meta scanner from running
+      for(Iterator<MetaRegion> it =
+          knownMetaRegions.tailMap(firstMetaRegion).values().iterator();
+          it.hasNext(); ) {
 
-      long scannerId = -1L;
-      try {
-        scannerId = server.openScanner(m.regionName, METACOLUMNS, tableName);
-        
-        
-        DataInputBuffer inbuf = new DataInputBuffer();
-        byte[] bytes;
-        while(true) {
-          LabelledData[] values = null;
-          HStoreKey key = new HStoreKey();
-          values = server.next(scannerId, key);
-          if(values == null || values.length == 0) {
-            break;
-          }
-          TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
-          for(int i = 0; i < values.length; i++) {
-            bytes = new byte[values[i].getData().getSize()];
-            System.arraycopy(values[i].getData().get(), 0, bytes, 0, bytes.length);
-            results.put(values[i].getLabel(), bytes);
-          }
-          bytes = results.get(COL_REGIONINFO);
-          if(bytes == null || bytes.length == 0) {
-            break;
-          }
-          inbuf.reset(bytes, bytes.length);
-          HRegionInfo info = new HRegionInfo();
-          info.readFields(inbuf);
+        // Find all the regions that make up this table
 
-          if(info.tableDesc.getName().compareTo(tableName) > 0) {
-            break;                      // Beyond any more entries for this table
-          }
-          
-          rowsToDelete.add(info.regionName);
+        MetaRegion m = it.next();
+        HRegionInterface server = client.getHRegionConnection(m.server);
+
+        // Rows in the meta table we will need to delete
+
+        Vector<Text> rowsToDelete = new Vector<Text>();
+
+        // Regions that are being served. We will get the HRegionServers
+        // to delete them for us, but we don't tell them that until after
+        // we are done scanning to prevent lock contention
+
+        TreeMap<String, TreeMap<Text, HRegionInfo>> localKillList =
+          new TreeMap<String, TreeMap<Text, HRegionInfo>>();
+
+        // Regions that are not being served. We will have to delete
+        // them ourselves
+
+        TreeSet<Text> unservedRegions = new TreeSet<Text>();
+
+        long scannerId = -1L;
+        try {
+          scannerId = server.openScanner(m.regionName, METACOLUMNS, tableName);
 
-          // Is it being served?
-          
-          bytes = results.get(COL_SERVER);
-          if(bytes != null && bytes.length != 0) {
-            String serverName = new String(bytes, UTF8_ENCODING);
-            
-            bytes = results.get(COL_STARTCODE);
-            if(bytes != null && bytes.length != 0) {
-              long startCode = Long.valueOf(new String(bytes, UTF8_ENCODING));
 
-              HServerInfo s = serversToServerInfo.get(serverName);
-              if(s != null && s.getStartCode() == startCode) {
+          DataInputBuffer inbuf = new DataInputBuffer();
+          byte[] bytes;
+          while(true) {
+            LabelledData[] values = null;
+            HStoreKey key = new HStoreKey();
+            values = server.next(scannerId, key);
+            if(values == null || values.length == 0) {
+              break;
+            }
+            TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
+            for(int i = 0; i < values.length; i++) {
+              bytes = new byte[values[i].getData().getSize()];
+              System.arraycopy(values[i].getData().get(), 0, bytes, 0, bytes.length);
+              results.put(values[i].getLabel(), bytes);
+            }
+            bytes = results.get(COL_REGIONINFO);
+            if(bytes == null || bytes.length == 0) {
+              break;
+            }
+            inbuf.reset(bytes, bytes.length);
+            HRegionInfo info = new HRegionInfo();
+            info.readFields(inbuf);
 
-                // It is being served.
-                // Tell the server to stop it and not report back.
+            if(info.tableDesc.getName().compareTo(tableName) > 0) {
+              break;                      // Beyond any more entries for this table
+            }
 
-                TreeMap<Text, HRegionInfo> regionsToKill =
-                  killList.get(serverName);
-                
-                if(regionsToKill == null) {
-                  regionsToKill = new TreeMap<Text, HRegionInfo>();
+            rowsToDelete.add(info.regionName);
+
+            // Is it being served?
+
+            bytes = results.get(COL_SERVER);
+            if(bytes != null && bytes.length != 0) {
+              String serverName = new String(bytes, UTF8_ENCODING);
+
+              bytes = results.get(COL_STARTCODE);
+              if(bytes != null && bytes.length != 0) {
+                long startCode = Long.valueOf(new String(bytes, UTF8_ENCODING));
+
+                HServerInfo s = serversToServerInfo.get(serverName);
+                if(s != null && s.getStartCode() == startCode) {
+
+                  // It is being served.
+                  // Tell the server to stop it and not report back.
+
+                  TreeMap<Text, HRegionInfo> regionsToKill =
+                    localKillList.get(serverName);
+
+                  if(regionsToKill == null) {
+                    regionsToKill = new TreeMap<Text, HRegionInfo>();
+                  }
+                  regionsToKill.put(info.regionName, info);
+                  localKillList.put(serverName, regionsToKill);
+                  continue;
                 }
-                regionsToKill.put(info.regionName, info);
-                killList.put(serverName, regionsToKill);
               }
             }
+            
+            // Region is not currently being served.
+            // Prevent it from getting assigned and add it to the list of
+            // regions we need to delete here.
+            
+            unassignedRegions.remove(info.regionName);
+            assignAttempts.remove(info.regionName);
+            unservedRegions.add(info.regionName);
+          }
+          
+        } catch(IOException e) {
+          e.printStackTrace();
+
+        } finally {
+          if(scannerId != -1L) {
+            try {
+              server.close(scannerId);
+
+            } catch(IOException e) {
+              e.printStackTrace();
+
+            }
           }
+          scannerId = -1L;
         }
-      } catch(IOException e) {
-        e.printStackTrace();
+
+        // Wipe the existence of the regions out of the meta table
         
-      } finally {
-        if(scannerId != -1L) {
+        for(Iterator<Text> row = rowsToDelete.iterator(); row.hasNext(); ) {
+          Text rowName = row.next();
+          if(LOG.isDebugEnabled()) {
+            LOG.debug("deleting columns in row: " + rowName);
+          }
+          long lockid = -1L;
+          long clientId = rand.nextLong();
           try {
-            server.close(scannerId);
-            
-          } catch(IOException e) {
-            e.printStackTrace();
-            
+            lockid = server.startUpdate(m.regionName, clientId, rowName);
+            server.delete(m.regionName, clientId, lockid, COL_REGIONINFO);
+            server.delete(m.regionName, clientId, lockid, COL_SERVER);
+            server.delete(m.regionName, clientId, lockid, COL_STARTCODE);
+            server.commit(m.regionName, clientId, lockid);
+            lockid = -1L;
+            if(LOG.isDebugEnabled()) {
+              LOG.debug("deleted columns in row: " + rowName);
+            }
+
+          } catch(Exception e) {
+            if(lockid != -1L) {
+              server.abort(m.regionName, clientId, lockid);
+            }
+            LOG.error("columns deletion failed in row: " + rowName);
+            LOG.error(e);
           }
         }
-        scannerId = -1L;
-      }
-      for(Iterator<Text> row = rowsToDelete.iterator(); row.hasNext(); ) {
-        Text rowName = row.next();
-        if(LOG.isDebugEnabled()) {
-          LOG.debug("deleting columns in row: " + rowName);
+        
+        // Notify region servers that some regions need to be closed and deleted
+        
+        if(localKillList.size() != 0) {
+          killList.putAll(localKillList);
         }
-        try {
-          long clientId = rand.nextLong();
-          long lockid = server.startUpdate(m.regionName, clientId, rowName);
-          server.delete(m.regionName, clientId, lockid, COL_REGIONINFO);
-          server.delete(m.regionName, clientId, lockid, COL_SERVER);
-          server.delete(m.regionName, clientId, lockid, COL_STARTCODE);
-          server.commit(m.regionName, clientId, lockid);
-          
-        } catch(Exception e) {
-          e.printStackTrace();
+
+        // Delete any regions that are not being served
+        
+        for(Iterator<Text> i = unservedRegions.iterator(); i.hasNext(); ) {
+          Text regionName = i.next();
+          try {
+            HRegion.deleteRegion(fs, dir, regionName);
+            
+          } catch(IOException e) {
+            LOG.error("failed to delete region " + regionName);
+            LOG.error(e);
+          }
         }
       }
     }

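The rewritten table-deletion path above runs under metaScannerLock, defers the kill list until scanning is done, and wraps each meta-row deletion in a start/delete/commit sequence that aborts the still-open row lock on failure, so an exception no longer leaves the meta row locked. A sketch of that delete-or-abort idiom under assumed names (RegionServer stands in for HRegionInterface; the column strings stand in for COL_REGIONINFO, COL_SERVER, COL_STARTCODE; logging is omitted):

    import java.io.IOException;
    import java.util.Random;

    class MetaRowDeleter {
      interface RegionServer {
        long startUpdate(String region, long clientId, String row) throws IOException;
        void delete(String region, long clientId, long lockid, String column) throws IOException;
        void commit(String region, long clientId, long lockid) throws IOException;
        void abort(String region, long clientId, long lockid) throws IOException;
      }

      static void deleteRow(RegionServer server, String region, String row, Random rand) {
        long clientId = rand.nextLong();
        long lockid = -1L;
        try {
          lockid = server.startUpdate(region, clientId, row);
          server.delete(region, clientId, lockid, "regioninfo");
          server.delete(region, clientId, lockid, "server");
          server.delete(region, clientId, lockid, "startcode");
          server.commit(region, clientId, lockid);
          lockid = -1L;                        // committed; nothing left to abort
        } catch (Exception e) {
          if (lockid != -1L) {
            try {
              server.abort(region, clientId, lockid);  // release the row lock
            } catch (IOException ignored) {
            }
          }
        }
      }
    }
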
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java?view=diff&rev=539243&r1=539242&r2=539243
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java Thu May 17 20:22:54 2007
@@ -296,7 +296,7 @@
         for(int i = history.size() - 1; i > 0; i--) {
           backingMaps[i] = history.elementAt(i);
         }
-
+      
         this.keyIterators = new Iterator[backingMaps.length];
         this.keys = new HStoreKey[backingMaps.length];
         this.vals = new BytesWritable[backingMaps.length];
@@ -322,8 +322,10 @@
           }
         }
         
-      } catch(Exception ex) {
+      } catch(IOException ex) {
+        LOG.error(ex);
         close();
+        throw ex;
       }
     }
 
@@ -365,7 +367,7 @@
     }
 
     /** Shut down map iterators, and release the lock */
-    public void close() throws IOException {
+    public void close() {
       if(! scannerClosed) {
         try {
           for(int i = 0; i < keys.length; i++) {

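This hunk, like the HScanner constructor change in HRegion.java below, fixes the same construction leak: if scanner setup fails partway, the sub-scanners opened so far are closed before the exception propagates (and it now propagates instead of being silently swallowed). A sketch of the idiom; CompositeScanner, SubScanner, and Source are illustrative names:

    import java.io.IOException;

    class CompositeScanner {
      interface SubScanner { void close(); }
      interface Source { SubScanner open() throws IOException; }

      private final SubScanner[] scanners;

      CompositeScanner(Source[] sources) throws IOException {
        this.scanners = new SubScanner[sources.length];
        try {
          for (int i = 0; i < sources.length; i++) {
            scanners[i] = sources[i].open();   // may fail partway through
          }
        } catch (IOException e) {
          for (int i = 0; i < scanners.length; i++) {
            if (scanners[i] != null) {
              scanners[i].close();             // release what was already opened
            }
          }
          throw e;                             // rethrow only after cleanup
        }
      }
    }
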
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java?view=diff&rev=539243&r1=539242&r2=539243
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java Thu May 17 20:22:54 2007
@@ -65,6 +65,19 @@
   private static final Log LOG = LogFactory.getLog(HRegion.class);
 
   /**
+   * Deletes all the files for a HRegion
+   * 
+   * @param fs                  - the file system object
+   * @param baseDirectory       - base directory for HBase
+   * @param regionName          - name of the region to delete
+   * @throws IOException
+   */
+  public static void deleteRegion(FileSystem fs, Path baseDirectory,
+      Text regionName) throws IOException {
+    fs.delete(HStoreFile.getHRegionDir(baseDirectory, regionName));
+  }
+  
+  /**
    * Merge two HRegions.  They must be available on the current
    * HRegionServer. Returns a brand-new active HRegion, also
    * running on the current HRegionServer.
@@ -245,7 +258,7 @@
   TreeMap<Long, TreeMap<Text, BytesWritable>> targetColumns 
       = new TreeMap<Long, TreeMap<Text, BytesWritable>>();
   
-  HMemcache memcache = new HMemcache();
+  HMemcache memcache;
 
   Path dir;
   HLog log;
@@ -255,9 +268,9 @@
   Path regiondir;
 
   class WriteState {
-    public boolean writesOngoing;
-    public boolean writesEnabled;
-    public boolean closed;
+    public volatile boolean writesOngoing;
+    public volatile boolean writesEnabled;
+    public volatile boolean closed;
     public WriteState() {
       this.writesOngoing = true;
       this.writesEnabled = true;
@@ -265,12 +278,13 @@
     }
   }
   
-  WriteState writestate = new WriteState();
+  volatile WriteState writestate = new WriteState();
   int recentCommits = 0;
-  int commitsSinceFlush = 0;
+  volatile int commitsSinceFlush = 0;
 
   int maxUnflushedEntries = 0;
   int compactionThreshold = 0;
+  HLocking lock = null;
 
   //////////////////////////////////////////////////////////////////////////////
   // Constructor
@@ -302,10 +316,14 @@
     this.fs = fs;
     this.conf = conf;
     this.regionInfo = regionInfo;
+    this.memcache = new HMemcache();
+
 
     this.writestate.writesOngoing = true;
     this.writestate.writesEnabled = true;
     this.writestate.closed = false;
+    
+    this.lock = new HLocking();
 
     // Declare the regionName.  This is a unique string for the region, used to 
     // build a unique filename.
@@ -354,12 +372,22 @@
   public HRegionInfo getRegionInfo() {
     return this.regionInfo;
   }
+
+  /** returns true if region is closed */
+  public boolean isClosed() {
+    boolean closed = false;
+    synchronized(writestate) {
+      closed = writestate.closed;
+    }
+    return closed;
+  }
   
   /** Closes and deletes this HRegion. Called when doing a table deletion, for example */
   public void closeAndDelete() throws IOException {
     LOG.info("deleting region: " + regionInfo.regionName);
     close();
-    fs.delete(regiondir);
+    deleteRegion(fs, dir, regionInfo.regionName);
+    LOG.info("region deleted: " + regionInfo.regionName);
   }
   
   /**
@@ -373,42 +401,47 @@
    * time-sensitive thread.
    */
   public Vector<HStoreFile> close() throws IOException {
-    boolean shouldClose = false;
-    synchronized(writestate) {
-      if(writestate.closed) {
-        LOG.info("region " + this.regionInfo.regionName + " closed");
-        return new Vector<HStoreFile>();
-      }
-      while(writestate.writesOngoing) {
-        try {
-          writestate.wait();
-        } catch (InterruptedException iex) {
+    lock.obtainWriteLock();
+    try {
+      boolean shouldClose = false;
+      synchronized(writestate) {
+        if(writestate.closed) {
+          LOG.info("region " + this.regionInfo.regionName + " closed");
+          return new Vector<HStoreFile>();
+        }
+        while(writestate.writesOngoing) {
+          try {
+            writestate.wait();
+          } catch (InterruptedException iex) {
+          }
         }
+        writestate.writesOngoing = true;
+        shouldClose = true;
       }
-      writestate.writesOngoing = true;
-      shouldClose = true;
-    }
 
-    if(! shouldClose) {
-      return null;
-      
-    } else {
-      LOG.info("closing region " + this.regionInfo.regionName);
-      Vector<HStoreFile> allHStoreFiles = internalFlushcache();
-      for(Iterator<HStore> it = stores.values().iterator(); it.hasNext(); ) {
-        HStore store = it.next();
-        store.close();
-      }
-      try {
-        return allHStoreFiles;
-        
-      } finally {
-        synchronized(writestate) {
-          writestate.closed = true;
-          writestate.writesOngoing = false;
+      if(! shouldClose) {
+        return null;
+
+      } else {
+        LOG.info("closing region " + this.regionInfo.regionName);
+        Vector<HStoreFile> allHStoreFiles = internalFlushcache();
+        for(Iterator<HStore> it = stores.values().iterator(); it.hasNext(); ) {
+          HStore store = it.next();
+          store.close();
+        }
+        try {
+          return allHStoreFiles;
+
+        } finally {
+          synchronized(writestate) {
+            writestate.closed = true;
+            writestate.writesOngoing = false;
+          }
+          LOG.info("region " + this.regionInfo.regionName + " closed");
         }
-        LOG.info("region " + this.regionInfo.regionName + " closed");
       }
+    } finally {
+      lock.releaseWriteLock();
     }
   }
 
@@ -418,7 +451,9 @@
    *
    * Returns two brand-new (and open) HRegions
    */
-  public HRegion[] closeAndSplit(Text midKey) throws IOException {
+  public HRegion[] closeAndSplit(Text midKey, RegionUnavailableListener listener)
+      throws IOException {
+    
     if(((regionInfo.startKey.getLength() != 0)
         && (regionInfo.startKey.compareTo(midKey) > 0))
         || ((regionInfo.endKey.getLength() != 0)
@@ -428,9 +463,6 @@
 
     LOG.info("splitting region " + this.regionInfo.regionName);
 
-    // Flush this HRegion out to storage, and turn off flushes
-    // or compactions until close() is called.
-    
     Path splits = new Path(regiondir, SPLITDIR);
     if(! fs.exists(splits)) {
       fs.mkdirs(splits);
@@ -453,6 +485,10 @@
     }
     
     TreeSet<HStoreFile> alreadySplit = new TreeSet<HStoreFile>();
+
+    // Flush this HRegion out to storage, and turn off flushes
+    // or compactions until close() is called.
+    
     Vector<HStoreFile> hstoreFilesToSplit = flushcache(true);
     for(Iterator<HStoreFile> it = hstoreFilesToSplit.iterator(); it.hasNext(); ) {
       HStoreFile hsf = it.next();
@@ -472,8 +508,12 @@
       alreadySplit.add(hsf);
     }
 
-    // We just copied most of the data.  Now close the HRegion
-    // and copy the small remainder
+    // We just copied most of the data.
+    // Notify the caller that we are about to close the region
+    
+    listener.regionIsUnavailable(this.getRegionName());
+    
+    // Now close the HRegion and copy the small remainder
     
     hstoreFilesToSplit = close();
     for(Iterator<HStoreFile> it = hstoreFilesToSplit.iterator(); it.hasNext(); ) {
@@ -577,19 +617,26 @@
    * @return            - true if the region should be split
    */
   public boolean needsSplit(Text midKey) {
-    Text key = new Text();
-    long maxSize = 0;
+    lock.obtainReadLock();
 
-    for(Iterator<HStore> i = stores.values().iterator(); i.hasNext(); ) {
-      long size = i.next().getLargestFileSize(key);
-      
-      if(size > maxSize) {                      // Largest so far
-        maxSize = size;
-        midKey.set(key);
+    try {
+      Text key = new Text();
+      long maxSize = 0;
+
+      for(Iterator<HStore> i = stores.values().iterator(); i.hasNext(); ) {
+        long size = i.next().getLargestFileSize(key);
+
+        if(size > maxSize) {                      // Largest so far
+          maxSize = size;
+          midKey.set(key);
+        }
       }
-    }
 
-    return (maxSize > (DESIRED_MAX_FILE_SIZE + (DESIRED_MAX_FILE_SIZE / 2)));
+      return (maxSize > (DESIRED_MAX_FILE_SIZE + (DESIRED_MAX_FILE_SIZE / 2)));
+      
+    } finally {
+      lock.releaseReadLock();
+    }
   }
 
   /**
@@ -597,11 +644,16 @@
    */
   public boolean needsCompaction() {
     boolean needsCompaction = false;
-    for(Iterator<HStore> i = stores.values().iterator(); i.hasNext(); ) {
-      if(i.next().getNMaps() > compactionThreshold) {
-        needsCompaction = true;
-        break;
+    lock.obtainReadLock();
+    try {
+      for(Iterator<HStore> i = stores.values().iterator(); i.hasNext(); ) {
+        if(i.next().getNMaps() > compactionThreshold) {
+          needsCompaction = true;
+          break;
+        }
       }
+    } finally {
+      lock.releaseReadLock();
     }
     return needsCompaction;
   }
@@ -621,15 +673,20 @@
    */
   public boolean compactStores() throws IOException {
     boolean shouldCompact = false;
-    synchronized(writestate) {
-      if((! writestate.writesOngoing)
-          && writestate.writesEnabled
-          && (! writestate.closed)
-          && recentCommits > MIN_COMMITS_FOR_COMPACTION) {
-        
-        writestate.writesOngoing = true;
-        shouldCompact = true;
+    lock.obtainReadLock();
+    try {
+      synchronized(writestate) {
+        if((! writestate.writesOngoing)
+            && writestate.writesEnabled
+            && (! writestate.closed)
+            && recentCommits > MIN_COMMITS_FOR_COMPACTION) {
+
+          writestate.writesOngoing = true;
+          shouldCompact = true;
+        }
       }
+    } finally {
+      lock.releaseReadLock();
     }
 
     if(! shouldCompact) {
@@ -637,6 +694,7 @@
       return false;
       
     } else {
+      lock.obtainWriteLock();
       try {
         LOG.info("starting compaction on region " + this.regionInfo.regionName);
         for(Iterator<HStore> it = stores.values().iterator(); it.hasNext(); ) {
@@ -652,6 +710,7 @@
           recentCommits = 0;
           writestate.notifyAll();
         }
+        lock.releaseWriteLock();
       }
     }
   }
@@ -872,22 +931,28 @@
 
   private BytesWritable[] get(HStoreKey key, int numVersions) throws IOException {
 
-    // Check the memcache
+    lock.obtainReadLock();
+    try {
+      // Check the memcache
 
-    BytesWritable[] result = memcache.get(key, numVersions);
-    if(result != null) {
-      return result;
-    }
+      BytesWritable[] result = memcache.get(key, numVersions);
+      if(result != null) {
+        return result;
+      }
 
-    // If unavailable in memcache, check the appropriate HStore
+      // If unavailable in memcache, check the appropriate HStore
 
-    Text colFamily = HStoreKey.extractFamily(key.getColumn());
-    HStore targetStore = stores.get(colFamily);
-    if(targetStore == null) {
-      return null;
+      Text colFamily = HStoreKey.extractFamily(key.getColumn());
+      HStore targetStore = stores.get(colFamily);
+      if(targetStore == null) {
+        return null;
+      }
+
+      return targetStore.get(key, numVersions);
+      
+    } finally {
+      lock.releaseReadLock();
     }
-    
-    return targetStore.get(key, numVersions);
   }
 
   /**
@@ -903,13 +968,19 @@
   public TreeMap<Text, BytesWritable> getFull(Text row) throws IOException {
     HStoreKey key = new HStoreKey(row, System.currentTimeMillis());
 
-    TreeMap<Text, BytesWritable> memResult = memcache.getFull(key);
-    for(Iterator<Text> it = stores.keySet().iterator(); it.hasNext(); ) {
-      Text colFamily = it.next();
-      HStore targetStore = stores.get(colFamily);
-      targetStore.getFull(key, memResult);
+    lock.obtainReadLock();
+    try {
+      TreeMap<Text, BytesWritable> memResult = memcache.getFull(key);
+      for(Iterator<Text> it = stores.keySet().iterator(); it.hasNext(); ) {
+        Text colFamily = it.next();
+        HStore targetStore = stores.get(colFamily);
+        targetStore.getFull(key, memResult);
+      }
+      return memResult;
+      
+    } finally {
+      lock.releaseReadLock();
     }
-    return memResult;
   }
 
   /**
@@ -917,18 +988,24 @@
    * columns.  This Iterator must be closed by the caller.
    */
   public HInternalScannerInterface getScanner(Text[] cols, Text firstRow) throws IOException {
-    TreeSet<Text> families = new TreeSet<Text>();
-    for(int i = 0; i < cols.length; i++) {
-      families.add(HStoreKey.extractFamily(cols[i]));
-    }
+    lock.obtainReadLock();
+    try {
+      TreeSet<Text> families = new TreeSet<Text>();
+      for(int i = 0; i < cols.length; i++) {
+        families.add(HStoreKey.extractFamily(cols[i]));
+      }
 
-    HStore[] storelist = new HStore[families.size()];
-    int i = 0;
-    for(Iterator<Text> it = families.iterator(); it.hasNext(); ) {
-      Text family = it.next();
-      storelist[i++] = stores.get(family);
+      HStore[] storelist = new HStore[families.size()];
+      int i = 0;
+      for(Iterator<Text> it = families.iterator(); it.hasNext(); ) {
+        Text family = it.next();
+        storelist[i++] = stores.get(family);
+      }
+      return new HScanner(cols, firstRow, memcache, storelist);
+      
+    } finally {
+      lock.releaseReadLock();
     }
-    return new HScanner(cols, firstRow, memcache, storelist);
   }
 
   //////////////////////////////////////////////////////////////////////////////
@@ -949,8 +1026,14 @@
 
     // We obtain a per-row lock, so other clients will
     // block while one client performs an update.
-    
-    return obtainLock(row);
+
+    lock.obtainReadLock();
+    try {
+      return obtainLock(row);
+      
+    } finally {
+      lock.releaseReadLock();
+    }
   }
 
   /**
@@ -1176,9 +1259,16 @@
 
     /** Create an HScanner with a handle on many HStores. */
     @SuppressWarnings("unchecked")
-    public HScanner(Text[] cols, Text firstRow, HMemcache memcache, HStore[] stores) throws IOException {
+    public HScanner(Text[] cols, Text firstRow, HMemcache memcache, HStore[] stores)
+        throws IOException {
+      
       long scanTime = System.currentTimeMillis();
+      
       this.scanners = new HInternalScannerInterface[stores.length + 1];
+      for(int i = 0; i < this.scanners.length; i++) {
+        this.scanners[i] = null;
+      }
+      
       this.resultSets = new TreeMap[scanners.length];
       this.keys = new HStoreKey[scanners.length];
       this.wildcardMatch = false;
@@ -1189,28 +1279,38 @@
       
       // NOTE: the memcache scanner should be the first scanner
 
-      HInternalScannerInterface scanner =
-        memcache.getScanner(scanTime, cols, firstRow);
-      
-      if(scanner.isWildcardScanner()) {
-        this.wildcardMatch = true;
-      }
-      if(scanner.isMultipleMatchScanner()) {
-        this.multipleMatchers = true;
-      }
-      scanners[0] = scanner;
-      
-      for(int i = 0; i < stores.length; i++) {
-        scanner = stores[i].getScanner(scanTime, cols, firstRow);
+      try {
+        HInternalScannerInterface scanner =
+          memcache.getScanner(scanTime, cols, firstRow);
+
+
         if(scanner.isWildcardScanner()) {
           this.wildcardMatch = true;
         }
         if(scanner.isMultipleMatchScanner()) {
           this.multipleMatchers = true;
         }
-        scanners[i + 1] = scanner;
-      }
+        scanners[0] = scanner;
 
+        for(int i = 0; i < stores.length; i++) {
+          scanner = stores[i].getScanner(scanTime, cols, firstRow);
+          if(scanner.isWildcardScanner()) {
+            this.wildcardMatch = true;
+          }
+          if(scanner.isMultipleMatchScanner()) {
+            this.multipleMatchers = true;
+          }
+          scanners[i + 1] = scanner;
+        }
+
+      } catch(IOException e) {
+        for(int i = 0; i < this.scanners.length; i++) {
+          if(scanners[i] != null) {
+            closeScanner(i);
+          }
+        }
+        throw e;
+      }
       for(int i = 0; i < scanners.length; i++) {
         keys[i] = new HStoreKey();
         resultSets[i] = new TreeMap<Text, BytesWritable>();
@@ -1319,7 +1419,7 @@
     }
 
     /** Shut down a single scanner */
-    void closeScanner(int i) throws IOException {
+    void closeScanner(int i) {
       try {
         scanners[i].close();
         
@@ -1331,7 +1431,7 @@
     }
 
     /** All done with the scanner. */
-    public void close() throws IOException {
+    public void close() {
       for(int i = 0; i < scanners.length; i++) {
         if(scanners[i] != null) {
           closeScanner(i);

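Taken together, the HRegion changes impose one locking discipline: every read path (get, getFull, getScanner, startUpdate) holds the region's new HLocking read lock in try/finally, while close() and the commit phase of compactStores() take the write lock, so a region can no longer be closed or split out from under an in-flight read. A minimal sketch of that discipline, reusing the CountingRWLock sketch from the HLocking section above (RegionSketch and lookup are illustrative):

    class RegionSketch {
      private final CountingRWLock lock = new CountingRWLock();
      private volatile boolean closed = false;

      byte[] get(String row) {
        lock.obtainReadLock();                 // readers may proceed concurrently
        try {
          return lookup(row);
        } finally {
          lock.releaseReadLock();
        }
      }

      void close() {
        lock.obtainWriteLock();                // blocks until in-flight reads drain
        try {
          closed = true;                       // flush and close stores would go here
        } finally {
          lock.releaseWriteLock();
        }
      }

      private byte[] lookup(String row) {
        return null;                           // placeholder for memcache/HStore lookup
      }
    }
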
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java?view=diff&rev=539243&r1=539242&r2=539243
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java Thu May 17 20:22:54 2007
@@ -30,7 +30,7 @@
 
   // Get metainfo about an HRegion
 
-  public HRegionInfo getRegionInfo(Text regionName);
+  public HRegionInfo getRegionInfo(Text regionName) throws NotServingRegionException;
 
   // GET methods for an HRegion.
 

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java?view=diff&rev=539243&r1=539242&r2=539243
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java Thu May 17 20:22:54 2007
@@ -46,7 +46,7 @@
   
   private volatile boolean stopRequested;
   private Path regionDir;
-  private HServerAddress address;
+  private HServerInfo info;
   private Configuration conf;
   private Random rand;
   private TreeMap<Text, HRegion> regions;               // region name -> HRegion
@@ -64,24 +64,26 @@
   private Thread splitOrCompactCheckerThread;
   private Integer splitOrCompactLock = new Integer(0);
   
-  private class SplitOrCompactChecker implements Runnable {
+  private class SplitOrCompactChecker implements Runnable, RegionUnavailableListener {
     private HClient client = new HClient(conf);
   
-    private class SplitRegion {
-      public HRegion region;
-      public Text midKey;
-      
-      SplitRegion(HRegion region, Text midKey) {
-        this.region = region;
-        this.midKey = midKey;
-      }
+    /* (non-Javadoc)
+     * @see org.apache.hadoop.hbase.RegionUnavailableListener#regionIsUnavailable(org.apache.hadoop.io.Text)
+     */
+    public void regionIsUnavailable(Text regionName) {
+      lock.obtainWriteLock();
+      regions.remove(regionName);
+      lock.releaseWriteLock();
     }
-    
+
+    /* (non-Javadoc)
+     * @see java.lang.Runnable#run()
+     */
     public void run() {
       while(! stopRequested) {
         long startTime = System.currentTimeMillis();
 
-        synchronized(splitOrCompactLock) {
+        synchronized(splitOrCompactLock) { // Don't interrupt us while we're working
 
           // Grab a list of regions to check
 
@@ -93,85 +95,81 @@
             lock.releaseReadLock();
           }
 
-          // Check to see if they need splitting or compacting
-
-          Vector<SplitRegion> toSplit = new Vector<SplitRegion>();
-          Vector<HRegion> toCompact = new Vector<HRegion>();
-          for(Iterator<HRegion> it = regionsToCheck.iterator(); it.hasNext(); ) {
-            HRegion cur = it.next();
-            Text midKey = new Text();
-
-            if(cur.needsCompaction()) {
-              toCompact.add(cur);
-
-            } else if(cur.needsSplit(midKey)) {
-              toSplit.add(new SplitRegion(cur, midKey));
-            }
-          }
-
           try {
-            for(Iterator<HRegion>it = toCompact.iterator(); it.hasNext(); ) {
-              it.next().compactStores();
-            }
-
-            for(Iterator<SplitRegion> it = toSplit.iterator(); it.hasNext(); ) {
-              SplitRegion r = it.next();
-
-              lock.obtainWriteLock();
-              regions.remove(r.region.getRegionName());
-              lock.releaseWriteLock();
-
-              HRegion[] newRegions = null;
-              Text oldRegion = r.region.getRegionName();
-
-              LOG.info("splitting region: " + oldRegion);
-
-              newRegions = r.region.closeAndSplit(r.midKey);
-
-              // When a region is split, the META table needs to updated if we're
-              // splitting a 'normal' region, and the ROOT table needs to be
-              // updated if we are splitting a META region.
-
-              Text tableToUpdate =
-                (oldRegion.find(META_TABLE_NAME.toString()) == 0) ?
-                    ROOT_TABLE_NAME : META_TABLE_NAME;
-
-              if(LOG.isDebugEnabled()) {
-                LOG.debug("region split complete. updating meta");
-              }
-
-              client.openTable(tableToUpdate);
-              long lockid = client.startUpdate(oldRegion);
-              client.delete(lockid, COL_REGIONINFO);
-              client.delete(lockid, COL_SERVER);
-              client.delete(lockid, COL_STARTCODE);
-              client.commit(lockid);
-
-              for(int i = 0; i < newRegions.length; i++) {
-                ByteArrayOutputStream bytes = new ByteArrayOutputStream();
-                DataOutputStream out = new DataOutputStream(bytes);
-                newRegions[i].getRegionInfo().write(out);
-
-                lockid = client.startUpdate(newRegions[i].getRegionName());
-                client.put(lockid, COL_REGIONINFO, bytes.toByteArray());
-                client.commit(lockid);
+            for(Iterator<HRegion>it = regionsToCheck.iterator(); it.hasNext(); ) {
+              HRegion cur = it.next();
+              
+              if(cur.isClosed()) {
+                continue;                               // Skip if closed
               }
-
-              // Now tell the master about the new regions
-
-              if(LOG.isDebugEnabled()) {
-                LOG.debug("reporting region split to master");
+              
+              if(cur.needsCompaction()) {
+                
+                // The best time to split a region is right after it has been compacted
+                
+                if(cur.compactStores()) {
+                  Text midKey = new Text();
+                  if(cur.needsSplit(midKey)) {
+                    Text oldRegion = cur.getRegionName();
+
+                    LOG.info("splitting region: " + oldRegion);
+
+                    HRegion[] newRegions = cur.closeAndSplit(midKey, this);
+
+                    // When a region is split, the META table needs to be updated if we're
+                    // splitting a 'normal' region, and the ROOT table needs to be
+                    // updated if we are splitting a META region.
+
+                    if(LOG.isDebugEnabled()) {
+                      LOG.debug("region split complete. updating meta");
+                    }
+
+                    Text tableToUpdate =
+                      (oldRegion.find(META_TABLE_NAME.toString()) == 0) ?
+                          ROOT_TABLE_NAME : META_TABLE_NAME;
+
+                    client.openTable(tableToUpdate);
+                    long lockid = client.startUpdate(oldRegion);
+                    client.delete(lockid, COL_REGIONINFO);
+                    client.delete(lockid, COL_SERVER);
+                    client.delete(lockid, COL_STARTCODE);
+                    client.commit(lockid);
+
+                    for(int i = 0; i < newRegions.length; i++) {
+                      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+                      DataOutputStream out = new DataOutputStream(bytes);
+                      newRegions[i].getRegionInfo().write(out);
+
+                      lockid = client.startUpdate(newRegions[i].getRegionName());
+                      client.put(lockid, COL_REGIONINFO, bytes.toByteArray());
+                      client.put(lockid, COL_SERVER, 
+                          info.getServerAddress().toString().getBytes(UTF8_ENCODING));
+                      client.put(lockid, COL_STARTCODE, 
+                          String.valueOf(info.getStartCode()).getBytes(UTF8_ENCODING));
+                      client.commit(lockid);
+                    }
+                    
+                    // Now tell the master about the new regions
+
+                    if(LOG.isDebugEnabled()) {
+                      LOG.debug("reporting region split to master");
+                    }
+
+                    reportSplit(newRegions[0].getRegionInfo(), newRegions[1].getRegionInfo());
+
+                    LOG.info("region split successful. old region=" + oldRegion
+                        + ", new regions: " + newRegions[0].getRegionName() + ", "
+                        + newRegions[1].getRegionName());
+
+                    // Finally, start serving the new regions
+                    
+                    lock.obtainWriteLock();
+                    regions.put(newRegions[0].getRegionName(), newRegions[0]);
+                    regions.put(newRegions[1].getRegionName(), newRegions[1]);
+                    lock.releaseWriteLock();
+                  }
+                }
               }
-
-              reportSplit(newRegions[0].getRegionInfo(), newRegions[1].getRegionInfo());
-
-              LOG.info("region split successful. old region=" + oldRegion
-                  + ", new regions: " + newRegions[0].getRegionName() + ", "
-                  + newRegions[1].getRegionName());
-
-              newRegions[0].close();
-              newRegions[1].close();
-
             }
           } catch(IOException e) {
             //TODO: What happens if this fails? Are we toast?
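[The checker now folds compaction and splitting into a single pass: a region is only considered for a split right after a successful compaction, and the new RegionUnavailableListener callback lets HRegion ask the server to stop serving the old region at the last possible moment, shrinking the window in which the region is off-line. A minimal sketch of that callback pattern, using java.util.concurrent's ReentrantReadWriteLock in place of HBase's own HLocking; class and field names here are illustrative, not the patch's:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class ServingRegions {
      private final Map<String, Object> regions = new HashMap<String, Object>();
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

      // Equivalent of regionIsUnavailable(Text): the splitting region drops
      // out of the serving map under the write lock, so RPC threads taking
      // the read lock can no longer find it.
      public void regionIsUnavailable(String regionName) {
        lock.writeLock().lock();
        try {
          regions.remove(regionName);
        } finally {
          lock.writeLock().unlock();
        }
      }

      // Once the split finishes, the two daughter regions are registered
      // the same way and serving resumes.
      public void serve(String regionName, Object region) {
        lock.writeLock().lock();
        try {
          regions.put(regionName, region);
        } finally {
          lock.writeLock().unlock();
        }
      }
    }
]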
@@ -228,6 +226,10 @@
 
           for(Iterator<HRegion> it = toFlush.iterator(); it.hasNext(); ) {
             HRegion cur = it.next();
+            
+            if(cur.isClosed()) {                // Skip if closed
+              continue;
+            }
 
             try {
               cur.optionallyFlush();
@@ -330,8 +332,7 @@
   
   /** Start a HRegionServer at an indicated location */
   public HRegionServer(Path regionDir, HServerAddress address,
-      Configuration conf) 
-  throws IOException {
+      Configuration conf) throws IOException {
     
     // Basic setup
     this.stopRequested = false;
@@ -369,19 +370,25 @@
 
     try {
       // Server to handle client requests
+      
       this.server = RPC.getServer(this, address.getBindAddress().toString(), 
         address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
         false, conf);
 
-      this.address = new HServerAddress(server.getListenerAddress());
+      this.info = new HServerInfo(new HServerAddress(server.getListenerAddress()),
+          this.rand.nextLong());
 
       // Local file paths
+      
       String serverName =
-        this.address.getBindAddress() + "_" + this.address.getPort();
+        this.info.getServerAddress().getBindAddress() + "_"
+        + this.info.getServerAddress().getPort();
+      
       Path newlogdir = new Path(regionDir, "log" + "_" + serverName);
       this.oldlogfile = new Path(regionDir, "oldlogfile" + "_" + serverName);
 
       // Logging
+      
       this.fs = FileSystem.get(conf);
       HLog.consolidateOldLog(newlogdir, oldlogfile, fs, conf);
       // TODO: Now we have a consolidated log for all regions, sort and
@@ -393,13 +400,14 @@
       this.logRollerThread = new Thread(logRoller, "HRegionServer.logRoller");
 
       // Remote HMaster
-      this.hbaseMaster = (HMasterRegionInterface)RPC.
-        waitForProxy(HMasterRegionInterface.class,
-        HMasterRegionInterface.versionID,
-        new HServerAddress(conf.get(MASTER_ADDRESS)).getInetSocketAddress(),
-        conf);
+      
+      this.hbaseMaster = (HMasterRegionInterface)RPC.waitForProxy(
+          HMasterRegionInterface.class, HMasterRegionInterface.versionID,
+          new HServerAddress(conf.get(MASTER_ADDRESS)).getInetSocketAddress(),
+          conf);
 
       // Threads
+      
       this.workerThread.start();
       this.cacheFlusherThread.start();
       this.splitOrCompactCheckerThread.start();
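[A related change in the constructor hunks above: the server's identity is now captured once, at construction, in an HServerInfo (listener address plus a random start code), where the old code minted a fresh HServerInfo on every pass of the report loop. A rough sketch of the idea, with illustrative names:

    import java.util.Random;

    class ServerIdentity {
      private final String address;
      private final long startCode;

      // Fixed for the life of the process; a restart produces a new start
      // code, so the master can tell a restarted server from a stale
      // registration at the same address.
      ServerIdentity(String address, Random rand) {
        this.address = address;
        this.startCode = rand.nextLong();
      }

      String getServerAddress() { return address; }
      long getStartCode() { return startCode; }
    }
]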
@@ -452,7 +460,7 @@
       this.server.join();
     } catch(InterruptedException iex) {
     }
-    LOG.info("HRegionServer stopped at: " + address.toString());
+    LOG.info("HRegionServer stopped at: " + info.getServerAddress().toString());
   }
   
   /**
@@ -462,7 +470,6 @@
    */
   public void run() {
     while(! stopRequested) {
-      HServerInfo info = new HServerInfo(address, rand.nextLong());
       long lastMsg = 0;
       long waitTime;
 
@@ -557,7 +564,7 @@
             }
 
           } catch(IOException e) {
-            e.printStackTrace();
+            LOG.error(e);
           }
         }
 
@@ -580,7 +587,7 @@
       }
     }
     try {
-      LOG.info("stopping server at: " + address.toString());
+      LOG.info("stopping server at: " + info.getServerAddress().toString());
 
       // Send interrupts to wake up threads if sleeping so they notice shutdown.
 
@@ -761,58 +768,68 @@
       throws IOException {
     
     this.lock.obtainWriteLock();
+    HRegion region = null;
     try {
-      HRegion region = regions.remove(info.regionName);
-      
-      if(region != null) {
-        region.close();
-        
-        if(reportWhenCompleted) {
-          reportClose(region);
-        }
-      }
-      
+      region = regions.remove(info.regionName);
     } finally {
       this.lock.releaseWriteLock();
     }
+      
+    if(region != null) {
+      region.close();
+
+      if(reportWhenCompleted) {
+        reportClose(region);
+      }
+    }
   }
 
   private void closeAndDeleteRegion(HRegionInfo info) throws IOException {
-
     this.lock.obtainWriteLock();
+    HRegion region = null;
     try {
-      HRegion region = regions.remove(info.regionName);
-  
-      if(region != null) {
-        region.closeAndDelete();
-      }
+      region = regions.remove(info.regionName);
   
     } finally {
       this.lock.releaseWriteLock();
     }
+    if(region != null) {
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("deleting region " + info.regionName);
+      }
+      
+      region.closeAndDelete();
+      
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("region " + info.regionName + " deleted");
+      }
+    }
   }
 
   /** Called either when the master tells us to restart or from stop() */
   private void closeAllRegions() {
+    Vector<HRegion> regionsToClose = new Vector<HRegion>();
     this.lock.obtainWriteLock();
     try {
-      for(Iterator<HRegion> it = regions.values().iterator(); it.hasNext(); ) {
-        HRegion region = it.next();
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("closing region " + region.getRegionName());
-        }
-        try {
-          region.close();
-          
-        } catch(IOException e) {
-          e.printStackTrace();
-        }
-      }
+      regionsToClose.addAll(regions.values());
       regions.clear();
       
     } finally {
       this.lock.releaseWriteLock();
     }
+    for(Iterator<HRegion> it = regionsToClose.iterator(); it.hasNext(); ) {
+      HRegion region = it.next();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("closing region " + region.getRegionName());
+      }
+      try {
+        region.close();
+        LOG.debug("region closed " + region.getRegionName());
+        
+      } catch(IOException e) {
+        LOG.error("error closing region " + region.getRegionName(), e);
+      }
+    }
   }
 
   /*****************************************************************************
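[All three close paths above now follow the same discipline: mutate the regions map while holding the write lock, but perform the potentially slow close I/O only after releasing it, so client lookups are not blocked for the duration of a close. A self-contained sketch of the pattern, using stub types rather than the patch's classes:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class CloseOutsideLock {
      interface Region { String name(); void close() throws IOException; }

      private final Map<String, Region> regions = new HashMap<String, Region>();

      void closeAllRegions() {
        List<Region> toClose = new ArrayList<Region>();
        synchronized (regions) {            // stands in for the write lock
          toClose.addAll(regions.values());
          regions.clear();
        }
        for (Region r : toClose) {          // slow I/O, lock already released
          try {
            r.close();
          } catch (IOException e) {
            System.err.println("error closing region " + r.name() + ": " + e);
          }
        }
      }
    }
]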
@@ -847,20 +864,14 @@
   //////////////////////////////////////////////////////////////////////////////
 
   /** Obtain a table descriptor for the given region */
-  public HRegionInfo getRegionInfo(Text regionName) {
+  public HRegionInfo getRegionInfo(Text regionName) throws NotServingRegionException {
     HRegion region = getRegion(regionName);
-    if(region == null) {
-      return null;
-    }
     return region.getRegionInfo();
   }
 
   /** Get the indicated row/column */
   public BytesWritable get(Text regionName, Text row, Text column) throws IOException {
     HRegion region = getRegion(regionName);
-    if(region == null) {
-      throw new IOException("Not serving region " + regionName);
-    }
     
     if (LOG.isDebugEnabled()) {
       LOG.debug("get " + row.toString() + ", " + column.toString());
@@ -877,9 +888,6 @@
       int numVersions) throws IOException {
     
     HRegion region = getRegion(regionName);
-    if(region == null) {
-      throw new IOException("Not serving region " + regionName);
-    }
     
     BytesWritable[] results = region.get(row, column, numVersions);
     if(results != null) {
@@ -893,9 +901,6 @@
       long timestamp, int numVersions) throws IOException {
     
     HRegion region = getRegion(regionName);
-    if(region == null) {
-      throw new IOException("Not serving region " + regionName);
-    }
     
     BytesWritable[] results = region.get(row, column, timestamp, numVersions);
     if(results != null) {
@@ -907,9 +912,6 @@
   /** Get all the columns (along with their names) for a given row. */
   public LabelledData[] getRow(Text regionName, Text row) throws IOException {
     HRegion region = getRegion(regionName);
-    if(region == null) {
-      throw new IOException("Not serving region " + regionName);
-    }
     
     TreeMap<Text, BytesWritable> map = region.getFull(row);
     LabelledData result[] = new LabelledData[map.size()];
@@ -949,9 +951,6 @@
       throws IOException {
     
     HRegion region = getRegion(regionName);
-    if(region == null) {
-      throw new IOException("Not serving region " + regionName);
-    }
     
     long lockid = region.startUpdate(row);
     leases.createLease(new Text(String.valueOf(clientid)), 
@@ -966,9 +965,6 @@
       BytesWritable val) throws IOException {
     
     HRegion region = getRegion(regionName);
-    if(region == null) {
-      throw new IOException("Not serving region " + regionName);
-    }
     
     leases.renewLease(new Text(String.valueOf(clientid)), 
         new Text(String.valueOf(lockid)));
@@ -981,9 +977,6 @@
       throws IOException {
     
     HRegion region = getRegion(regionName);
-    if(region == null) {
-      throw new IOException("Not serving region " + regionName);
-    }
     
     leases.renewLease(new Text(String.valueOf(clientid)), 
         new Text(String.valueOf(lockid)));
@@ -996,9 +989,6 @@
       throws IOException {
     
     HRegion region = getRegion(regionName);
-    if(region == null) {
-      throw new IOException("Not serving region " + regionName);
-    }
     
     leases.cancelLease(new Text(String.valueOf(clientid)), 
         new Text(String.valueOf(lockid)));
@@ -1011,9 +1001,6 @@
       throws IOException {
     
     HRegion region = getRegion(regionName);
-    if(region == null) {
-      throw new IOException("Not serving region " + regionName);
-    }
     
     leases.cancelLease(new Text(String.valueOf(clientid)), 
         new Text(String.valueOf(lockid)));
@@ -1028,14 +1015,20 @@
   }
 
   /** Private utility method for safely obtaining an HRegion handle. */
-  private HRegion getRegion(Text regionName) {
+  private HRegion getRegion(Text regionName) throws NotServingRegionException {
     this.lock.obtainReadLock();
+    HRegion region = null;
     try {
-      return regions.get(regionName);
+      region = regions.get(regionName);
       
     } finally {
       this.lock.releaseReadLock();
     }
+
+    if(region == null) {
+      throw new NotServingRegionException(regionName.toString());
+    }
+    return region;
   }
 
   //////////////////////////////////////////////////////////////////////////////
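[Centralizing the null check in getRegion is what lets every RPC method above drop its own "if(region == null) throw new IOException(...)" block: a miss now surfaces as a NotServingRegionException from a single throw site. In miniature, with stub types:

    import org.apache.hadoop.hbase.NotServingRegionException;

    import java.util.HashMap;
    import java.util.Map;

    class RegionLookup {
      private final Map<String, Object> regions = new HashMap<String, Object>();

      Object getRegion(String regionName) throws NotServingRegionException {
        Object region;
        synchronized (regions) {     // stands in for HLocking's read lock
          region = regions.get(regionName);
        }
        if (region == null) {        // the one remaining miss check
          throw new NotServingRegionException(regionName);
        }
        return region;
      }
    }
]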
@@ -1051,14 +1044,12 @@
     }
     
     public void leaseExpired() {
-      HInternalScannerInterface s = scanners.remove(scannerName);
+      HInternalScannerInterface s = null;
+      synchronized(scanners) {
+        s = scanners.remove(scannerName);
+      }
       if(s != null) {
-        try {
-          s.close();
-          
-        } catch(IOException e) {
-          e.printStackTrace();
-        }
+        s.close();
       }
     }
   }
@@ -1068,16 +1059,14 @@
       throws IOException {
 
     HRegion r = getRegion(regionName);
-    if(r == null) {
-      throw new IOException("Not serving region " + regionName);
-    }
-
     long scannerId = -1L;
     try {
       HInternalScannerInterface s = r.getScanner(cols, firstRow);
       scannerId = rand.nextLong();
       Text scannerName = new Text(String.valueOf(scannerId));
-      scanners.put(scannerName, s);
+      synchronized(scanners) {
+        scanners.put(scannerName, s);
+      }
       leases.createLease(scannerName, scannerName, new ScannerListener(scannerName));
     
     } catch(IOException e) {
@@ -1121,16 +1110,14 @@
   
   public void close(long scannerId) throws IOException {
     Text scannerName = new Text(String.valueOf(scannerId));
-    HInternalScannerInterface s = scanners.remove(scannerName);
+    HInternalScannerInterface s = null;
+    synchronized(scanners) {
+      s = scanners.remove(scannerName);
+    }
     if(s == null) {
       throw new IOException("unknown scanner");
     }
-    try {
-      s.close();
-        
-    } catch(IOException ex) {
-      ex.printStackTrace();
-    }
+    s.close();
     leases.cancelLease(scannerName, scannerName);
   }
 

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java?view=diff&rev=539243&r1=539242&r2=539243
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java Thu May 17 20:22:54 2007
@@ -342,8 +342,14 @@
     }
   }
 
-  public synchronized Vector<HStoreFile> getAllMapFiles() {
-    return new Vector<HStoreFile>(mapFiles.values());
+  public Vector<HStoreFile> getAllMapFiles() {
+    this.lock.obtainReadLock();
+    try {
+      return new Vector<HStoreFile>(mapFiles.values());
+      
+    } finally {
+      this.lock.releaseReadLock();
+    }
   }
 
   //////////////////////////////////////////////////////////////////////////////
@@ -938,7 +944,9 @@
   class HStoreScanner extends HAbstractScanner {
     private MapFile.Reader[] readers;
     
-    public HStoreScanner(long timestamp, Text[] targetCols, Text firstRow) throws IOException {
+    public HStoreScanner(long timestamp, Text[] targetCols, Text firstRow)
+        throws IOException {
+      
       super(timestamp, targetCols);
 
       lock.obtainReadLock();
@@ -976,6 +984,7 @@
         }
         
       } catch (Exception ex) {
+        LOG.error(ex);
         close();
       }
     }
@@ -1021,10 +1030,15 @@
     }
     
     /** Close down the indicated reader. */
-    void closeSubScanner(int i) throws IOException {
+    void closeSubScanner(int i) {
       try {
         if(readers[i] != null) {
-          readers[i].close();
+          try {
+            readers[i].close();
+            
+          } catch(IOException e) {
+            LOG.error(e);
+          }
         }
         
       } finally {
@@ -1035,12 +1049,17 @@
     }
 
     /** Shut it down! */
-    public void close() throws IOException {
+    public void close() {
       if(! scannerClosed) {
         try {
           for(int i = 0; i < readers.length; i++) {
             if(readers[i] != null) {
-              readers[i].close();
+              try {
+                readers[i].close();
+                
+              } catch(IOException e) {
+                LOG.error(e);
+              }
             }
           }
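[Swapping method-level synchronization for the store's read lock lets concurrent readers fetch the map-file list without serializing against one another, and returning a copy means nothing is held while the caller iterates. Roughly, with ReentrantReadWriteLock standing in for HLocking:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.TreeMap;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class MapFileList {
      private final TreeMap<Long, String> mapFiles = new TreeMap<Long, String>();
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

      // Multiple readers may hold the read lock at once; only a snapshot
      // escapes, so callers never iterate a live, mutable collection.
      List<String> getAllMapFiles() {
        lock.readLock().lock();
        try {
          return new ArrayList<String>(mapFiles.values());
        } finally {
          lock.readLock().unlock();
        }
      }
    }
]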
           

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java?view=diff&rev=539243&r1=539242&r2=539243
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java Thu May 17 20:22:54 2007
@@ -15,6 +15,8 @@
  */
 package org.apache.hadoop.hbase;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.*;
 
 import java.io.*;
@@ -36,6 +38,8 @@
  * You should close() the instance if you want to clean up the thread properly.
  ******************************************************************************/
 public class Leases {
+  private static final Log LOG = LogFactory.getLog(Leases.class);
+
   long leasePeriod;
   long leaseCheckFrequency;
   LeaseMonitor leaseMonitor;
@@ -47,7 +51,7 @@
   /** Indicate the length of the lease, in milliseconds */
   public Leases(long leasePeriod, long leaseCheckFrequency) {
     this.leasePeriod = leasePeriod;
-
+    this.leaseCheckFrequency = leaseCheckFrequency;
     this.leaseMonitor = new LeaseMonitor();
     this.leaseMonitorThread = new Thread(leaseMonitor);
     this.leaseMonitorThread.setName("Lease.monitor");
@@ -59,6 +63,9 @@
    * without any cancellation calls.
    */
   public void close() {
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("closing leases");
+    }
     this.running = false;
     try {
       this.leaseMonitorThread.interrupt();
@@ -70,6 +77,9 @@
         leases.clear();
         sortedLeases.clear();
       }
+    }
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("leases closed");
     }
   }
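[The one-line constructor change is the substantive fix here: leaseCheckFrequency was previously never assigned, so the field kept its default of 0 and the monitor would sweep far more often than intended (Thread.sleep(0) amounts to a busy spin). A sketch of the loop that assignment feeds, assuming the monitor sleeps between sweeps; the monitor body itself is not shown in this diff:

    class LeaseMonitorSketch implements Runnable {
      private final long leaseCheckFrequency;
      private volatile boolean running = true;

      LeaseMonitorSketch(long leaseCheckFrequency) {
        this.leaseCheckFrequency = leaseCheckFrequency;  // the assignment the patch adds
      }

      public void run() {
        while (running) {
          // ... expire any leases older than leasePeriod ...
          try {
            Thread.sleep(leaseCheckFrequency);   // 0 here means busy-spinning
          } catch (InterruptedException e) {
            // interrupted by close(); loop re-checks running and exits
          }
        }
      }

      void shutdown() { running = false; }
    }
]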
 

Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/NotServingRegionException.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/NotServingRegionException.java?view=auto&rev=539243
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/NotServingRegionException.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/NotServingRegionException.java Thu May 17 20:22:54 2007
@@ -0,0 +1,30 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+
+public class NotServingRegionException extends IOException {
+  private static final long serialVersionUID = (1L << 17) - 1L;
+  public NotServingRegionException() {
+    super();
+  }
+
+  public NotServingRegionException(String s) {
+    super(s);
+  }
+
+}
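[Because the new exception subclasses IOException, existing catch blocks keep working, but a caller can now distinguish "this server no longer serves that region" from a generic I/O failure and react by re-locating the region. A hypothetical caller-side helper; the retry loop and its names are illustrative, not part of this patch:

    import org.apache.hadoop.hbase.NotServingRegionException;

    import java.io.IOException;

    class RetryOnNotServing {
      interface RegionCall<T> { T call() throws IOException; }

      static <T> T withRetry(RegionCall<T> op, int attempts) throws IOException {
        NotServingRegionException last = null;
        for (int i = 0; i < attempts; i++) {
          try {
            return op.call();
          } catch (NotServingRegionException e) {
            last = e;   // region moved or split; a real client would re-read
                        // the META table here before the next attempt
          }
        }
        throw last != null ? last
            : new NotServingRegionException("no attempts were made");
      }
    }
]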

Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/RegionUnavailableListener.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/RegionUnavailableListener.java?view=auto&rev=539243
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/RegionUnavailableListener.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/RegionUnavailableListener.java Thu May 17 20:22:54 2007
@@ -0,0 +1,27 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.io.Text;
+
+/**
+ * Used as a callback mechanism so that an HRegion can notify the HRegionServer
+ * when a region is about to be closed during a split operation. This is done
+ * to minimize the amount of time the region is off-line.
+ */
+public interface RegionUnavailableListener {
+  public void regionIsUnavailable(Text regionName);
+}

Added: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java?view=auto&rev=539243
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java Thu May 17 20:22:54 2007
@@ -0,0 +1,55 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+/**
+ * Abstract base class for HBase cluster JUnit tests. Spins up a cluster in
+ * {@link #setUp()} and takes it down again in {@link #tearDown()}.
+ */
+public abstract class HBaseClusterTestCase extends HBaseTestCase {
+  protected MiniHBaseCluster cluster;
+  final boolean miniHdfs;
+  
+  protected HBaseClusterTestCase() {
+    this(false);
+  }
+
+  protected HBaseClusterTestCase(String name) {
+    this(name, false);
+  }
+  
+  protected HBaseClusterTestCase(final boolean miniHdfs) {
+    super();
+    this.miniHdfs = miniHdfs;
+  }
+
+  protected HBaseClusterTestCase(String name, final boolean miniHdfs) {
+    super(name);
+    this.miniHdfs = miniHdfs;
+  }
+
+  public void setUp() throws Exception {
+    super.setUp();
+    this.cluster = new MiniHBaseCluster(this.conf, 1, this.miniHdfs);
+  }
+
+  public void tearDown() throws Exception {
+    super.tearDown();
+    if (this.cluster != null) {
+      this.cluster.shutdown();
+    }
+  }
+}

Added: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java?view=auto&rev=539243
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java Thu May 17 20:22:54 2007
@@ -0,0 +1,46 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Abstract base class for test cases. Performs all static initialization.
+ */
+public abstract class HBaseTestCase extends TestCase {
+  static {
+    StaticTestEnvironment.initialize();
+  }
+  
+  protected Configuration conf;
+  
+  protected HBaseTestCase() {
+    super();
+    conf = new HBaseConfiguration();
+  }
+  
+  protected HBaseTestCase(String name) {
+    super(name);
+    conf = new HBaseConfiguration();
+  }
+
+  protected Path getUnitTestdir(String testName) {
+    return new Path(StaticTestEnvironment.TEST_DIRECTORY_KEY, testName);
+  }
+}
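[A concrete test built on these base classes gets the static environment setup, a ready HBaseConfiguration in this.conf, and a one-region-server MiniHBaseCluster that setUp() starts and tearDown() stops. A hypothetical example in JUnit 3 style; the test class and its assertion are illustrative:

    package org.apache.hadoop.hbase;   // same package as the base classes

    public class TestSomething extends HBaseClusterTestCase {
      public TestSomething() {
        super(true);                 // also spin up a mini-HDFS
      }

      public void testClusterCameUp() {
        assertNotNull(this.cluster); // cluster was started by setUp()
      }
    }
]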


