hadoop-common-commits mailing list archives

From dhr...@apache.org
Subject svn commit: r615946 - in /hadoop/core/trunk: CHANGES.txt src/java/org/apache/hadoop/dfs/DFSClient.java src/java/org/apache/hadoop/dfs/DataNode.java
Date Mon, 28 Jan 2008 16:53:16 GMT
Author: dhruba
Date: Mon Jan 28 08:53:07 2008
New Revision: 615946

URL: http://svn.apache.org/viewvc?rev=615946&view=rev
Log:
HADOOP-2713. TestDatanodeDeath failed on windows because the replication
request was timing out. (dhruba)


Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=615946&r1=615945&r2=615946&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Jan 28 08:53:07 2008
@@ -594,6 +594,9 @@
     HADOOP-2720. Jumbo bug fix patch to HOD.  Final sync of Apache SVN with
     internal Yahoo SVN.  (Hemanth Yamijala via nigel)
 
+    HADOOP-2713. TestDatanodeDeath failed on windows because the replication
+    request was timing out. (dhruba)
+
 Release 0.15.3 - 2008-01-18
 
   BUG FIXES

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=615946&r1=615945&r2=615946&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Mon Jan 28 08:53:07 2008
@@ -2273,10 +2273,10 @@
         }
 
         // wait for threads to exit
+        streamer.join();
         if (response != null) {
           response.join();
         }
-        streamer.join();
         streamer = null;
         blockStream = null;
         blockReplyStream = null;
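
The DFSClient.java hunk reverses the shutdown order of the output stream's two worker threads: the streamer, which sends packets, is now joined before the response thread, which reads acks, so the packet producer has fully stopped before the ack reader is awaited. A minimal, self-contained sketch of that ordering, with hypothetical stand-in threads in place of DFSClient's streamer and response threads:

    public class JoinOrderSketch {
      public static void main(String[] args) throws InterruptedException {
        // Hypothetical stand-ins; the bodies are empty because only the
        // join order matters for this illustration.
        Thread streamer = new Thread(() -> { /* sends data packets */ });
        Thread response = new Thread(() -> { /* reads downstream acks */ });
        streamer.start();
        response.start();

        // wait for threads to exit: join the producer first, then the
        // thread that consumes acknowledgements for what it produced
        streamer.join();
        response.join();
      }
    }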

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=615946&r1=615945&r2=615946&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Mon Jan 28 08:53:07 2008
@@ -891,9 +891,14 @@
    */
   class DataXceiver implements Runnable {
     Socket s;
+    String remoteAddress; // address of remote side
+    String localAddress;  // local address of this daemon
     public DataXceiver(Socket s) {
       this.s = s;
       childSockets.put(s, s);
+      InetSocketAddress isock = (InetSocketAddress)s.getRemoteSocketAddress();
+      remoteAddress = isock.toString();
+      localAddress = s.getInetAddress() + ":" + s.getLocalPort();
       LOG.debug("Number of active connections is: "+xceiverCount);
     }
 
@@ -1032,7 +1037,9 @@
       // Read in the header
       //
       Block block = new Block(in.readLong(), estimateBlockSize);
-      LOG.info("Receiving block " + block + " from " + s.getInetAddress());
+      LOG.info("Receiving block " + block + 
+               " src: " + remoteAddress +
+               " dest: " + localAddress);
       int pipelineSize = in.readInt(); // num of datanodes in entire pipeline
       boolean isRecovery = in.readBoolean(); // is this part of recovery?
       String client = Text.readString(in); // working on behalf of this client
@@ -1047,7 +1054,6 @@
         targets[i] = tmp;
       }
 
-      short opStatus = OP_STATUS_SUCCESS; // write operation status
       DataOutputStream mirrorOut = null;  // stream to next target
       DataInputStream mirrorIn = null;    // reply from next target
       DataOutputStream replyOut = null;   // stream to prev target
@@ -1096,16 +1102,20 @@
             blockReceiver.writeChecksumHeader(mirrorOut);
             mirrorOut.flush();
 
-            // read connect ack
-            firstBadLink = Text.readString(mirrorIn);
-            LOG.info("Datanode " + targets.length +
-                     " got response for connect ack " +
-                     " from downstream datanode with firstbadlink as " +
-                     firstBadLink);
+            // read connect ack (only for clients, not for replication req)
+            if (client.length() != 0) {
+              firstBadLink = Text.readString(mirrorIn);
+              LOG.info("Datanode " + targets.length +
+                       " got response for connect ack " +
+                       " from downstream datanode with firstbadlink as " +
+                       firstBadLink);
+            }
 
           } catch (IOException e) {
-            Text.writeString(replyOut, mirrorNode);
-            replyOut.flush();
+            if (client.length() != 0) {
+              Text.writeString(replyOut, mirrorNode);
+              replyOut.flush();
+            }
             IOUtils.closeStream(mirrorOut);
             mirrorOut = null;
             IOUtils.closeStream(mirrorIn);
@@ -1116,12 +1126,14 @@
           }
         }
 
-        // send connect ack back to source
-        LOG.info("Datanode " + targets.length +
-                 " forwarding connect ack to upstream firstbadlink is " +
-                 firstBadLink);
-        Text.writeString(replyOut, firstBadLink);
-        replyOut.flush();
+        // send connect ack back to source (only for clients)
+        if (client.length() != 0) {
+          LOG.info("Datanode " + targets.length +
+                   " forwarding connect ack to upstream firstbadlink is " +
+                   firstBadLink);
+          Text.writeString(replyOut, firstBadLink);
+          replyOut.flush();
+        }
 
         // receive the block and mirror to the next target
         String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
@@ -1133,10 +1145,10 @@
         // the block is finalized in the PacketResponder.
         if (client.length() == 0) {
           notifyNamenodeReceivedBlock(block, EMPTY_DEL_HINT);
-          LOG.info("Received block " + block +
-                   " of size " + block.getNumBytes() +
-                   " from " + s.getInetAddress());
-
+          LOG.info("Received block " + block + 
+                   " src: " + remoteAddress +
+                   " dest: " + localAddress +
+                   " of size " + block.getNumBytes());
         }
 
         if (blockScanner != null) {
@@ -1145,7 +1157,6 @@
         
       } catch (IOException ioe) {
         LOG.info("writeBlock " + block + " received exception " + ioe);
-        opStatus = OP_STATUS_ERROR;
         throw ioe;
       } finally {
         // close all opened streams
@@ -2184,13 +2195,6 @@
         this.mirrorOut = mirrorOut;
         this.mirrorAddr = mirrorAddr;
         this.throttler = throttler;
-
-      /*
-       * We need an estimate for block size to check if the disk partition has
-       * enough space. For now we just increment FSDataset.reserved by
-       * configured dfs.block.size Other alternative is to include the block
-       * size in the header sent by DFSClient.
-       */
 
       try {
         // write data chunk header
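
The substantive DataNode.java change makes the connect-ack exchange conditional on client.length() != 0. A replication request carries an empty client name and originates from another datanode that never sends a connect ack, so the old unconditional Text.readString(mirrorIn) blocked until the socket timed out; that is the failure TestDatanodeDeath hit on Windows. A minimal sketch of the resulting control flow, assuming hypothetical names and using plain readUTF/writeUTF as stand-ins for Hadoop's Text.readString/Text.writeString:

    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    class ConnectAckSketch {
      // client is empty for datanode-to-datanode replication requests and
      // non-empty when a DFSClient is driving the write pipeline
      static void exchangeConnectAck(String client, DataInputStream mirrorIn,
                                     DataOutputStream replyOut)
          throws IOException {
        boolean fromClient = client.length() != 0;
        String firstBadLink = "";

        if (fromClient) {
          // only a client-driven pipeline gets a connect ack from downstream
          firstBadLink = mirrorIn.readUTF();
        }

        // (in the real code, mirror setup and its error handling sit here)

        if (fromClient) {
          // forward the ack upstream; replication requests skip this and
          // no longer block waiting for a reply that never arrives
          replyOut.writeUTF(firstBadLink);
          replyOut.flush();
        }
      }
    }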


