hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hair...@apache.org
Subject svn commit: r643521 - in /hadoop/core/trunk: CHANGES.txt src/java/org/apache/hadoop/ipc/Client.java src/java/org/apache/hadoop/ipc/Server.java
Date Tue, 01 Apr 2008 19:03:01 GMT
Author: hairong
Date: Tue Apr  1 12:02:58 2008
New Revision: 643521

URL: http://svn.apache.org/viewvc?rev=643521&view=rev
Log:
reverting the patch to hadoop-2910

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java
    hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=643521&r1=643520&r2=643521&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Apr  1 12:02:58 2008
@@ -148,9 +148,6 @@
     HADOOP-2239. Add HsftpFileSystem to permit transferring files over ssl.
     (cdouglas)
 
-    HADOOP-2910. Throttle IPC Client/Server during bursts of 
-    requests or server slowdown. (Hairong Kuang via dhruba)
-
     HADOOP-2848. [HOD]hod -o list and deallocate works even after deleting
     the cluster directory. (Hemanth Yamijala via ddas)
 

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java?rev=643521&r1=643520&r2=643521&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java Tue Apr  1 12:02:58 2008
@@ -171,7 +171,7 @@
         try {
           this.socket = socketFactory.createSocket();
           this.socket.setTcpNoDelay(tcpNoDelay);
-          this.socket.connect(remoteId.getAddress());
+          this.socket.connect(remoteId.getAddress(), FSConstants.READ_TIMEOUT);
           break;
         } catch (IOException ie) { //SocketTimeoutException is also caught 
           if (failures == maxRetries) {

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java?rev=643521&r1=643520&r2=643521&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java Tue Apr  1 12:02:58 2008
@@ -45,8 +45,6 @@
 import java.util.List;
 import java.util.Iterator;
 import java.util.Random;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -148,7 +146,7 @@
   private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
 
   volatile private boolean running = true;         // true while server runs
-  private BlockingQueue<Call> callQueue; // queued calls
+  private LinkedList<Call> callQueue = new LinkedList<Call>(); // queued calls
 
   private List<Connection> connectionList = 
     Collections.synchronizedList(new LinkedList<Connection>());
@@ -323,11 +321,6 @@
           closeCurrentConnection(key, e);
           cleanupConnections(true);
           try { Thread.sleep(60000); } catch (Exception ie) {}
-        } catch (InterruptedException e) {
-          if (running) {                          // unexpected -- log it
-            LOG.info(getName() + " caught: " +
-                     StringUtils.stringifyException(e));
-          }
         } catch (Exception e) {
           closeCurrentConnection(key, e);
         }
@@ -386,7 +379,7 @@
                   "; # queued calls: " + callQueue.size());
     }
 
-    void doRead(SelectionKey key) throws InterruptedException {
+    void doRead(SelectionKey key) {
       int count = 0;
       Connection c = (Connection)key.attachment();
       if (c == null) {
@@ -396,8 +389,6 @@
       
       try {
         count = c.readAndProcess();
-      } catch (InterruptedException ieo) {
-        throw ieo;
       } catch (Exception e) {
         LOG.debug(getName() + ": readAndProcess threw exception " + e + ". Count of bytes
read: " + count, e);
         count = -1; //so that the (count < 0) block is executed
@@ -829,7 +820,15 @@
       param.readFields(dis);        
         
       Call call = new Call(id, param, this);
-      callQueue.put(call);              // queue the call; maybe blocked here
+      synchronized (callQueue) {
+        if (callQueue.size() >= maxQueueSize) {
+          Call oldCall = callQueue.removeFirst();
+          LOG.warn("Call queue overflow discarding oldest call " + oldCall);
+        }
+        callQueue.addLast(call);              // queue the call
+        callQueue.notify();                   // wake up a waiting handler
+      }
+        
     }
 
     private synchronized void close() throws IOException {
@@ -859,7 +858,14 @@
       ByteArrayOutputStream buf = new ByteArrayOutputStream(10240);
       while (running) {
         try {
-          Call call = callQueue.take(); // pop the queue; maybe blocked here
+          Call call;
+          synchronized (callQueue) {
+            while (running && callQueue.size()==0) { // wait for a call
+              callQueue.wait(timeout);
+            }
+            if (!running) break;
+            call = callQueue.removeFirst(); // pop the queue
+          }
 
           // throw the message away if it is too old
           if (System.currentTimeMillis() - call.receivedTime > 
@@ -944,7 +950,6 @@
     this.socketSendBufferSize = 0;
     maxCallStartAge = (long) (timeout * MAX_CALL_QUEUE_TIME);
     maxQueueSize = handlerCount * MAX_QUEUE_SIZE_PER_HANDLER;
-    this.callQueue  = new LinkedBlockingQueue<Call>(maxQueueSize); 
     this.maxIdleTime = conf.getInt("ipc.client.maxidletime", 120000);
     this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
     this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);



Mime
View raw message