hadoop-common-commits mailing list archives

From: rang...@apache.org
Subject: svn commit: r708773 - in /hadoop/core/trunk: ./ src/core/org/apache/hadoop/ipc/ src/core/org/apache/hadoop/net/ src/hdfs/org/apache/hadoop/hdfs/ src/hdfs/org/apache/hadoop/hdfs/server/datanode/
Date: Wed, 29 Oct 2008 04:47:52 GMT
Author: rangadi
Date: Tue Oct 28 21:47:51 2008
New Revision: 708773

URL: http://svn.apache.org/viewvc?rev=708773&view=rev
Log:
HADOOP-4346. Implement blocking connect so that Hadoop is not affected
by the selector problem in the JDK's default implementation. (Raghu Angadi)
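
Background, as described in the NetUtils javadoc below: Sun's implementation of Socket.connect() with a timeout creates thread-local selectors that Hadoop cannot close deterministically, which can eventually consume all available file descriptors. This patch routes channel-backed sockets through Hadoop's own selector pool instead. The following is a minimal standalone sketch of the same technique (non-blocking connect plus an explicitly managed selector), not the patch itself; it assumes one throwaway selector per call rather than the pooled selectors used by SocketIOWithTimeout, and it omits the finishConnect() retry loop the patch performs.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

/** Minimal sketch: timed connect via an explicitly managed selector. */
public class TimedConnectSketch {
  static void connect(SocketChannel ch, InetSocketAddress addr, int timeoutMs)
      throws IOException {
    ch.configureBlocking(false);
    Selector sel = Selector.open();   // we own this selector and close it below
    try {
      if (!ch.connect(addr)) {        // false: connect still in progress
        ch.register(sel, SelectionKey.OP_CONNECT);
        if (sel.select(timeoutMs) == 0 || !ch.finishConnect()) {
          throw new SocketTimeoutException(timeoutMs + " millis timeout on connect");
        }
      }
    } finally {
      sel.close();                    // selector FD is released deterministically
      if (ch.isOpen()) {
        ch.configureBlocking(true);   // restore blocking mode for normal I/O
      }
    }
  }
}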

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Client.java
    hadoop/core/trunk/src/core/org/apache/hadoop/net/NetUtils.java
    hadoop/core/trunk/src/core/org/apache/hadoop/net/SocketIOWithTimeout.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=708773&r1=708772&r2=708773&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Oct 28 21:47:51 2008
@@ -80,6 +80,9 @@
     HADOOP-4482. Make the JMX monitoring use predictable names for the 
     datanodes to enable Nagios monitoring. (Brian Bockelman via omalley)
 
+    HADOOP-4346. Implement blocking connect so that Hadoop is not affected
+    by the selector problem in the JDK's default implementation. (Raghu Angadi)
+
 Release 0.19.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Client.java?rev=708773&r1=708772&r2=708773&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Client.java Tue Oct 28 21:47:51 2008
@@ -296,7 +296,7 @@
             this.socket = socketFactory.createSocket();
             this.socket.setTcpNoDelay(tcpNoDelay);
             // connection time out is 20s
-            this.socket.connect(remoteId.getAddress(), 20000);
+            NetUtils.connect(this.socket, remoteId.getAddress(), 20000);
             this.socket.setSoTimeout(pingInterval);
             break;
           } catch (SocketTimeoutException toe) {

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/net/NetUtils.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/net/NetUtils.java?rev=708773&r1=708772&r2=708773&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/net/NetUtils.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/net/NetUtils.java Tue Oct 28 21:47:51 2008
@@ -23,8 +23,10 @@
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.net.SocketAddress;
 import java.net.URI;
 import java.net.UnknownHostException;
+import java.nio.channels.SocketChannel;
 import java.util.Map.Entry;
 import java.util.*;
 
@@ -369,6 +371,40 @@
             socket.getOutputStream() : new SocketOutputStream(socket, timeout);         
  
   }
   
+  /**
+   * This is a drop-in replacement for 
+   * {@link Socket#connect(SocketAddress, int)}.
+   * In the case of normal sockets that don't have associated channels, this 
+   * just invokes <code>socket.connect(endpoint, timeout)</code>. If 
+   * <code>socket.getChannel()</code> returns a non-null channel,
+   * connect is implemented using Hadoop's selectors. This is done mainly
+   * to prevent Sun's connect implementation from creating thread-local 
+   * selectors, since Hadoop has no control over when these are closed
+   * and they could end up consuming all the available file descriptors.
+   * 
+   * @see java.net.Socket#connect(java.net.SocketAddress, int)
+   * 
+   * @param socket the socket to connect
+   * @param endpoint the remote endpoint to connect to
+   * @param timeout - timeout in milliseconds
+   */
+  public static void connect(Socket socket, 
+                             SocketAddress endpoint, 
+                             int timeout) throws IOException {
+    if (socket == null || endpoint == null || timeout < 0) {
+      throw new IllegalArgumentException("Illegal argument for connect()");
+    }
+    
+    SocketChannel ch = socket.getChannel();
+    
+    if (ch == null) {
+      // let the default implementation handle it.
+      socket.connect(endpoint, timeout);
+    } else {
+      SocketIOWithTimeout.connect(ch, endpoint, timeout);
+    }
+  }
+  
   /** 
    * Given a string representation of a host, return its ip address
    * in textual presentation.
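
To make the dispatch in NetUtils.connect() concrete, here is a hedged illustration (not part of the patch): getChannel() returns null for a plain java.net.Socket and non-null for a socket obtained from an NIO SocketChannel, which is what Hadoop's default socket factory typically hands back, so only the latter takes the SocketIOWithTimeout path.

import java.io.IOException;
import java.net.Socket;
import java.nio.channels.SocketChannel;

public class ChannelDispatchSketch {
  public static void main(String[] args) throws IOException {
    Socket plain = new Socket();                  // no associated channel
    System.out.println(plain.getChannel());       // prints "null": plain socket.connect() path

    Socket nio = SocketChannel.open().socket();   // channel-backed socket
    System.out.println(nio.getChannel() != null); // prints "true": SocketIOWithTimeout path
    nio.close();
    plain.close();
  }
}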

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/net/SocketIOWithTimeout.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/net/SocketIOWithTimeout.java?rev=708773&r1=708772&r2=708773&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/net/SocketIOWithTimeout.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/net/SocketIOWithTimeout.java Tue Oct 28 21:47:51 2008
@@ -20,11 +20,13 @@
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.net.SocketAddress;
 import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectableChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
 import java.nio.channels.spi.SelectorProvider;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -159,7 +161,8 @@
       } 
 
       if (count == 0) {
-        throw new SocketTimeoutException(timeoutExceptionString(ops));
+        throw new SocketTimeoutException(timeoutExceptionString(channel,
+                                                                timeout, ops));
       }
       // otherwise the socket should be ready for io.
     }
@@ -168,6 +171,64 @@
   }
   
   /**
+   * The contract is similar to {@link SocketChannel#connect(SocketAddress)} 
+   * with a timeout.
+   * 
+   * @see SocketChannel#connect(SocketAddress)
+   * 
+   * @param channel - this should be a {@link SelectableChannel}
+   * @param endpoint the remote endpoint to connect to
+   * @throws IOException
+   */
+  static void connect(SocketChannel channel, 
+                      SocketAddress endpoint, int timeout) throws IOException {
+    
+    boolean blockingOn = channel.isBlocking();
+    if (blockingOn) {
+      channel.configureBlocking(false);
+    }
+    
+    try { 
+      if (channel.connect(endpoint)) {
+        return;
+      }
+
+      long timeoutLeft = timeout;
+      long endTime = (timeout > 0) ? (System.currentTimeMillis() + timeout): 0;
+      
+      while (true) {
+        // we might have to call finishConnect() more than once
+        // for some channels (with user level protocols)
+        
+        int ret = selector.select((SelectableChannel)channel, 
+                                  SelectionKey.OP_CONNECT, timeoutLeft);
+        
+        if (ret > 0 && channel.finishConnect()) {
+          return;
+        }
+        
+        if (ret == 0 ||
+            (timeout > 0 &&  
+              (timeoutLeft = (endTime - System.currentTimeMillis())) <= 0)) {
+          throw new SocketTimeoutException(
+                    timeoutExceptionString(channel, timeout, 
+                                           SelectionKey.OP_CONNECT));
+        }
+      }
+    } catch (IOException e) {
+      // javadoc for SocketChannel.connect() says channel should be closed.
+      try {
+        channel.close();
+      } catch (IOException ignored) {}
+      throw e;
+    } finally {
+      if (blockingOn && channel.isOpen()) {
+        channel.configureBlocking(true);
+      }
+    }
+  }
+
+  /**
    * This is similar to {@link #doIO(ByteBuffer, int)} except that it
    * does not perform any I/O. It just waits for the channel to be ready
    * for I/O as specified in ops.
@@ -182,17 +243,28 @@
   void waitForIO(int ops) throws IOException {
     
     if (selector.select(channel, ops, timeout) == 0) {
-      throw new SocketTimeoutException(timeoutExceptionString(ops)); 
+      throw new SocketTimeoutException(timeoutExceptionString(channel, timeout,
+                                                              ops)); 
     }
   }
-  
-  private String timeoutExceptionString(int ops) {
     
-    String waitingFor = "" + ops;
-    if (ops == SelectionKey.OP_READ) {
-      waitingFor = "read";
-    } else if (ops == SelectionKey.OP_WRITE) {
-      waitingFor = "write";
+  private static String timeoutExceptionString(SelectableChannel channel,
+                                               long timeout, int ops) {
+    
+    String waitingFor;
+    switch(ops) {
+    
+    case SelectionKey.OP_READ :
+      waitingFor = "read"; break;
+      
+    case SelectionKey.OP_WRITE :
+      waitingFor = "write"; break;      
+      
+    case SelectionKey.OP_CONNECT :
+      waitingFor = "connect"; break;
+      
+    default :
+      waitingFor = "" + ops;  
     }
     
     return timeout + " millis timeout while " +
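
One detail in connect() above deserves a note: timeoutLeft is recomputed from a fixed endTime on each pass through the loop, so however many select()/finishConnect() rounds a channel needs, the total wait never exceeds the caller's timeout. A hedged, self-contained sketch of that accounting pattern follows; the Step interface here is hypothetical, standing in for one select-and-finishConnect round.

import java.net.SocketTimeoutException;

public class DeadlineSketch {
  /** Hypothetical stand-in for one select()-and-finishConnect() round. */
  interface Step {
    boolean tryOnce(long waitMs) throws SocketTimeoutException;
  }

  static void runWithDeadline(Step step, long timeoutMs)
      throws SocketTimeoutException {
    long endTime = System.currentTimeMillis() + timeoutMs;
    long timeoutLeft = timeoutMs;
    while (!step.tryOnce(timeoutLeft)) {
      // Recompute the remaining budget so all rounds share one deadline.
      timeoutLeft = endTime - System.currentTimeMillis();
      if (timeoutLeft <= 0) {
        throw new SocketTimeoutException(timeoutMs + " millis timeout");
      }
    }
  }
}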

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=708773&r1=708772&r2=708773&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Tue Oct 28 21:47:51 2008
@@ -619,7 +619,9 @@
       for(int j = 0; !done && j < datanodes.length; j++) {
         //connect to a datanode
         final Socket sock = socketFactory.createSocket();
-        sock.connect(NetUtils.createSocketAddr(datanodes[j].getName()), timeout);
+        NetUtils.connect(sock, 
+                         NetUtils.createSocketAddr(datanodes[j].getName()),
+                         timeout);
         sock.setSoTimeout(timeout);
 
         DataOutputStream out = new DataOutputStream(
@@ -1531,7 +1533,7 @@
 
         try {
           s = socketFactory.createSocket();
-          s.connect(targetAddr, socketTimeout);
+          NetUtils.connect(s, targetAddr, socketTimeout);
           s.setSoTimeout(socketTimeout);
           Block blk = targetBlock.getBlock();
           
@@ -1733,7 +1735,7 @@
             
         try {
           dn = socketFactory.createSocket();
-          dn.connect(targetAddr, socketTimeout);
+          NetUtils.connect(dn, targetAddr, socketTimeout);
           dn.setSoTimeout(socketTimeout);
               
           int len = (int) (end - start + 1);
@@ -2737,7 +2739,7 @@
         InetSocketAddress target = NetUtils.createSocketAddr(nodes[0].getName());
         s = socketFactory.createSocket();
         int timeoutValue = 3000 * nodes.length + socketTimeout;
-        s.connect(target, timeoutValue);
+        NetUtils.connect(s, target, timeoutValue);
         s.setSoTimeout(timeoutValue);
         s.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
         LOG.debug("Send buf size " + s.getSendBufferSize());

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=708773&r1=708772&r2=708773&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Oct 28 21:47:51 2008
@@ -1033,7 +1033,7 @@
         InetSocketAddress curTarget = 
           NetUtils.createSocketAddr(targets[0].getName());
         sock = newSocket();
-        sock.connect(curTarget, socketTimeout);
+        NetUtils.connect(sock, curTarget, socketTimeout);
         sock.setSoTimeout(targets.length * socketTimeout);
 
         long writeTimeout = socketWriteTimeout + 

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=708773&r1=708772&r2=708773&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Tue Oct 28 21:47:51 2008
@@ -278,7 +278,7 @@
           int timeoutValue = numTargets * datanode.socketTimeout;
           int writeTimeout = datanode.socketWriteTimeout + 
                              (HdfsConstants.WRITE_TIMEOUT_EXTENSION * numTargets);
-          mirrorSock.connect(mirrorTarget, timeoutValue);
+          NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);
           mirrorSock.setSoTimeout(timeoutValue);
           mirrorSock.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
           mirrorOut = new DataOutputStream(
@@ -557,7 +557,7 @@
       InetSocketAddress proxyAddr = NetUtils.createSocketAddr(
           proxySource.getName());
       proxySock = datanode.newSocket();
-      proxySock.connect(proxyAddr, datanode.socketTimeout);
+      NetUtils.connect(proxySock, proxyAddr, datanode.socketTimeout);
       proxySock.setSoTimeout(datanode.socketTimeout);
 
       OutputStream baseStream = NetUtils.getOutputStream(proxySock, 


