hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rang...@apache.org
Subject svn commit: r648875 - in /hadoop/core/trunk: CHANGES.txt src/java/org/apache/hadoop/dfs/DataNode.java src/java/org/apache/hadoop/dfs/FSConstants.java
Date Wed, 16 Apr 2008 22:00:24 GMT
Author: rangadi
Date: Wed Apr 16 15:00:21 2008
New Revision: 648875

URL: http://svn.apache.org/viewvc?rev=648875&view=rev
Log:
HADOOP-3124. Make DataNode socket write timeout configurable. (rangadi)

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=648875&r1=648874&r2=648875&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Apr 16 15:00:21 2008
@@ -268,6 +268,8 @@
     HADOOP-2993. Clarify the usage of JAVA_HOME in the Quick Start guide.
     (acmurthy via nigel)
 
+    HADOOP-3124. Make DataNode socket write timeout configurable. (rangadi)
+
   OPTIMIZATIONS
 
     HADOOP-2790.  Fixed inefficient method hasSpeculativeTask by removing

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=648875&r1=648874&r2=648875&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Wed Apr 16 15:00:21 2008
@@ -27,8 +27,6 @@
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.net.SocketInputStream;
-import org.apache.hadoop.net.SocketOutputStream;
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
@@ -121,6 +119,7 @@
   private static String dnThreadName;
   int defaultBytesPerChecksum = 512;
   private int socketTimeout;
+  private int socketWriteTimeout = 0;  
   
   private DataBlockScanner blockScanner;
   private Daemon blockScannerThread;
@@ -207,6 +206,9 @@
     this.estimateBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
     this.socketTimeout =  conf.getInt("dfs.socket.timeout",
                                       FSConstants.READ_TIMEOUT);
+    this.socketWriteTimeout = conf.getInt("dfs.datanode.socket.write.timeout",
+                                          FSConstants.WRITE_TIMEOUT);
+    
     String address = 
       NetUtils.getServerAddress(conf,
                                 "dfs.datanode.bindAddress", 
@@ -257,7 +259,8 @@
 
       
     // find free port
-    ServerSocket ss = ServerSocketChannel.open().socket();
+    ServerSocket ss = (socketWriteTimeout > 0) ? 
+          ServerSocketChannel.open().socket() : new ServerSocket();
     Server.bind(ss, socAddr, 0);
     ss.setReceiveBufferSize(DEFAULT_DATA_SOCKET_SIZE); 
     // adjust machine name with the actual port
@@ -333,6 +336,14 @@
     myMetrics = new DataNodeMetrics(conf, dnRegistration.getStorageID());
   }
 
+  /**
+   * Creates either NIO or regular depending on socketWriteTimeout.
+   */
+  private Socket newSocket() throws IOException {
+    return (socketWriteTimeout > 0) ? 
+           SocketChannel.open().socket() : new Socket();                                
  
+  }
+  
   private NamespaceInfo handshake() throws IOException {
     NamespaceInfo nsInfo = new NamespaceInfo();
     while (shouldRun) {
@@ -828,7 +839,7 @@
   private static void receiveResponse(Socket s, int numTargets) throws IOException {
     // check the response
     DataInputStream reply = new DataInputStream(new BufferedInputStream(
-        new SocketInputStream(s), BUFFER_SIZE));
+                                NetUtils.getInputStream(s), BUFFER_SIZE));
     try {
       for (int i = 0; i < numTargets; i++) {
         short opStatus = reply.readShort();
@@ -843,9 +854,10 @@
   }
 
   /* utility function for sending a respose */
-  private static void sendResponse(Socket s, short opStatus) throws IOException {
+  private static void sendResponse(Socket s, short opStatus, long timeout) 
+                                                       throws IOException {
     DataOutputStream reply = 
-      new DataOutputStream(new SocketOutputStream(s, WRITE_TIMEOUT));
+      new DataOutputStream(NetUtils.getOutputStream(s, timeout));
     try {
       reply.writeShort(opStatus);
       reply.flush();
@@ -943,7 +955,7 @@
       DataInputStream in=null; 
       try {
         in = new DataInputStream(
-            new BufferedInputStream(new SocketInputStream(s), BUFFER_SIZE));
+            new BufferedInputStream(NetUtils.getInputStream(s), BUFFER_SIZE));
         short version = in.readShort();
         if ( version != DATA_TRANSFER_VERSION ) {
           throw new IOException( "Version Mismatch" );
@@ -1010,9 +1022,9 @@
       long length = in.readLong();
 
       // send the block
-      DataOutputStream out = new DataOutputStream(
-            new BufferedOutputStream(new SocketOutputStream(s, WRITE_TIMEOUT),
-                                     SMALL_BUFFER_SIZE));
+      DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
+        NetUtils.getOutputStream(s, socketWriteTimeout), SMALL_BUFFER_SIZE));
+      
       BlockSender blockSender = null;
       try {
         try {
@@ -1103,7 +1115,7 @@
 
         // get a connection back to the previous target
         replyOut = new DataOutputStream(
-                       new SocketOutputStream(s, WRITE_TIMEOUT));
+                       NetUtils.getOutputStream(s, socketWriteTimeout));
 
         //
         // Open network conn to backup machine, if 
@@ -1114,19 +1126,19 @@
           // Connect to backup machine
           mirrorNode = targets[0].getName();
           mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
-          mirrorSock = SocketChannel.open().socket();
+          mirrorSock = newSocket();
           try {
             int timeoutValue = numTargets * socketTimeout;
-            int writeTimeout = WRITE_TIMEOUT + 
+            int writeTimeout = socketWriteTimeout + 
                                (WRITE_TIMEOUT_EXTENSION * numTargets);
             mirrorSock.connect(mirrorTarget, timeoutValue);
             mirrorSock.setSoTimeout(timeoutValue);
             mirrorSock.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
             mirrorOut = new DataOutputStream(
                new BufferedOutputStream(
-                           new SocketOutputStream(mirrorSock, writeTimeout),
+                           NetUtils.getOutputStream(mirrorSock, writeTimeout),
                            BUFFER_SIZE));
-            mirrorIn = new DataInputStream(new SocketInputStream(mirrorSock));
+            mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock));
 
             // Write header: Copied from DFSClient.java!
             mirrorOut.writeShort( DATA_TRANSFER_VERSION );
@@ -1247,7 +1259,8 @@
         byte [] buf = new byte[(int)fileSize];
         IOUtils.readFully(checksumIn, buf, 0, buf.length);
         
-        out = new DataOutputStream(new SocketOutputStream(s, WRITE_TIMEOUT));
+        out = new DataOutputStream(
+                  NetUtils.getOutputStream(s, socketWriteTimeout));
         
         out.writeByte(OP_STATUS_SUCCESS);
         out.writeInt(buf.length);
@@ -1289,13 +1302,13 @@
 
         // get the output stream to the target
         InetSocketAddress targetAddr = NetUtils.createSocketAddr(target.getName());
-        targetSock = SocketChannel.open().socket();
+        targetSock = newSocket();
         targetSock.connect(targetAddr, socketTimeout);
         targetSock.setSoTimeout(socketTimeout);
 
         targetOut = new DataOutputStream(new BufferedOutputStream(
-                        new SocketOutputStream(targetSock, WRITE_TIMEOUT),
-                        SMALL_BUFFER_SIZE));
+                      NetUtils.getOutputStream(targetSock, socketWriteTimeout),
+                      SMALL_BUFFER_SIZE));
 
         /* send request to the target */
         // fist write header info
@@ -1322,7 +1335,7 @@
       } finally {
         /* send response to the requester */
         try {
-          sendResponse(s, opStatus);
+          sendResponse(s, opStatus, socketWriteTimeout);
         } catch (IOException replyE) {
           LOG.warn("Error writing the response back to "+
               s.getRemoteSocketAddress() + "\n" +
@@ -1370,7 +1383,7 @@
       } finally {
         // send response back
         try {
-          sendResponse(s, opStatus);
+          sendResponse(s, opStatus, socketWriteTimeout);
         } catch (IOException ioe) {
           LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress());
         }
@@ -2587,14 +2600,14 @@
       try {
         InetSocketAddress curTarget = 
           NetUtils.createSocketAddr(targets[0].getName());
-        sock = SocketChannel.open().socket();
+        sock = newSocket();
         sock.connect(curTarget, socketTimeout);
         sock.setSoTimeout(targets.length * socketTimeout);
 
-        long writeTimeout = WRITE_TIMEOUT + 
+        long writeTimeout = socketWriteTimeout + 
                             WRITE_TIMEOUT_EXTENSION * (targets.length-1);
         out = new DataOutputStream(new BufferedOutputStream(
-               new SocketOutputStream(sock, writeTimeout), SMALL_BUFFER_SIZE));
+          NetUtils.getOutputStream(sock, writeTimeout), SMALL_BUFFER_SIZE));
 
         blockSender = new BlockSender(b, 0, -1, false, false, false);
 

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java?rev=648875&r1=648874&r2=648875&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java Wed Apr 16 15:00:21
2008
@@ -125,7 +125,7 @@
   public static final long LEASE_SOFTLIMIT_PERIOD = 60 * 1000;
   public static final long LEASE_HARDLIMIT_PERIOD = 60 * LEASE_SOFTLIMIT_PERIOD;
   public static int READ_TIMEOUT = 60 * 1000;
-  public static int WRITE_TIMEOUT = 10 * 60 * 1000;  
+  public static int WRITE_TIMEOUT = 8 * 60 * 1000;  
   public static int WRITE_TIMEOUT_EXTENSION = 5 * 1000; //for write pipeline
 
   // We need to limit the length and depth of a path in the filesystem.  HADOOP-438



Mime
View raw message