hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject svn commit: r617670 - /hadoop/core/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java
Date Fri, 01 Feb 2008 22:01:43 GMT
Author: jimk
Date: Fri Feb  1 14:01:39 2008
New Revision: 617670

URL: http://svn.apache.org/viewvc?rev=617670&view=rev
Log:
HADOOP-2555 Refactor the HTable#get and HTable#getRow methods to avoid repetition of retry-on-failure
logic (thanks to Peter Dolan and Bryan Duxbury)

Modified:
    hadoop/core/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java

Modified: hadoop/core/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java?rev=617670&r1=617669&r2=617670&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java (original)
+++ hadoop/core/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java Fri Feb
 1 14:01:39 2008
@@ -29,6 +29,7 @@
 import java.util.TreeMap;
 import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.Callable;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -275,38 +276,15 @@
    * @return value for specified row/column
    * @throws IOException
    */
-  public byte[] get(Text row, Text column) throws IOException {
-    checkClosed();
-    byte [] value = null;
-    for(int tries = 0; tries < numRetries; tries++) {
-      HRegionLocation r = getRegionLocation(row);
-      HRegionInterface server =
-        connection.getHRegionConnection(r.getServerAddress());
-      
-      try {
-        value = server.get(r.getRegionInfo().getRegionName(), row, column);
-        break;
-        
-      } catch (IOException e) {
-        if (e instanceof RemoteException) {
-          e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
-        }
-        if (tries == numRetries - 1) {
-          throw e;
-        }
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("reloading table servers because: " + e.getMessage());
-        }
-        r = getRegionLocation(row, true);
-      }
-      try {
-        Thread.sleep(this.pause);
-      } catch (InterruptedException x) {
-        // continue
-      }
-    }
-    return value;
-  }
+   public byte[] get(Text row, final Text column) throws IOException {
+     checkClosed();
+     
+     return getRegionServerWithRetries(new ServerCallable<byte[]>(row){
+       public byte[] call() throws IOException {
+         return server.get(location.getRegionInfo().getRegionName(), row, column);
+       }
+     });
+   }
  
   /** 
    * Get the specified number of versions of the specified row and column
@@ -317,39 +295,17 @@
    * @return            - array byte values
    * @throws IOException
    */
-  public byte[][] get(Text row, Text column, int numVersions) throws IOException {
+  public byte[][] get(final Text row, final Text column, final int numVersions) 
+  throws IOException {
     checkClosed();
     byte [][] values = null;
-    for (int tries = 0; tries < numRetries; tries++) {
-      HRegionLocation r = getRegionLocation(row);
-      HRegionInterface server = 
-        connection.getHRegionConnection(r.getServerAddress());
-      
-      try {
-        values = server.get(r.getRegionInfo().getRegionName(), row, column,
-            numVersions);
-        
-        break;
-        
-      } catch (IOException e) {
-        if (e instanceof RemoteException) {
-          e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
-        }
-        if (tries == numRetries - 1) {
-          // No more tries
-          throw e;
-        }
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("reloading table servers because: " + e.getMessage());
-        }
-        r = getRegionLocation(row, true);
-      }
-      try {
-        Thread.sleep(this.pause);
-      } catch (InterruptedException x) {
-        // continue
+
+    values = getRegionServerWithRetries(new ServerCallable<byte[][]>(row) {
+      public byte [][] call() throws IOException {
+        return server.get(location.getRegionInfo().getRegionName(), row, 
+          column, numVersions);
       }
-    }
+    });
 
     if (values != null) {
       ArrayList<byte[]> bytes = new ArrayList<byte[]>();
@@ -372,40 +328,18 @@
    * @return            - array of values that match the above criteria
    * @throws IOException
    */
-  public byte[][] get(Text row, Text column, long timestamp, int numVersions)
+  public byte[][] get(final Text row, final Text column, final long timestamp, 
+    final int numVersions)
   throws IOException {
     checkClosed();
     byte [][] values = null;
-    for (int tries = 0; tries < numRetries; tries++) {
-      HRegionLocation r = getRegionLocation(row);
-      HRegionInterface server =
-        connection.getHRegionConnection(r.getServerAddress());
-      
-      try {
-        values = server.get(r.getRegionInfo().getRegionName(), row, column,
-            timestamp, numVersions);
-        
-        break;
-    
-      } catch (IOException e) {
-        if (e instanceof RemoteException) {
-          e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
-        }
-        if (tries == numRetries - 1) {
-          // No more tries
-          throw e;
-        }
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("reloading table servers because: " + e.getMessage());
-        }
-        r = getRegionLocation(row, true);
-      }
-      try {
-        Thread.sleep(this.pause);
-      } catch (InterruptedException x) {
-        // continue
+
+    values = getRegionServerWithRetries(new ServerCallable<byte[][]>(row) {
+      public byte [][] call() throws IOException {
+        return server.get(location.getRegionInfo().getRegionName(), row, 
+          column, timestamp, numVersions);
       }
-    }
+    });
 
     if (values != null) {
       ArrayList<byte[]> bytes = new ArrayList<byte[]>();
@@ -436,37 +370,17 @@
    * @return Map of columns to values.  Map is empty if row does not exist.
    * @throws IOException
    */
-  public SortedMap<Text, byte[]> getRow(Text row, long ts) throws IOException {
+  public SortedMap<Text, byte[]> getRow(final Text row, final long ts) 
+  throws IOException {
     checkClosed();
     HbaseMapWritable value = null;
-    for (int tries = 0; tries < numRetries; tries++) {
-      HRegionLocation r = getRegionLocation(row);
-      HRegionInterface server =
-        connection.getHRegionConnection(r.getServerAddress());
-      
-      try {
-        value = server.getRow(r.getRegionInfo().getRegionName(), row, ts);
-        break;
-        
-      } catch (IOException e) {
-        if (e instanceof RemoteException) {
-          e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
-        }
-        if (tries == numRetries - 1) {
-          // No more tries
-          throw e;
-        }
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("reloading table servers because: " + e.getMessage());
-        }
-        r = getRegionLocation(row, true);
-      }
-      try {
-        Thread.sleep(this.pause);
-      } catch (InterruptedException x) {
-        // continue
+         
+    value = getRegionServerWithRetries(new ServerCallable<HbaseMapWritable>(row) {
+      public HbaseMapWritable call() throws IOException {
+        return server.getRow(location.getRegionInfo().getRegionName(), row, ts);
       }
-    }
+    });
+    
     SortedMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
     if (value != null && value.size() != 0) {
       for (Map.Entry<Writable, Writable> e: value.entrySet()) {
@@ -722,32 +636,14 @@
   public void deleteAll(final Text row, final Text column, final long ts)
   throws IOException {
     checkClosed();
-    for(int tries = 0; tries < numRetries; tries++) {
-      HRegionLocation r = getRegionLocation(row);
-      HRegionInterface server =
-        connection.getHRegionConnection(r.getServerAddress());
-      try {
-        server.deleteAll(r.getRegionInfo().getRegionName(), row, column, ts);
-        break;
-        
-      } catch (IOException e) {
-        if (e instanceof RemoteException) {
-          e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
-        }
-        if (tries == numRetries - 1) {
-          throw e;
-        }
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("reloading table servers because: " + e.getMessage());
-        }
-        r = getRegionLocation(row, true);
-      }
-      try {
-        Thread.sleep(this.pause);
-      } catch (InterruptedException x) {
-        // continue
+          
+    getRegionServerWithRetries(new ServerCallable<Boolean>(row) {
+      public Boolean call() throws IOException {
+        server.deleteAll(location.getRegionInfo().getRegionName(), row, 
+          column, ts);
+        return null;
       }
-    }
+    });
   }
   
   /**
@@ -757,34 +653,15 @@
    * @param ts Timestamp of cells to delete
    * @throws IOException
    */
-  public void deleteAll(final Text row, long ts) throws IOException {
+  public void deleteAll(final Text row, final long ts) throws IOException {
     checkClosed();
-    for(int tries = 0; tries < numRetries; tries++) {
-      HRegionLocation r = getRegionLocation(row);
-      HRegionInterface server =
-        connection.getHRegionConnection(r.getServerAddress());
-      try {
-        server.deleteAll(r.getRegionInfo().getRegionName(), row, ts);
-        break;
-
-      } catch (IOException e) {
-        if (e instanceof RemoteException) {
-          e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
-        }
-        if (tries == numRetries - 1) {
-          throw e;
-        }
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("reloading table servers because: " + e.getMessage());
-        }
-        r = getRegionLocation(row, true);
-      }
-      try {
-        Thread.sleep(this.pause);
-      } catch (InterruptedException x) {
-        // continue
+    
+    getRegionServerWithRetries(new ServerCallable<Boolean>(row){
+      public Boolean call() throws IOException {
+        server.deleteAll(location.getRegionInfo().getRegionName(), row, ts);
+        return null;
       }
-    }
+    });
   }
       
   /**
@@ -806,35 +683,18 @@
    * @param timestamp Timestamp to match
    * @throws IOException
    */
-  public void deleteFamily(final Text row, final Text family, long timestamp)
+  public void deleteFamily(final Text row, final Text family, 
+    final long timestamp)
   throws IOException {
     checkClosed();
-    for(int tries = 0; tries < numRetries; tries++) {
-      HRegionLocation r = getRegionLocation(row);
-      HRegionInterface server =
-        connection.getHRegionConnection(r.getServerAddress());
-      try {
-        server.deleteFamily(r.getRegionInfo().getRegionName(), row, family, timestamp);
-        break;
-
-      } catch (IOException e) {
-        if (e instanceof RemoteException) {
-          e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
-        }
-        if (tries == numRetries - 1) {
-          throw e;
-        }
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("reloading table servers because: " + e.getMessage());
-        }
-        r = getRegionLocation(row, true);
-      }
-      try {
-        Thread.sleep(this.pause);
-      } catch (InterruptedException x) {
-        // continue
+    
+    getRegionServerWithRetries(new ServerCallable<Boolean>(row){
+      public Boolean call() throws IOException {
+        server.deleteFamily(location.getRegionInfo().getRegionName(), row, 
+          family, timestamp);
+        return null;
       }
-    }
+    });
   }
 
   /**
@@ -891,7 +751,7 @@
    * @param timestamp time to associate with the change
    * @throws IOException
    */
-  public synchronized void commit(long lockid, long timestamp)
+  public synchronized void commit(long lockid, final long timestamp)
   throws IOException {
     checkClosed();
     updateInProgress(true);
@@ -900,34 +760,15 @@
     }
     
     try {
-      for (int tries = 0; tries < numRetries; tries++) {
-        HRegionLocation r = getRegionLocation(batch.get().getRow());
-        HRegionInterface server =
-          connection.getHRegionConnection(r.getServerAddress());
-        try {
-          server.batchUpdate(r.getRegionInfo().getRegionName(), timestamp,
-            batch.get());
-          break;
-        } catch (IOException e) {
-          if (e instanceof RemoteException) {
-            e = RemoteExceptionHandler.decodeRemoteException(
-                (RemoteException) e);
+      getRegionServerWithRetries(
+        new ServerCallable<Boolean>(batch.get().getRow()){
+          public Boolean call() throws IOException {
+            server.batchUpdate(location.getRegionInfo().getRegionName(), 
+              timestamp, batch.get());
+            return null;
           }
-          if (tries < numRetries - 1) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("reloading table servers because: " + e.getMessage());
-            }
-            r = getRegionLocation(batch.get().getRow(), true);
-          } else {
-            throw e;
-          }
-        }
-        try {
-          Thread.sleep(pause);
-        } catch (InterruptedException e) {
-          // continue
         }
-      }
+      );
     } finally {
       batch.set(null);
     }
@@ -1149,5 +990,57 @@
         }
       };
     }
+  }
+  
+  /**
+   * Inherits from Callable, used to define the particular actions you would
+   * like to take with retry logic.
+   */
+  protected abstract class ServerCallable<T> implements Callable<T> {
+    HRegionLocation location;
+    HRegionInterface server;
+    Text row;
+  
+    protected ServerCallable(Text row) {
+      this.row = row;
+    }
+  
+    void instantiateServer(boolean reload) throws IOException {
+      this.location = getRegionLocation(row, reload);
+      this.server = connection.getHRegionConnection(location.getServerAddress());
+    }    
+  }
+  
+  /**
+   * Pass in a ServerCallable with your particular bit of logic defined and 
+   * this method will manage the process of doing retries with timed waits 
+   * and refinds of missing regions.
+   */
+  protected <T> T getRegionServerWithRetries(ServerCallable<T> callable) 
+  throws IOException, RuntimeException {
+    for(int tries = 0; tries < numRetries; tries++) {
+      try {
+        callable.instantiateServer(tries != 0);
+        return callable.call();
+      } catch (IOException e) {
+        if (e instanceof RemoteException) {
+          e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+        }
+        if (tries == numRetries - 1) {
+          throw e;
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("reloading table servers because: " + e.getMessage());
+        }
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+      try {
+        Thread.sleep(pause);
+      } catch (InterruptedException e) {
+        // continue
+      }
+    }
+    return null;    
   }
 }



Mime
View raw message