hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhr...@apache.org
Subject svn commit: r638718 - in /hadoop/core/branches/branch-0.16: CHANGES.txt src/java/org/apache/hadoop/dfs/DataNode.java
Date Wed, 19 Mar 2008 06:04:08 GMT
Author: dhruba
Date: Tue Mar 18 23:04:06 2008
New Revision: 638718

URL: http://svn.apache.org/viewvc?rev=638718&view=rev
Log:
HADOOP-3033. The BlockReceiver thread in the datanode writes data to
the block file, changes file position (if needed) and flushes all by
itself. The PacketResponder thread does not flush block file. (dhruba)
svn merge -c 638716 from trunk.


Modified:
    hadoop/core/branches/branch-0.16/CHANGES.txt
    hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/dfs/DataNode.java

Modified: hadoop/core/branches/branch-0.16/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.16/CHANGES.txt?rev=638718&r1=638717&r2=638718&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.16/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.16/CHANGES.txt Tue Mar 18 23:04:06 2008
@@ -7,6 +7,10 @@
     HADOOP-3011. Prohibit distcp from overwriting directories on the
     destination filesystem with files. (cdouglas)
 
+    HADOOP-3033. The BlockReceiver thread in the datanode writes data to 
+    the block file, changes file position (if needed) and flushes all by
+    itself. The PacketResponder thread does not flush block file. (dhruba)
+
 Release 0.16.1 - 2008-03-13
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/dfs/DataNode.java?rev=638718&r1=638717&r2=638718&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/dfs/DataNode.java Tue Mar 18 23:04:06 2008
@@ -1777,11 +1777,6 @@
                          " from " + receiver.inAddr);
               }
               lastPacket = true;
-            } else {
-              // flush packet to disk before sending ack
-              if (!receiver.finalized) {
-                receiver.flush();
-              }
             }
 
             replyOut.writeLong(expected);
@@ -1830,6 +1825,15 @@
                 LOG.debug("PacketResponder " + numTargets + " got seqno = " + seqno);
                 Packet pkt = null;
                 synchronized (this) {
+                  while (running && shouldRun && ackQueue.size() == 0) {
+                    if (LOG.isDebugEnabled()) {
+                      LOG.debug("PacketResponder " + numTargets + 
+                                " seqno = " + seqno +
+                                " for block " + block +
+                                " waiting for local datanode to finish write.");
+                    }
+                    wait();
+                  }
                   pkt = ackQueue.removeFirst();
                   expected = pkt.seqno;
                   notifyAll();
@@ -1866,12 +1870,6 @@
                        " of size " + block.getNumBytes() + 
                        " from " + receiver.inAddr);
             }
-            else if (!lastPacketInBlock) {
-              // flush packet to disk before sending ack
-              if (!receiver.finalized) {
-                receiver.flush();
-              }
-            }
 
             // send my status back to upstream datanode
             replyOut.writeLong(expected); // send seqno upstream
@@ -1969,8 +1967,6 @@
     private FSDataset.BlockWriteStreams streams;
     private boolean isRecovery = false;
     private String clientName;
-    private Object currentWriteLock;
-    volatile private boolean currentWrite;
 
     BlockReceiver(Block block, DataInputStream in, String inAddr,
                   boolean isRecovery, String clientName)
@@ -1982,8 +1978,6 @@
         this.isRecovery = isRecovery;
         this.clientName = clientName;
         this.offsetInBlock = 0;
-        this.currentWriteLock = new Object();
-        this.currentWrite = false;
         this.checksum = DataChecksum.newDataChecksum(in);
         this.bytesPerChecksum = checksum.getBytesPerChecksum();
         this.checksumSize = checksum.getChecksumSize();
@@ -2009,18 +2003,6 @@
     // close files
     public void close() throws IOException {
 
-      synchronized (currentWriteLock) {
-        while (currentWrite) {
-          try {
-            LOG.info("BlockReceiver for block " + block +
-                     " waiting for last write to drain.");
-            currentWriteLock.wait();
-          } catch (InterruptedException e) {
-            throw new IOException("BlockReceiver for block " + block +
-                                  " interrupted drain of last io.");
-          }
-        }
-      }
       IOException ioe = null;
       // close checksum file
       try {
@@ -2089,11 +2071,6 @@
       }
 
       checksum.reset();
-
-      // record the fact that the current write is still in progress
-      synchronized (currentWriteLock) {
-        currentWrite = true;
-      }
       offsetInBlock += len;
 
       // First write to remote node before writing locally.
@@ -2113,10 +2090,6 @@
           // recovery.
           //
           if (clientName.length() > 0) {
-            synchronized (currentWriteLock) {
-              currentWrite = false;
-              currentWriteLock.notifyAll();
-            }
             throw ioe;
           }
         }
@@ -2132,11 +2105,6 @@
       } catch (IOException iex) {
         checkDiskError(iex);
         throw iex;
-      } finally {
-        synchronized (currentWriteLock) {
-          currentWrite = false;
-          currentWriteLock.notifyAll();
-        }
       }
 
       if (throttler != null) { // throttle I/O
@@ -2184,12 +2152,6 @@
             throw e;
           }
         }
-        // first enqueue the ack packet to avoid a race with the response coming
-        // from downstream datanode.
-        if (responder != null) {
-          ((PacketResponder)responder.getRunnable()).enqueue(seqno, 
-                                          lastPacketInBlock); 
-        }
       }
 
       if (len == 0) {
@@ -2220,8 +2182,11 @@
         curPacketSize += 4;
       }
 
+      /// flush entire packet before sending ack
+      flush();
+
       // put in queue for pending acks
-      if (responder != null && mirrorOut == null) {
+      if (responder != null) {
         ((PacketResponder)responder.getRunnable()).enqueue(seqno,
                                         lastPacketInBlock); 
       }



Mime
View raw message