hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From te...@apache.org
Subject svn commit: r1153602 - in /hbase/trunk: CHANGES.txt src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
Date Wed, 03 Aug 2011 18:26:32 GMT
Author: tedyu
Date: Wed Aug  3 18:26:30 2011
New Revision: 1153602

URL: http://svn.apache.org/viewvc?rev=1153602&view=rev
Log:
HBASE-4003  Cleanup Calls Conservatively On Timeout (Karthick)

Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1153602&r1=1153601&r2=1153602&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Wed Aug  3 18:26:30 2011
@@ -183,6 +183,7 @@ Release 0.91.0 - Unreleased
                by using /hbase as the base node.(ramkrishna.s.vasudevan)
    HBASE-4032  HBASE-451 improperly breaks public API HRegionInfo#getTableDesc
    HBASE-4148  HFileOutputFormat doesn't fill in TIMERANGE_KEY metadata (Jonathan Hsieh)
+   HBASE-4003  Cleanup Calls Conservatively On Timeout (Karthick)
 
   IMPROVEMENTS
    HBASE-3290  Max Compaction Size (Nicolas Spiegelberg via Stack)  

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=1153602&r1=1153601&r2=1153602&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Wed Aug  3 18:26:30 2011
@@ -30,6 +30,7 @@ import java.io.InputStream;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.net.SocketException;
 import java.net.SocketTimeoutException;
 import java.net.UnknownHostException;
 import java.util.Hashtable;
@@ -37,6 +38,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -167,9 +169,11 @@ public class HBaseClient {
     Writable value;                               // value, null if error
     IOException error;                            // exception, null if value
     boolean done;                                 // true when call is done
+    long startTime;
 
     protected Call(Writable param) {
       this.param = param;
+      this.startTime = System.currentTimeMillis();
       synchronized (HBaseClient.this) {
         this.id = counter++;
       }
@@ -201,6 +205,10 @@ public class HBaseClient {
       this.value = value;
       callComplete();
     }
+
+    public long getStartTime() {
+      return this.startTime;
+    }
   }
 
   /** Thread that reads responses and notifies callers.  Each connection owns a
@@ -214,7 +222,7 @@ public class HBaseClient {
     private DataOutputStream out;
 
     // currently active calls
-    private final Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
+    private final ConcurrentSkipListMap<Integer, Call> calls = new ConcurrentSkipListMap<Integer, Call>();
     private final AtomicLong lastActivity = new AtomicLong();// last I/O activity time
     protected final AtomicBoolean shouldCloseConnection = new AtomicBoolean();  // indicate if the connection is closed
     private IOException closeException; // close reason
@@ -568,20 +576,21 @@ public class HBaseClient {
           }
           calls.remove(id);
         }
-      } catch (SocketTimeoutException ste) {
-        if (remoteId.rpcTimeout > 0) {
+      } catch (IOException e) {
+        if (e instanceof SocketTimeoutException && remoteId.rpcTimeout > 0) {
           // Clean up open calls but don't treat this as a fatal condition,
           // since we expect certain responses to not make it by the specified
           // {@link ConnectionId#rpcTimeout}.
-          closeException = ste;
-          cleanupCalls();
+          closeException = e;
         } else {
           // Since the server did not respond within the default ping interval
           // time, treat this as a fatal condition and close this connection
-          markClosed(ste);
+          markClosed(e);
+        }
+      } finally {
+        if (remoteId.rpcTimeout > 0) {
+          cleanupCalls(remoteId.rpcTimeout);
         }
-      } catch (IOException e) {
-        markClosed(e);
       }
     }
 
@@ -635,15 +644,40 @@ public class HBaseClient {
 
     /* Cleanup all calls and mark them as done */
     private void cleanupCalls() {
-      Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator() ;
+      cleanupCalls(0);
+    }
+
+    private void cleanupCalls(long rpcTimeout) {
+      Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator();
       while (itor.hasNext()) {
         Call c = itor.next().getValue();
-        c.setException(closeException); // local exception
-        // Notify the open calls, so they are aware of what just happened
-        synchronized (c) {
-          c.notifyAll();
+        long waitTime = System.currentTimeMillis() - c.getStartTime();
+        if (waitTime >= rpcTimeout) {
+          c.setException(closeException); // local exception
+          synchronized (c) {
+            c.notifyAll();
+          }
+          itor.remove();
+        } else {
+          break;
+        }
+      }
+      try {
+        if (!calls.isEmpty()) {
+          Call firstCall = calls.get(calls.firstKey());
+          long maxWaitTime = System.currentTimeMillis() - firstCall.getStartTime();
+          if (maxWaitTime < rpcTimeout) {
+            rpcTimeout -= maxWaitTime;
+          }
+        }
+        if (!shouldCloseConnection.get()) {
+          closeException = null;
+          if (socket != null) {
+            socket.setSoTimeout((int) rpcTimeout);
+          }
         }
-        itor.remove();
+      } catch (SocketException e) {
+        LOG.debug("Couldn't lower timeout, which may result in longer than expected calls");
       }
     }
   }



Mime
View raw message