hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhr...@apache.org
Subject svn commit: r583958 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/dfs/DataNode.java
Date Thu, 11 Oct 2007 21:27:29 GMT
Author: dhruba
Date: Thu Oct 11 14:27:29 2007
New Revision: 583958

URL: http://svn.apache.org/viewvc?rev=583958&view=rev
Log:
HADOOP-2018. The source datanode of a data transfer waits for
a response from the target datanode before closing the data stream.
(Hairong Kuang via dhruba)


Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=583958&r1=583957&r2=583958&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Oct 11 14:27:29 2007
@@ -276,6 +276,10 @@
     properly i.e. the map TIP is incorrectly left marked as 'complete' and it
     is never rescheduled elsewhere, leading to hung reduces.
     (Devaraj Das via acmurthy)
+
+    HADOOP-2018. The source datanode of a data transfer waits for
+    a response from the target datanode before closing the data stream.
+    (Hairong Kuang via dhruba)
                                 
   IMPROVEMENTS
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=583958&r1=583957&r2=583958&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Thu Oct 11 14:27:29 2007
@@ -692,7 +692,48 @@
       }
     }
   }
-    
+
+  /* utility function for receiving a response */
+  private static void receiveResponse(Socket s) throws IOException {
+    // check the response
+    DataInputStream reply = new DataInputStream(new BufferedInputStream(
+        s.getInputStream(), BUFFER_SIZE));
+    try {
+      short opStatus = reply.readShort();
+      if(opStatus != OP_STATUS_SUCCESS) {
+        throw new IOException("operation failed at "+
+            s.getInetAddress());
+      } 
+    } finally {
+      IOUtils.closeStream(reply);
+    }
+  }
+
+  /* utility function for sending a respose */
+  private static void sendResponse(Socket s, short opStatus) throws IOException {
+    DataOutputStream reply = new DataOutputStream(s.getOutputStream());
+    try {
+      reply.writeShort(opStatus);
+      reply.flush();
+    } finally {
+      IOUtils.closeStream(reply);
+    }
+  }
+
+  /*
+   * Informing the name node could take a long long time! Should we wait
+   * till namenode is informed before responding with success to the
+   * client? For now we don't.
+   */
+  private void notifyNamenodeReceivedBlock(Block block) {
+    synchronized (receivedBlockList) {
+      receivedBlockList.add(block);
+      receivedBlockList.notifyAll();
+    }
+  }
+
+
+
   /**
    * Server used for receiving/sending a block of data.
    * This is created to listen for requests from clients or 
@@ -854,10 +895,11 @@
       DataOutputStream mirrorOut = null;  // stream to next target
       Socket mirrorSock = null;           // socket to next target
       BlockReceiver blockReceiver = null; // responsible for data handling
+      String mirrorNode = null;           // the name:port of next target
       try {
         // open a block receiver and check if the block does not exist
         blockReceiver = new BlockReceiver(block, in, 
-            s.getRemoteSocketAddress().toString());
+            s.getInetAddress().toString());
 
         //
         // Open network conn to backup machine, if 
@@ -865,7 +907,6 @@
         //
         if (targets.length > 0) {
           InetSocketAddress mirrorTarget = null;
-          String mirrorNode = null;
           // Connect to backup machine
           mirrorNode = targets[0].getName();
           mirrorTarget = createSocketAddr(mirrorNode);
@@ -892,40 +933,25 @@
           }
         }
 
-        String mirrorAddr = (mirrorSock == null) ? null : 
-            mirrorSock.getRemoteSocketAddress().toString();
+        // receive the block and mirror to the next target
+        String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
         blockReceiver.receiveBlock(mirrorOut, mirrorAddr, null);
 
-        /*
-         * Informing the name node could take a long long time! Should we wait
-         * till namenode is informed before responding with success to the
-         * client? For now we don't.
-         */
-        synchronized (receivedBlockList) {
-          receivedBlockList.add(block);
-          receivedBlockList.notifyAll();
-        }
+        // notify name node
+        notifyNamenodeReceivedBlock(block);
 
         String msg = "Received block " + block + " from " +
-                     s.getRemoteSocketAddress();
+                     s.getInetAddress();
 
         /* read response from next target in the pipeline. 
          * ignore the response for now. Will fix it in HADOOP-1927
          */
         if( mirrorSock != null ) {
-          short result = OP_STATUS_ERROR;
-          DataInputStream mirrorIn = null;
           try {
-            mirrorIn = new DataInputStream( mirrorSock.getInputStream() );
-            result = mirrorIn.readShort();
+            receiveResponse(mirrorSock);
           } catch (IOException ignored) {
-          } finally {
-            IOUtils.closeStream(mirrorIn);
-          }
-
-          msg += " and " +  (( result != OP_STATUS_SUCCESS ) ?
-                               "failed to mirror to " : " mirrored to ") +
-                 mirrorAddr;
+            msg += " and " +  ignored.getMessage();
+          } 
         }
 
         LOG.info(msg);
@@ -934,17 +960,14 @@
         throw ioe;
       } finally {
         // send back reply
-        DataOutputStream reply = new DataOutputStream(s.getOutputStream());
         try {
-          reply.writeShort(opStatus);
-          reply.flush();
+          sendResponse(s, opStatus);
         } catch (IOException ioe) {
-          LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress()
-              + "for writing block " + block );
-          LOG.warn(StringUtils.stringifyException(ioe));
+          LOG.warn("Error writing reply back to " + s.getInetAddress() +
+              " for writing block " + block +"\n" +
+              StringUtils.stringifyException(ioe));
         }
         // close all opened streams
-        IOUtils.closeStream(reply);
         IOUtils.closeStream(mirrorOut);
         IOUtils.closeSocket(mirrorSock);
         IOUtils.closeStream(blockReceiver);
@@ -1435,6 +1458,10 @@
         }
         // send data & checksum
         blockSender.sendBlock(out, null);
+        
+        // check the response
+        receiveResponse(sock);
+
         LOG.info("Transmitted block " + b + " to " + curTarget);
       } catch (IOException ie) {
         LOG.warn("Failed to transfer " + b + " to " + targets[0].getName()



Mime
View raw message