hadoop-common-commits mailing list archives

From: szets...@apache.org
Subject: svn commit: r781816 - in /hadoop/core/trunk: CHANGES.txt src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
Date: Thu, 04 Jun 2009 18:11:47 GMT
Author: szetszwo
Date: Thu Jun  4 18:11:46 2009
New Revision: 781816

URL: http://svn.apache.org/viewvc?rev=781816&view=rev
Log:
HADOOP-5859. Fix "wait() or sleep() with locks held" findbugs warnings in DFSClient.  Contributed by Kan Zhang
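The findbugs pattern being fixed is easiest to see in miniature: a thread waits on one queue's monitor while still holding a second lock, so the held lock cannot be released for the duration of the wait. Below is a minimal, hypothetical sketch (class and method names are illustrative, not taken from DFSClient) of that problem and of the single-lock idiom the patch adopts, where one monitor guards both queues:

    // Simplified illustration only -- not the actual DFSClient code.
    class TwoLockWaitSketch {
      private final java.util.LinkedList<Object> dataQueue = new java.util.LinkedList<Object>();
      private final java.util.LinkedList<Object> ackQueue = new java.util.LinkedList<Object>();

      // Before: "wait() with locks held" -- the dataQueue lock stays held
      // while the thread waits on the ackQueue monitor.
      void drainAcksBroken() throws InterruptedException {
        synchronized (dataQueue) {
          synchronized (ackQueue) {
            while (!ackQueue.isEmpty()) {
              ackQueue.wait();   // waits here while still owning dataQueue's lock
            }
          }
        }
      }

      // After: both queues are protected by the dataQueue lock, so waiting
      // releases everything the thread holds.
      void drainAcksFixed() throws InterruptedException {
        synchronized (dataQueue) {
          while (!ackQueue.isEmpty()) {
            dataQueue.wait(1000);
          }
        }
      }
    }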

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=781816&r1=781815&r2=781816&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Jun  4 18:11:46 2009
@@ -783,6 +783,9 @@
     HADOOP-5762. Fix a problem that DistCp does not copy empty directory.
     (Rodrigo Schmidt via szetszwo)
 
+    HADOOP-5859. Fix "wait() or sleep() with locks held" findbugs warnings in
+    DFSClient.  (Kan Zhang via szetszwo)
+
 Release 0.20.1 - Unreleased
 
   INCOMPATIBLE CHANGES

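The DFSClient.java diff below also replaces the volatile persistBlocks flag with an AtomicBoolean, so the "check and clear" in sync() no longer needs to happen while a lock is held. A hedged sketch of that idiom follows; the field and method names mirror the patch, but the surrounding class is hypothetical:

    import java.util.concurrent.atomic.AtomicBoolean;

    class PersistFlagSketch {
      // set whenever a new block pipeline is created
      private final AtomicBoolean persistBlocks = new AtomicBoolean(false);

      void onNewPipeline() {
        persistBlocks.set(true);   // persist block locations on next flush
      }

      void sync() {
        // getAndSet atomically reads and clears the flag, so no lock is needed
        // and the (namenode) fsync is issued at most once per set.
        if (persistBlocks.getAndSet(false)) {
          System.out.println("fsync block locations on namenode");
        }
      }
    }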
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=781816&r1=781815&r2=781816&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Thu Jun  4 18:11:46 2009
@@ -45,6 +45,7 @@
 import java.net.*;
 import java.util.*;
 import java.util.zip.CRC32;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.ConcurrentHashMap;
 import java.nio.BufferOverflowException;
@@ -2088,46 +2089,31 @@
    * starts sending packets from the dataQueue.
   ****************************************************************/
   class DFSOutputStream extends FSOutputSummer implements Syncable {
+    private static final int MAX_PACKETS = 80; // each packet 64K, total 5MB
     private Socket s;
     // closed is accessed by different threads under different locks.
-    volatile boolean closed = false;
+    private volatile boolean closed = false;
   
     private String src;
-    private DataOutputStream blockStream;
-    private DataInputStream blockReplyStream;
-    private Block block;
-    private AccessToken accessToken;
-    final private long blockSize;
-    private DataChecksum checksum;
-    private LinkedList<Packet> dataQueue = new LinkedList<Packet>();
-    private LinkedList<Packet> ackQueue = new LinkedList<Packet>();
+    private final long blockSize;
+    private final DataChecksum checksum;
+    // both dataQueue and ackQueue are protected by dataQueue lock
+    private final LinkedList<Packet> dataQueue = new LinkedList<Packet>();
+    private final LinkedList<Packet> ackQueue = new LinkedList<Packet>();
     private Packet currentPacket = null;
-    private int maxPackets = 80; // each packet 64K, total 5MB
-    // private int maxPackets = 1000; // each packet 64K, total 64MB
-    private DataStreamer streamer = new DataStreamer();;
-    private ResponseProcessor response = null;
+    private DataStreamer streamer = new DataStreamer();
     private long currentSeqno = 0;
     private long bytesCurBlock = 0; // bytes writen in current block
     private int packetSize = 0; // write packet size, including the header.
     private int chunksPerPacket = 0;
-    private DatanodeInfo[] nodes = null; // list of targets for current block
-    private volatile boolean hasError = false;
-    private volatile int errorIndex = 0;
     private volatile IOException lastException = null;
     private long artificialSlowdown = 0;
     private long lastFlushOffset = -1; // offset when flush was invoked
-    //volatile: written holding dataQueue and read holding DFSOutputStream 
-    private volatile boolean persistBlocks = false;//persist blocks on namenode
-    private int recoveryErrorCount = 0; // number of times block recovery failed
-    private int maxRecoveryErrorCount = 5; // try block recovery 5 times
+    //persist blocks on namenode
+    private final AtomicBoolean persistBlocks = new AtomicBoolean(false);
     private volatile boolean appendChunk = false;   // appending to existing partial block
     private long initialFileSize = 0; // at time of file open
-
-    private void setLastException(IOException e) {
-      if (lastException == null) {
-        lastException = e;
-      }
-    }
+    private Progressable progress;
     
     private class Packet {
       ByteBuffer buffer;           // only one of buf and buffer is non-null
@@ -2233,11 +2219,24 @@
     // if them are received, the DataStreamer closes the current block.
     //
     private class DataStreamer extends Daemon {
-
-      private volatile boolean closed = false;
-  
+      private static final int MAX_RECOVERY_ERROR_COUNT = 5; // try block recovery 5 times
+      private int recoveryErrorCount = 0; // number of times block recovery failed
+      private volatile boolean streamerClosed = false;
+      private Block block;
+      private AccessToken accessToken;
+      private DataOutputStream blockStream;
+      private DataInputStream blockReplyStream;
+      private ResponseProcessor response = null;
+      private volatile DatanodeInfo[] nodes = null; // list of targets for current block
+      private volatile boolean hasError = false;
+      private volatile int errorIndex = 0;
+  
+      /*
+       * streamer thread is the only thread that opens streams to datanode, 
+       * and closes them. Any error recovery is also done by this thread.
+       */
       public void run() {
-        while (!closed && clientRunning) {
+        while (!streamerClosed && clientRunning) {
 
           // if the Responder encountered an error, shutdown Responder
           if (hasError && response != null) {
@@ -2250,120 +2249,104 @@
           }
 
           Packet one = null;
-          synchronized (dataQueue) {
 
-            // process IO errors if any
-            boolean doSleep = processDatanodeError(hasError, false);
+          // process IO errors if any
+          boolean doSleep = processDatanodeError(hasError, false);
 
+          synchronized (dataQueue) {
             // wait for a packet to be sent.
-            while ((!closed && !hasError && clientRunning 
-                   && dataQueue.size() == 0) || doSleep) {
+            while ((!streamerClosed && !hasError && clientRunning 
+                && dataQueue.size() == 0) || doSleep) {
               try {
                 dataQueue.wait(1000);
               } catch (InterruptedException  e) {
               }
               doSleep = false;
             }
-            if (closed || hasError || dataQueue.size() == 0 || !clientRunning) {
+            if (streamerClosed || hasError || dataQueue.size() == 0 || !clientRunning) {
               continue;
             }
+            // get packet to be sent.
+            one = dataQueue.getFirst();
+          }
 
-            try {
-              // get packet to be sent.
-              one = dataQueue.getFirst();
-              long offsetInBlock = one.offsetInBlock;
-  
-              // get new block from namenode.
-              if (blockStream == null) {
-                LOG.debug("Allocating new block");
-                nodes = nextBlockOutputStream(src); 
-                this.setName("DataStreamer for file " + src +
-                             " block " + block);
-                response = new ResponseProcessor(nodes);
-                response.start();
-              }
+          try {
+            long offsetInBlock = one.offsetInBlock;
 
-              if (offsetInBlock >= blockSize) {
-                throw new IOException("BlockSize " + blockSize +
-                                      " is smaller than data size. " +
-                                      " Offset of packet in block " + 
-                                      offsetInBlock +
-                                      " Aborting file " + src);
-              }
+            // get new block from namenode.
+            if (blockStream == null) {
+              LOG.debug("Allocating new block");
+              nodes = nextBlockOutputStream(src); 
+              this.setName("DataStreamer for file " + src +
+                  " block " + block);
+              response = new ResponseProcessor(nodes);
+              response.start();
+            }
+
+            if (offsetInBlock >= blockSize) {
+              throw new IOException("BlockSize " + blockSize +
+                  " is smaller than data size. " +
+                  " Offset of packet in block " + 
+                  offsetInBlock +
+                  " Aborting file " + src);
+            }
 
-              ByteBuffer buf = one.getBuffer();
-              
+            ByteBuffer buf = one.getBuffer();
+
+            synchronized (dataQueue) {
               // move packet from dataQueue to ackQueue
               dataQueue.removeFirst();
+              ackQueue.addLast(one);
               dataQueue.notifyAll();
-              synchronized (ackQueue) {
-                ackQueue.addLast(one);
-                ackQueue.notifyAll();
-              } 
-              
-              // write out data to remote datanode
-              blockStream.write(buf.array(), buf.position(), buf.remaining());
-              
-              if (one.lastPacketInBlock) {
-                blockStream.writeInt(0); // indicate end-of-block 
-              }
-              blockStream.flush();
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("DataStreamer block " + block +
-                          " wrote packet seqno:" + one.seqno +
-                          " size:" + buf.remaining() +
-                          " offsetInBlock:" + one.offsetInBlock + 
-                          " lastPacketInBlock:" + one.lastPacketInBlock);
-              }
-            } catch (Throwable e) {
-              LOG.warn("DataStreamer Exception: " + 
-                       StringUtils.stringifyException(e));
-              if (e instanceof IOException) {
-                setLastException((IOException)e);
-              }
-              hasError = true;
             }
+
+            // write out data to remote datanode
+            blockStream.write(buf.array(), buf.position(), buf.remaining());
+
+            if (one.lastPacketInBlock) {
+              blockStream.writeInt(0); // indicate end-of-block 
+            }
+            blockStream.flush();
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("DataStreamer block " + block +
+                  " wrote packet seqno:" + one.seqno +
+                  " size:" + buf.remaining() +
+                  " offsetInBlock:" + one.offsetInBlock + 
+                  " lastPacketInBlock:" + one.lastPacketInBlock);
+            }
+          } catch (Throwable e) {
+            LOG.warn("DataStreamer Exception: " + 
+                StringUtils.stringifyException(e));
+            if (e instanceof IOException) {
+              setLastException((IOException)e);
+            }
+            hasError = true;
           }
 
-          if (closed || hasError || !clientRunning) {
+
+          if (streamerClosed || hasError || !clientRunning) {
             continue;
           }
 
           // Is this block full?
           if (one.lastPacketInBlock) {
-            synchronized (ackQueue) {
-              while (!hasError && ackQueue.size() != 0 && clientRunning) {
+            synchronized (dataQueue) {
+              while (!streamerClosed && !hasError && ackQueue.size() != 0 && clientRunning) {
                 try {
-                  ackQueue.wait();   // wait for acks to arrive from datanodes
+                  dataQueue.wait(1000);   // wait for acks to arrive from datanodes
                 } catch (InterruptedException  e) {
                 }
               }
             }
-            LOG.debug("Closing old block " + block);
-            this.setName("DataStreamer for file " + src);
-
-            response.close();        // ignore all errors in Response
-            try {
-              response.join();
-              response = null;
-            } catch (InterruptedException  e) {
-            }
-
-            if (closed || hasError || !clientRunning) {
+            if (streamerClosed || hasError || !clientRunning) {
               continue;
             }
 
-            synchronized (dataQueue) {
-              try {
-                blockStream.close();
-                blockReplyStream.close();
-              } catch (IOException e) {
-              }
-              nodes = null;
-              response = null;
-              blockStream = null;
-              blockReplyStream = null;
-            }
+            LOG.debug("Closing old block " + block);
+            this.setName("DataStreamer for file " + src);
+            closeResponder();
+            closeStream();
+            nodes = null;
           }
           if (progress != null) { progress.progress(); }
 
@@ -2374,288 +2357,545 @@
             } catch (InterruptedException e) {}
           }
         }
+        closeInternal();
       }
 
-      // shutdown thread
-      void close() {
+      private void closeInternal() {
+        closeResponder();
+        closeStream();
+        streamerClosed = true;
         closed = true;
         synchronized (dataQueue) {
           dataQueue.notifyAll();
         }
-        synchronized (ackQueue) {
-          ackQueue.notifyAll();
-        }
+      }
+
+      /*
+       * close both streamer and DFSOutputStream, should be called only 
+       * by an external thread and only after all data to be sent has 
+       * been flushed to datanode.
+       */
+      void close() {
+        streamerClosed = true;
         this.interrupt();
       }
-    }
-                  
-    //
-    // Processes reponses from the datanodes.  A packet is removed 
-    // from the ackQueue when its response arrives.
-    //
-    private class ResponseProcessor extends Thread {
 
-      private volatile boolean closed = false;
-      private DatanodeInfo[] targets = null;
-      private boolean lastPacketInBlock = false;
+      private void closeResponder() {
+        if (response != null) {
+          try {
+            response.close();
+            response.join();
+          } catch (InterruptedException  e) {
+          } finally {
+            response = null;
+          }
+        }
+      }
 
-      ResponseProcessor (DatanodeInfo[] targets) {
-        this.targets = targets;
+      private void closeStream() {
+        if (blockStream != null) {
+          try {
+            blockStream.close();
+          } catch (IOException e) {
+          } finally {
+            blockStream = null;
+          }
+        }
+        if (blockReplyStream != null) {
+          try {
+            blockReplyStream.close();
+          } catch (IOException e) {
+          } finally {
+            blockReplyStream = null;
+          }
+        }
       }
 
-      public void run() {
+      //
+      // Processes reponses from the datanodes.  A packet is removed 
+      // from the ackQueue when its response arrives.
+      //
+      private class ResponseProcessor extends Daemon {
 
-        this.setName("ResponseProcessor for block " + block);
-  
-        while (!closed && clientRunning && !lastPacketInBlock) {
-          // process responses from datanodes.
-          try {
-            // verify seqno from datanode
-            long seqno = blockReplyStream.readLong();
-            LOG.debug("DFSClient received ack for seqno " + seqno);
-            if (seqno == -1) {
-              continue;
-            } else if (seqno == -2) {
-              // no nothing
-            } else {
-              Packet one = null;
-              synchronized (ackQueue) {
-                one = ackQueue.getFirst();
+        private volatile boolean responderClosed = false;
+        private DatanodeInfo[] targets = null;
+        private boolean isLastPacketInBlock = false;
+
+        ResponseProcessor (DatanodeInfo[] targets) {
+          this.targets = targets;
+        }
+
+        public void run() {
+
+          this.setName("ResponseProcessor for block " + block);
+
+          while (!responderClosed && clientRunning && !isLastPacketInBlock) {
+            // process responses from datanodes.
+            try {
+              // verify seqno from datanode
+              long seqno = blockReplyStream.readLong();
+              LOG.debug("DFSClient received ack for seqno " + seqno);
+              if (seqno == -1) {
+                continue;
+              } else if (seqno == -2) {
+                // no nothing
+              } else {
+                Packet one = null;
+                synchronized (dataQueue) {
+                  one = ackQueue.getFirst();
+                }
+                if (one.seqno != seqno) {
+                  throw new IOException("Responseprocessor: Expecting seqno " + 
+                      " for block " + block +
+                      one.seqno + " but received " + seqno);
+                }
+                isLastPacketInBlock = one.lastPacketInBlock;
               }
-              if (one.seqno != seqno) {
-                throw new IOException("Responseprocessor: Expecting seqno " + 
-                                      " for block " + block +
-                                      one.seqno + " but received " + seqno);
+
+              // processes response status from all datanodes.
+              for (int i = 0; i < targets.length && clientRunning; i++) {
+                short reply = blockReplyStream.readShort();
+                if (reply != DataTransferProtocol.OP_STATUS_SUCCESS) {
+                  errorIndex = i; // first bad datanode
+                  throw new IOException("Bad response " + reply +
+                      " for block " + block +
+                      " from datanode " + 
+                      targets[i].getName());
+                }
               }
-              lastPacketInBlock = one.lastPacketInBlock;
-            }
 
-            // processes response status from all datanodes.
-            for (int i = 0; i < targets.length && clientRunning; i++) {
-              short reply = blockReplyStream.readShort();
-              if (reply != DataTransferProtocol.OP_STATUS_SUCCESS) {
-                errorIndex = i; // first bad datanode
-                throw new IOException("Bad response " + reply +
-                                      " for block " + block +
-                                      " from datanode " + 
-                                      targets[i].getName());
+              synchronized (dataQueue) {
+                ackQueue.removeFirst();
+                dataQueue.notifyAll();
+              }
+            } catch (Exception e) {
+              if (!responderClosed) {
+                if (e instanceof IOException) {
+                  setLastException((IOException)e);
+                }
+                hasError = true;
+                synchronized (dataQueue) {
+                  dataQueue.notifyAll();
+                }
+                LOG.warn("DFSOutputStream ResponseProcessor exception " + 
+                    " for block " + block +
+                    StringUtils.stringifyException(e));
+                responderClosed = true;
               }
             }
+          }
+        }
 
-            synchronized (ackQueue) {
-              ackQueue.removeFirst();
-              ackQueue.notifyAll();
-            }
-          } catch (Exception e) {
-            if (!closed) {
-              hasError = true;
-              if (e instanceof IOException) {
-                setLastException((IOException)e);
-              }
-              LOG.warn("DFSOutputStream ResponseProcessor exception " + 
-                       " for block " + block +
-                        StringUtils.stringifyException(e));
-              closed = true;
+        void close() {
+          responderClosed = true;
+          this.interrupt();
+        }
+      }
+
+      // If this stream has encountered any errors so far, shutdown 
+      // threads and mark stream as closed. Returns true if we should
+      // sleep for a while after returning from this call.
+      //
+      private boolean processDatanodeError(boolean error, boolean isAppend) {
+        if (!error) {
+          return false;
+        }
+        if (response != null) {
+          LOG.info("Error Recovery for block " + block +
+          " waiting for responder to exit. ");
+          return true;
+        }
+        if (errorIndex >= 0) {
+          LOG.warn("Error Recovery for block " + block
+              + " bad datanode[" + errorIndex + "] "
+              + (nodes == null? "nodes == null": nodes[errorIndex].getName()));
+        }
+
+        closeStream();
+
+        // move packets from ack queue to front of the data queue
+        synchronized (dataQueue) {
+          dataQueue.addAll(0, ackQueue);
+          ackQueue.clear();
+        }
+
+        boolean success = false;
+        while (!success && !streamerClosed && clientRunning) {
+          DatanodeInfo[] newnodes = null;
+          if (nodes == null) {
+            String msg = "Could not get block locations. " + "Source file \""
+                + src + "\" - Aborting...";
+            LOG.warn(msg);
+            setLastException(new IOException(msg));
+            streamerClosed = true;
+            return false;
+          }
+          StringBuilder pipelineMsg = new StringBuilder();
+          for (int j = 0; j < nodes.length; j++) {
+            pipelineMsg.append(nodes[j].getName());
+            if (j < nodes.length - 1) {
+              pipelineMsg.append(", ");
             }
           }
+          // remove bad datanode from list of datanodes.
+          // If errorIndex was not set (i.e. appends), then do not remove 
+          // any datanodes
+          // 
+          if (errorIndex < 0) {
+            newnodes = nodes;
+          } else {
+            if (nodes.length <= 1) {
+              lastException = new IOException("All datanodes " + pipelineMsg
+                  + " are bad. Aborting...");
+              streamerClosed = true;
+              return false;
+            }
+            LOG.warn("Error Recovery for block " + block +
+                " in pipeline " + pipelineMsg + 
+                ": bad datanode " + nodes[errorIndex].getName());
+            newnodes =  new DatanodeInfo[nodes.length-1];
+            System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
+            System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,
+                newnodes.length-errorIndex);
+          }
 
-          synchronized (dataQueue) {
-            dataQueue.notifyAll();
+          // Tell the primary datanode to do error recovery 
+          // by stamping appropriate generation stamps.
+          //
+          LocatedBlock newBlock = null;
+          ClientDatanodeProtocol primary =  null;
+          DatanodeInfo primaryNode = null;
+          try {
+            // Pick the "least" datanode as the primary datanode to avoid deadlock.
+            primaryNode = Collections.min(Arrays.asList(newnodes));
+            primary = createClientDatanodeProtocolProxy(primaryNode, conf);
+            newBlock = primary.recoverBlock(block, isAppend, newnodes);
+          } catch (IOException e) {
+            recoveryErrorCount++;
+            if (recoveryErrorCount > MAX_RECOVERY_ERROR_COUNT) {
+              if (nodes.length > 1) {
+                // if the primary datanode failed, remove it from the list.
+                // The original bad datanode is left in the list because it is
+                // conservative to remove only one datanode in one iteration.
+                for (int j = 0; j < nodes.length; j++) {
+                  if (nodes[j].equals(primaryNode)) {
+                    errorIndex = j; // forget original bad node.
+                  }
+                }
+                // remove primary node from list
+                newnodes =  new DatanodeInfo[nodes.length-1];
+                System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
+                System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,
+                    newnodes.length-errorIndex);
+                nodes = newnodes;
+                LOG.warn("Error Recovery for block " + block + " failed "
+                    + " because recovery from primary datanode " + primaryNode
+                    + " failed " + recoveryErrorCount + " times. "
+                    + " Pipeline was " + pipelineMsg
+                    + ". Marking primary datanode as bad.");
+                recoveryErrorCount = 0; 
+                errorIndex = -1;
+                return true;          // sleep when we return from here
+              }
+              String emsg = "Error Recovery for block " + block + " failed "
+                  + " because recovery from primary datanode " + primaryNode
+                  + " failed " + recoveryErrorCount + " times. "
+                  + " Pipeline was " + pipelineMsg + ". Aborting...";
+              LOG.warn(emsg);
+              lastException = new IOException(emsg);
+              streamerClosed = true;
+              return false;       // abort with IOexception
+            } 
+            LOG.warn("Error Recovery for block " + block + " failed "
+                + " because recovery from primary datanode " + primaryNode
+                + " failed " + recoveryErrorCount + " times. "
+                + " Pipeline was " + pipelineMsg + ". Will retry...");
+            return true;          // sleep when we return from here
+          } finally {
+            RPC.stopProxy(primary);
           }
-          synchronized (ackQueue) {
-            ackQueue.notifyAll();
+          recoveryErrorCount = 0; // block recovery successful
+
+          // If the block recovery generated a new generation stamp, use that
+          // from now on.  Also, setup new pipeline
+          //
+          if (newBlock != null) {
+            block = newBlock.getBlock();
+            accessToken = newBlock.getAccessToken();
+            nodes = newBlock.getLocations();
           }
+
+          this.hasError = false;
+          lastException = null;
+          errorIndex = 0;
+          success = createBlockOutputStream(nodes, clientName, true);
         }
-      }
 
-      void close() {
-        closed = true;
-        this.interrupt();
+        if (!streamerClosed && clientRunning) {
+          response = new ResponseProcessor(nodes);
+          response.start();
+        }
+        return false; // do not sleep, continue processing
       }
-    }
 
-    // If this stream has encountered any errors so far, shutdown 
-    // threads and mark stream as closed. Returns true if we should
-    // sleep for a while after returning from this call.
-    //
-    private boolean processDatanodeError(boolean hasError, boolean isAppend) {
-      if (!hasError) {
-        return false;
-      }
-      if (response != null) {
-        LOG.info("Error Recovery for block " + block +
-                 " waiting for responder to exit. ");
-        return true;
-      }
-      if (errorIndex >= 0) {
-        LOG.warn("Error Recovery for block " + block
-            + " bad datanode[" + errorIndex + "] "
-            + (nodes == null? "nodes == null": nodes[errorIndex].getName()));
-      }
+      /**
+       * Open a DataOutputStream to a DataNode so that it can be written to.
+       * This happens when a file is created and each time a new block is allocated.
+       * Must get block ID and the IDs of the destinations from the namenode.
+       * Returns the list of target datanodes.
+       */
+      private DatanodeInfo[] nextBlockOutputStream(String client) throws IOException {
+        LocatedBlock lb = null;
+        boolean retry = false;
+        DatanodeInfo[] nodes = null;
+        int count = conf.getInt("dfs.client.block.write.retries", 3);
+        boolean success = false;
+        do {
+          hasError = false;
+          lastException = null;
+          errorIndex = 0;
+          retry = false;
+          success = false;
+
+          long startTime = System.currentTimeMillis();
+          lb = locateFollowingBlock(startTime);
+          block = lb.getBlock();
+          accessToken = lb.getAccessToken();
+          nodes = lb.getLocations();
 
-      if (blockStream != null) {
-        try {
-          blockStream.close();
-          blockReplyStream.close();
-        } catch (IOException e) {
+          //
+          // Connect to first DataNode in the list.
+          //
+          success = createBlockOutputStream(nodes, clientName, false);
+
+          if (!success) {
+            LOG.info("Abandoning block " + block);
+            namenode.abandonBlock(block, src, clientName);
+
+            // Connection failed.  Let's wait a little bit and retry
+            retry = true;
+            try {
+              if (System.currentTimeMillis() - startTime > 5000) {
+                LOG.info("Waiting to find target node: " + nodes[0].getName());
+              }
+              Thread.sleep(6000);
+            } catch (InterruptedException iex) {
+            }
+          }
+        } while (retry && --count >= 0);
+
+        if (!success) {
+          throw new IOException("Unable to create new block.");
         }
+        return nodes;
       }
-      blockStream = null;
-      blockReplyStream = null;
 
-      // move packets from ack queue to front of the data queue
-      synchronized (ackQueue) {
-        dataQueue.addAll(0, ackQueue);
-        ackQueue.clear();
-      }
-
-      boolean success = false;
-      while (!success && clientRunning) {
-        DatanodeInfo[] newnodes = null;
-        if (nodes == null) {
-          String msg = "Could not get block locations. " +
-                                          "Source file \"" + src
-                                          + "\" - Aborting...";
-          LOG.warn(msg);
-          setLastException(new IOException(msg));
-          closed = true;
-          if (streamer != null) streamer.close();
-          return false;
-        }
-        StringBuilder pipelineMsg = new StringBuilder();
-        for (int j = 0; j < nodes.length; j++) {
-          pipelineMsg.append(nodes[j].getName());
-          if (j < nodes.length - 1) {
-            pipelineMsg.append(", ");
+      // connects to the first datanode in the pipeline
+      // Returns true if success, otherwise return failure.
+      //
+      private boolean createBlockOutputStream(DatanodeInfo[] nodes, String client,
+          boolean recoveryFlag) {
+        String firstBadLink = "";
+        if (LOG.isDebugEnabled()) {
+          for (int i = 0; i < nodes.length; i++) {
+            LOG.debug("pipeline = " + nodes[i].getName());
           }
         }
-        // remove bad datanode from list of datanodes.
-        // If errorIndex was not set (i.e. appends), then do not remove 
-        // any datanodes
-        // 
-        if (errorIndex < 0) {
-          newnodes = nodes;
-        } else {
-          if (nodes.length <= 1) {
-            lastException = new IOException("All datanodes " + pipelineMsg + 
-                                            " are bad. Aborting...");
-            closed = true;
-            if (streamer != null) streamer.close();
-            return false;
+
+        // persist blocks on namenode on next flush
+        persistBlocks.set(true);
+
+        try {
+          LOG.debug("Connecting to " + nodes[0].getName());
+          InetSocketAddress target = NetUtils.createSocketAddr(nodes[0].getName());
+          s = socketFactory.createSocket();
+          int timeoutValue = (socketTimeout > 0) ? (3000 * nodes.length + socketTimeout) : 0;
+          NetUtils.connect(s, target, timeoutValue);
+          s.setSoTimeout(timeoutValue);
+          s.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
+          LOG.debug("Send buf size " + s.getSendBufferSize());
+          long writeTimeout = (datanodeWriteTimeout > 0) ? 
+              (HdfsConstants.WRITE_TIMEOUT_EXTENSION * nodes.length +
+                  datanodeWriteTimeout) : 0;
+
+          //
+          // Xmit header info to datanode
+          //
+          DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
+              NetUtils.getOutputStream(s, writeTimeout),
+              DataNode.SMALL_BUFFER_SIZE));
+          blockReplyStream = new DataInputStream(NetUtils.getInputStream(s));
+
+          out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
+          out.write(DataTransferProtocol.OP_WRITE_BLOCK);
+          out.writeLong(block.getBlockId());
+          out.writeLong(block.getGenerationStamp());
+          out.writeInt(nodes.length);
+          out.writeBoolean(recoveryFlag); // recovery flag
+          Text.writeString(out, client);
+          out.writeBoolean(false); // Not sending src node information
+          out.writeInt(nodes.length - 1);
+          for (int i = 1; i < nodes.length; i++) {
+            nodes[i].write(out);
           }
-          LOG.warn("Error Recovery for block " + block +
-                   " in pipeline " + pipelineMsg + 
-                   ": bad datanode " + nodes[errorIndex].getName());
-          newnodes =  new DatanodeInfo[nodes.length-1];
-          System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
-          System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,
-              newnodes.length-errorIndex);
+          accessToken.write(out);
+          checksum.writeHeader(out);
+          out.flush();
+
+          // receive ack for connect
+          firstBadLink = Text.readString(blockReplyStream);
+          if (firstBadLink.length() != 0) {
+            throw new IOException("Bad connect ack with firstBadLink "
+                + firstBadLink);
+          }
+
+          blockStream = out;
+          return true; // success
+
+        } catch (IOException ie) {
+
+          LOG.info("Exception in createBlockOutputStream " + ie);
+
+          // find the datanode that matches
+          if (firstBadLink.length() != 0) {
+            for (int i = 0; i < nodes.length; i++) {
+              if (nodes[i].getName().equals(firstBadLink)) {
+                errorIndex = i;
+                break;
+              }
+            }
+          }
+          hasError = true;
+          setLastException(ie);
+          blockReplyStream = null;
+          return false;  // error
         }
+      }
 
-        // Tell the primary datanode to do error recovery 
-        // by stamping appropriate generation stamps.
-        //
-        LocatedBlock newBlock = null;
-        ClientDatanodeProtocol primary =  null;
-        DatanodeInfo primaryNode = null;
-        try {
-          // Pick the "least" datanode as the primary datanode to avoid deadlock.
-          primaryNode = Collections.min(Arrays.asList(newnodes));
-          primary = createClientDatanodeProtocolProxy(primaryNode, conf);
-          newBlock = primary.recoverBlock(block, isAppend, newnodes);
-        } catch (IOException e) {
-          recoveryErrorCount++;
-          if (recoveryErrorCount > maxRecoveryErrorCount) {
-            if (nodes.length > 1) {
-              // if the primary datanode failed, remove it from the list.
-              // The original bad datanode is left in the list because it is
-              // conservative to remove only one datanode in one iteration.
-              for (int j = 0; j < nodes.length; j++) {
-                if (nodes[j].equals(primaryNode)) {
-                  errorIndex = j; // forget original bad node.
-                }
+      private LocatedBlock locateFollowingBlock(long start) throws IOException {
+        int retries = 5;
+        long sleeptime = 400;
+        while (true) {
+          long localstart = System.currentTimeMillis();
+          while (true) {
+            try {
+              return namenode.addBlock(src, clientName);
+            } catch (RemoteException e) {
+              IOException ue = 
+                e.unwrapRemoteException(FileNotFoundException.class,
+                    AccessControlException.class,
+                    QuotaExceededException.class);
+              if (ue != e) { 
+                throw ue; // no need to retry these exceptions
               }
-              // remove primary node from list
-              newnodes =  new DatanodeInfo[nodes.length-1];
-              System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
-              System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,
-                               newnodes.length-errorIndex);
-              nodes = newnodes;
-              LOG.warn("Error Recovery for block " + block + " failed " +
-                       " because recovery from primary datanode " +
-                       primaryNode + " failed " + recoveryErrorCount +
-                       " times. " + " Pipeline was " + pipelineMsg +
-                       ". Marking primary datanode as bad.");
-              recoveryErrorCount = 0; 
-              errorIndex = -1;
-              return true;          // sleep when we return from here
-            }
-            String emsg = "Error Recovery for block " + block + " failed " +
-                          " because recovery from primary datanode " +
-                          primaryNode + " failed " + recoveryErrorCount + 
-                          " times. "  + " Pipeline was " + pipelineMsg +
-                          ". Aborting...";
-            LOG.warn(emsg);
-            lastException = new IOException(emsg);
-            closed = true;
-            if (streamer != null) streamer.close();
-            return false;       // abort with IOexception
-          } 
-          LOG.warn("Error Recovery for block " + block + " failed " +
-                   " because recovery from primary datanode " +
-                   primaryNode + " failed " + recoveryErrorCount +
-                   " times. "  + " Pipeline was " + pipelineMsg +
-                   ". Will retry...");
-          return true;          // sleep when we return from here
-        } finally {
-          RPC.stopProxy(primary);
+
+              if (--retries == 0 && 
+                  !NotReplicatedYetException.class.getName().
+                  equals(e.getClassName())) {
+                throw e;
+              } else {
+                LOG.info(StringUtils.stringifyException(e));
+                if (System.currentTimeMillis() - localstart > 5000) {
+                  LOG.info("Waiting for replication for "
+                      + (System.currentTimeMillis() - localstart) / 1000
+                      + " seconds");
+                }
+                try {
+                  LOG.warn("NotReplicatedYetException sleeping " + src
+                      + " retries left " + retries);
+                  Thread.sleep(sleeptime);
+                  sleeptime *= 2;
+                } catch (InterruptedException ie) {
+                }
+              }                
+            }
+          }
+        } 
+      }
+
+      void initAppend(LocatedBlock lastBlock, FileStatus stat,
+          int bytesPerChecksum) throws IOException {
+        block = lastBlock.getBlock();
+        long usedInLastBlock = stat.getLen() % blockSize;
+        int freeInLastBlock = (int)(blockSize - usedInLastBlock);
+
+        // calculate the amount of free space in the pre-existing 
+        // last crc chunk
+        int usedInCksum = (int)(stat.getLen() % bytesPerChecksum);
+        int freeInCksum = bytesPerChecksum - usedInCksum;
+
+        // if there is space in the last block, then we have to 
+        // append to that block
+        if (freeInLastBlock > blockSize) {
+          throw new IOException("The last block for file " + 
+              src + " is full.");
         }
-        recoveryErrorCount = 0; // block recovery successful
 
-        // If the block recovery generated a new generation stamp, use that
-        // from now on.  Also, setup new pipeline
-        //
-        if (newBlock != null) {
-          block = newBlock.getBlock();
-          accessToken = newBlock.getAccessToken();
-          nodes = newBlock.getLocations();
+        if (usedInCksum > 0 && freeInCksum > 0) {
+          // if there is space in the last partial chunk, then 
+          // setup in such a way that the next packet will have only 
+          // one chunk that fills up the partial chunk.
+          //
+          computePacketChunkSize(0, freeInCksum);
+          resetChecksumChunk(freeInCksum);
+          appendChunk = true;
+        } else {
+          // if the remaining space in the block is smaller than 
+          // that expected size of of a packet, then create 
+          // smaller size packet.
+          //
+          computePacketChunkSize(Math.min(writePacketSize, freeInLastBlock), 
+              bytesPerChecksum);
+        }
+
+        // setup pipeline to append to the last block XXX retries??
+        nodes = lastBlock.getLocations();
+        errorIndex = -1;   // no errors yet.
+        if (nodes.length < 1) {
+          throw new IOException("Unable to retrieve blocks locations " +
+              " for last block " + block +
+              "of file " + src);
+
         }
+        processDatanodeError(true, true);
+      }
 
-        this.hasError = false;
-        lastException = null;
-        errorIndex = 0;
-        success = createBlockOutputStream(nodes, clientName, true);
+      DatanodeInfo[] getNodes() {
+        return nodes;
       }
 
-      response = new ResponseProcessor(nodes);
-      response.start();
-      return false; // do not sleep, continue processing
+      private void setLastException(IOException e) {
+        if (lastException == null) {
+          lastException = e;
+        }
+      }
     }
 
     private void isClosed() throws IOException {
-      if (closed && lastException != null) {
-          throw lastException;
+      if (closed) {
+        IOException e = lastException;
+        throw e != null ? e : new IOException("DFSOutputStream is closed");
       }
     }
 
     //
     // returns the list of targets, if any, that is being currently used.
     //
-    DatanodeInfo[] getPipeline() {
-      synchronized (dataQueue) {
-        if (nodes == null) {
-          return null;
-        }
-        DatanodeInfo[] value = new DatanodeInfo[nodes.length];
-        for (int i = 0; i < nodes.length; i++) {
-          value[i] = nodes[i];
-        }
-        return value;
+    synchronized DatanodeInfo[] getPipeline() {
+      if (streamer == null) {
+        return null;
+      }
+      DatanodeInfo[] currentNodes = streamer.getNodes();
+      if (currentNodes == null) {
+        return null;
+      }
+      DatanodeInfo[] value = new DatanodeInfo[currentNodes.length];
+      for (int i = 0; i < currentNodes.length; i++) {
+        value[i] = currentNodes[i];
       }
+      return value;
     }
 
-    private Progressable progress;
-
     private DFSOutputStream(String src, long blockSize, Progressable progress,
         int bytesPerChecksum) throws IOException {
       super(new CRC32(), bytesPerChecksum, 4);
@@ -2712,58 +2952,13 @@
       // The last partial block of the file has to be filled.
       //
       if (lastBlock != null) {
-        block = lastBlock.getBlock();
-        long usedInLastBlock = stat.getLen() % blockSize;
-        int freeInLastBlock = (int)(blockSize - usedInLastBlock);
-
-        // calculate the amount of free space in the pre-existing 
-        // last crc chunk
-        int usedInCksum = (int)(stat.getLen() % bytesPerChecksum);
-        int freeInCksum = bytesPerChecksum - usedInCksum;
-
-        // if there is space in the last block, then we have to 
-        // append to that block
-        if (freeInLastBlock > blockSize) {
-          throw new IOException("The last block for file " + 
-                                src + " is full.");
-        }
-
         // indicate that we are appending to an existing block
         bytesCurBlock = lastBlock.getBlockSize();
-
-        if (usedInCksum > 0 && freeInCksum > 0) {
-          // if there is space in the last partial chunk, then 
-          // setup in such a way that the next packet will have only 
-          // one chunk that fills up the partial chunk.
-          //
-          computePacketChunkSize(0, freeInCksum);
-          resetChecksumChunk(freeInCksum);
-          this.appendChunk = true;
-        } else {
-          // if the remaining space in the block is smaller than 
-          // that expected size of of a packet, then create 
-          // smaller size packet.
-          //
-          computePacketChunkSize(Math.min(writePacketSize, freeInLastBlock), 
-                                 bytesPerChecksum);
-        }
-
-        // setup pipeline to append to the last block XXX retries??
-        nodes = lastBlock.getLocations();
-        errorIndex = -1;   // no errors yet.
-        if (nodes.length < 1) {
-          throw new IOException("Unable to retrieve blocks locations " +
-                                " for last block " + block +
-                                "of file " + src);
-                        
-        }
-        processDatanodeError(true, true);
-        streamer.start();
-      }
-      else {
+        streamer.initAppend(lastBlock, stat, bytesPerChecksum);
+      } else {
         computePacketChunkSize(writePacketSize, bytesPerChecksum);
-        streamer.start();
       }
+      streamer.start();
     }
 
     private void computePacketChunkSize(int psize, int csize) {
@@ -2778,184 +2973,28 @@
                   ", packetSize=" + packetSize);
       }
     }
-
-    /**
-     * Open a DataOutputStream to a DataNode so that it can be written to.
-     * This happens when a file is created and each time a new block is allocated.
-     * Must get block ID and the IDs of the destinations from the namenode.
-     * Returns the list of target datanodes.
-     */
-    private DatanodeInfo[] nextBlockOutputStream(String client) throws IOException {
-      LocatedBlock lb = null;
-      boolean retry = false;
-      DatanodeInfo[] nodes;
-      int count = conf.getInt("dfs.client.block.write.retries", 3);
-      boolean success;
-      do {
-        hasError = false;
-        lastException = null;
-        errorIndex = 0;
-        retry = false;
-        nodes = null;
-        success = false;
-                
-        long startTime = System.currentTimeMillis();
-        lb = locateFollowingBlock(startTime);
-        block = lb.getBlock();
-        accessToken = lb.getAccessToken();
-        nodes = lb.getLocations();
   
-        //
-        // Connect to first DataNode in the list.
-        //
-        success = createBlockOutputStream(nodes, clientName, false);
-
-        if (!success) {
-          LOG.info("Abandoning block " + block);
-          namenode.abandonBlock(block, src, clientName);
-
-          // Connection failed.  Let's wait a little bit and retry
-          retry = true;
-          try {
-            if (System.currentTimeMillis() - startTime > 5000) {
-              LOG.info("Waiting to find target node: " + nodes[0].getName());
-            }
-            Thread.sleep(6000);
-          } catch (InterruptedException iex) {
-          }
-        }
-      } while (retry && --count >= 0);
-
-      if (!success) {
-        throw new IOException("Unable to create new block.");
+    private void queuePacket(Packet packet) {
+      synchronized (dataQueue) {
+        dataQueue.addLast(packet);
+        dataQueue.notifyAll();
       }
-      return nodes;
     }
 
-    // connects to the first datanode in the pipeline
-    // Returns true if success, otherwise return failure.
-    //
-    private boolean createBlockOutputStream(DatanodeInfo[] nodes, String client,
-                    boolean recoveryFlag) {
-      String firstBadLink = "";
-      if (LOG.isDebugEnabled()) {
-        for (int i = 0; i < nodes.length; i++) {
-          LOG.debug("pipeline = " + nodes[i].getName());
-        }
-      }
-
-      // persist blocks on namenode on next flush
-      persistBlocks = true;
-
-      try {
-        LOG.debug("Connecting to " + nodes[0].getName());
-        InetSocketAddress target = NetUtils.createSocketAddr(nodes[0].getName());
-        s = socketFactory.createSocket();
-        int timeoutValue = (socketTimeout > 0) ? 
-                           (3000 * nodes.length + socketTimeout) : 0;
-        NetUtils.connect(s, target, timeoutValue);
-        s.setSoTimeout(timeoutValue);
-        s.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
-        LOG.debug("Send buf size " + s.getSendBufferSize());
-        long writeTimeout = (datanodeWriteTimeout > 0) ? 
-             (HdfsConstants.WRITE_TIMEOUT_EXTENSION * nodes.length +
-              datanodeWriteTimeout) : 0;
-
-        //
-        // Xmit header info to datanode
-        //
-        DataOutputStream out = new DataOutputStream(
-            new BufferedOutputStream(NetUtils.getOutputStream(s, writeTimeout), 
-                                     DataNode.SMALL_BUFFER_SIZE));
-        blockReplyStream = new DataInputStream(NetUtils.getInputStream(s));
-
-        out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );
-        out.write( DataTransferProtocol.OP_WRITE_BLOCK );
-        out.writeLong( block.getBlockId() );
-        out.writeLong( block.getGenerationStamp() );
-        out.writeInt( nodes.length );
-        out.writeBoolean( recoveryFlag );       // recovery flag
-        Text.writeString( out, client );
-        out.writeBoolean(false); // Not sending src node information
-        out.writeInt( nodes.length - 1 );
-        for (int i = 1; i < nodes.length; i++) {
-          nodes[i].write(out);
-        }
-        accessToken.write(out);
-        checksum.writeHeader( out );
-        out.flush();
-
-        // receive ack for connect
-        firstBadLink = Text.readString(blockReplyStream);
-        if (firstBadLink.length() != 0) {
-          throw new IOException("Bad connect ack with firstBadLink " + firstBadLink);
-        }
-
-        blockStream = out;
-        return true;     // success
-
-      } catch (IOException ie) {
-
-        LOG.info("Exception in createBlockOutputStream " + ie);
-
-        // find the datanode that matches
-        if (firstBadLink.length() != 0) {
-          for (int i = 0; i < nodes.length; i++) {
-            if (nodes[i].getName().equals(firstBadLink)) {
-              errorIndex = i;
-              break;
-            }
-          }
-        }
-        hasError = true;
-        setLastException(ie);
-        blockReplyStream = null;
-        return false;  // error
-      }
-    }
-  
-    private LocatedBlock locateFollowingBlock(long start
-                                              ) throws IOException {     
-      int retries = 5;
-      long sleeptime = 400;
-      while (true) {
-        long localstart = System.currentTimeMillis();
-        while (true) {
+    private void waitAndQueuePacket(Packet packet) throws IOException {
+      synchronized (dataQueue) {
+        // If queue is full, then wait till we have enough space
+        while (!closed && dataQueue.size() + ackQueue.size()  > MAX_PACKETS) {
           try {
-            return namenode.addBlock(src, clientName);
-          } catch (RemoteException e) {
-            IOException ue = 
-              e.unwrapRemoteException(FileNotFoundException.class,
-                                      AccessControlException.class,
-                                      QuotaExceededException.class);
-            if (ue != e) { 
-              throw ue; // no need to retry these exceptions
-            }
-            
-            if (--retries == 0 && 
-                !NotReplicatedYetException.class.getName().
-                equals(e.getClassName())) {
-              throw e;
-            } else {
-              LOG.info(StringUtils.stringifyException(e));
-              if (System.currentTimeMillis() - localstart > 5000) {
-                LOG.info("Waiting for replication for " + 
-                         (System.currentTimeMillis() - localstart)/1000 + 
-                         " seconds");
-              }
-              try {
-                LOG.warn("NotReplicatedYetException sleeping " + src +
-                          " retries left " + retries);
-                Thread.sleep(sleeptime);
-                sleeptime *= 2;
-              } catch (InterruptedException ie) {
-              }
-            }                
+            dataQueue.wait();
+          } catch (InterruptedException  e) {
           }
         }
-      } 
+        isClosed();
+        queuePacket(packet);
+      }
     }
-  
+
     // @see FSOutputSummer#writeChunk()
     @Override
     protected synchronized void writeChunk(byte[] b, int offset, int len, byte[] checksum) 
@@ -2975,74 +3014,59 @@
                               this.checksum.getChecksumSize() + 
                               " but found to be " + checksum.length);
       }
-
-      synchronized (dataQueue) {
   
-        // If queue is full, then wait till we can create  enough space
-        while (!closed && dataQueue.size() + ackQueue.size()  > maxPackets) {
-          try {
-            dataQueue.wait();
-          } catch (InterruptedException  e) {
-          }
-        }
-        isClosed();
-  
-        if (currentPacket == null) {
-          currentPacket = new Packet(packetSize, chunksPerPacket, 
-                                     bytesCurBlock);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("DFSClient writeChunk allocating new packet seqno=" + 
-                      currentPacket.seqno +
-                      ", src=" + src +
-                      ", packetSize=" + packetSize +
-                      ", chunksPerPacket=" + chunksPerPacket +
-                      ", bytesCurBlock=" + bytesCurBlock);
-          }
+      if (currentPacket == null) {
+        currentPacket = new Packet(packetSize, chunksPerPacket, 
+            bytesCurBlock);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("DFSClient writeChunk allocating new packet seqno=" + 
+              currentPacket.seqno +
+              ", src=" + src +
+              ", packetSize=" + packetSize +
+              ", chunksPerPacket=" + chunksPerPacket +
+              ", bytesCurBlock=" + bytesCurBlock);
         }
+      }
 
-        currentPacket.writeChecksum(checksum, 0, cklen);
-        currentPacket.writeData(b, offset, len);
-        currentPacket.numChunks++;
-        bytesCurBlock += len;
+      currentPacket.writeChecksum(checksum, 0, cklen);
+      currentPacket.writeData(b, offset, len);
+      currentPacket.numChunks++;
+      bytesCurBlock += len;
 
-        // If packet is full, enqueue it for transmission
+      // If packet is full, enqueue it for transmission
+      //
+      if (currentPacket.numChunks == currentPacket.maxChunks ||
+          bytesCurBlock == blockSize) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("DFSClient writeChunk packet full seqno=" +
+              currentPacket.seqno +
+              ", src=" + src +
+              ", bytesCurBlock=" + bytesCurBlock +
+              ", blockSize=" + blockSize +
+              ", appendChunk=" + appendChunk);
+        }
         //
-        if (currentPacket.numChunks == currentPacket.maxChunks ||
-            bytesCurBlock == blockSize) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("DFSClient writeChunk packet full seqno=" +
-                      currentPacket.seqno +
-                      ", src=" + src +
-                      ", bytesCurBlock=" + bytesCurBlock +
-                      ", blockSize=" + blockSize +
-                      ", appendChunk=" + appendChunk);
-          }
-          //
-          // if we allocated a new packet because we encountered a block
-          // boundary, reset bytesCurBlock.
-          //
-          if (bytesCurBlock == blockSize) {
-            currentPacket.lastPacketInBlock = true;
-            bytesCurBlock = 0;
-            lastFlushOffset = -1;
-          }
-          dataQueue.addLast(currentPacket);
-          dataQueue.notifyAll();
-          currentPacket = null;
- 
-          // If this was the first write after reopening a file, then the above
-          // write filled up any partial chunk. Tell the summer to generate full 
-          // crc chunks from now on.
-          if (appendChunk) {
-            appendChunk = false;
-            resetChecksumChunk(bytesPerChecksum);
-          }
-          int psize = Math.min((int)(blockSize-bytesCurBlock), writePacketSize);
-          computePacketChunkSize(psize, bytesPerChecksum);
+        // if we allocated a new packet because we encountered a block
+        // boundary, reset bytesCurBlock.
+        //
+        if (bytesCurBlock == blockSize) {
+          currentPacket.lastPacketInBlock = true;
+          bytesCurBlock = 0;
+          lastFlushOffset = -1;
+        }
+        waitAndQueuePacket(currentPacket);
+        currentPacket = null;
+
+        // If this was the first write after reopening a file, then the above
+        // write filled up any partial chunk. Tell the summer to generate full 
+        // crc chunks from now on.
+        if (appendChunk) {
+          appendChunk = false;
+          resetChecksumChunk(bytesPerChecksum);
         }
+        int psize = Math.min((int)(blockSize-bytesCurBlock), writePacketSize);
+        computePacketChunkSize(psize, bytesPerChecksum);
       }
-      //LOG.debug("DFSClient writeChunk done length " + len +
-      //          " checksum length " + cklen);
     }
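
The rewritten writeChunk no longer opens its own synchronized (dataQueue) block and calls wait() there; once a packet is full it hands it to waitAndQueuePacket(), and flushInternal() below likewise calls queuePacket(). Neither helper's body appears in the hunks quoted here, so the following is only a sketch of what such helpers would look like under a scheme where both queues are guarded by the dataQueue monitor; the method bodies, and the name of the packet-limit constant, are assumptions, while dataQueue, ackQueue, closed, isClosed() and Packet are the existing members.

    // Sketch only: enqueue a packet and wake the DataStreamer.
    private void queuePacket(Packet packet) {
      synchronized (dataQueue) {
        dataQueue.addLast(packet);
        dataQueue.notifyAll();
      }
    }

    // Sketch only: block the writer while the two queues together hold
    // more than the allowed number of outstanding packets, then enqueue.
    private void waitAndQueuePacket(Packet packet) throws IOException {
      synchronized (dataQueue) {
        while (!closed && dataQueue.size() + ackQueue.size() > MAX_PACKETS) {
          try {
            dataQueue.wait();
          } catch (InterruptedException e) {
            // ignored; the loop re-checks the condition
          }
        }
        isClosed();          // surface any failure recorded by the streamer
        queuePacket(packet);
      }
    }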
   
     /**
@@ -3051,6 +3075,8 @@
      * datanode. Block allocations are persisted on namenode.
      */
     public synchronized void sync() throws IOException {
+      checkOpen();
+      isClosed();
       try {
         /* Record current blockOffset. This might be changed inside
          * flushBuffer() where a partial checksum chunk might be flushed.
@@ -3087,13 +3113,11 @@
         // If any new blocks were allocated since the last flush, 
         // then persist block locations on namenode. 
         //
-        if (persistBlocks) {
+        if (persistBlocks.getAndSet(false)) {
           namenode.fsync(src, clientName);
-          persistBlocks = false;
         }
       } catch (IOException e) {
           lastException = new IOException("IOException flush:" + e);
-          closed = true;
           closeThreads();
           throw e;
       }
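
The new persistBlocks.getAndSet(false) call implies persistBlocks is now a java.util.concurrent.atomic.AtomicBoolean: the flag is read and cleared in one atomic step, so sync() can drop the separate "persistBlocks = false" write without opening a check-then-act window against a concurrent setter. A small self-contained illustration of the pattern, with toy names rather than the DFSClient fields:

import java.util.concurrent.atomic.AtomicBoolean;

public class GetAndSetDemo {
  // Toy stand-in for the persistBlocks flag: set by the writer path,
  // consumed by a flush. getAndSet(false) reads and clears it atomically,
  // so a concurrent setter cannot slip in between the check and the reset.
  private final AtomicBoolean dirty = new AtomicBoolean(false);

  void markDirty() {
    dirty.set(true);
  }

  void flushIfDirty() {
    if (dirty.getAndSet(false)) {
      System.out.println("persisting state");   // stands in for namenode.fsync(...)
    }
  }

  public static void main(String[] args) {
    GetAndSetDemo d = new GetAndSetDemo();
    d.markDirty();
    d.flushIfDirty();   // persists once
    d.flushIfDirty();   // no-op: flag already cleared
  }
}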
@@ -3106,85 +3130,39 @@
     private synchronized void flushInternal() throws IOException {
       checkOpen();
       isClosed();
+      //
+      // If there is data in the current buffer, send it across
+      //
+      if (currentPacket != null) {
+        queuePacket(currentPacket);
+        currentPacket = null;
+      }
 
-      while (!closed) {
-        synchronized (dataQueue) {
-          isClosed();
-          //
-          // If there is data in the current buffer, send it across
-          //
-          if (currentPacket != null) {
-            dataQueue.addLast(currentPacket);
-            dataQueue.notifyAll();
-            currentPacket = null;
-          }
-
-          // wait for all buffers to be flushed to datanodes
-          if (!closed && dataQueue.size() != 0) {
-            try {
-              dataQueue.wait();
-            } catch (InterruptedException e) {
-            }
-            continue;
-          }
-        }
-
-        // wait for all acks to be received back from datanodes
-        synchronized (ackQueue) {
-          if (!closed && ackQueue.size() != 0) {
-            try {
-              ackQueue.wait();
-            } catch (InterruptedException e) {
-            }
-            continue;
-          }
-        }
-
-        // acquire both the locks and verify that we are
-        // *really done*. In the case of error recovery, 
-        // packets might move back from ackQueue to dataQueue.
-        //
-        synchronized (dataQueue) {
-          synchronized (ackQueue) {
-            if (dataQueue.size() + ackQueue.size() == 0) {
-              break;       // we are done
-            }
+      synchronized (dataQueue) {
+        while (!closed && dataQueue.size() + ackQueue.size() > 0) {
+          try {
+            dataQueue.wait();
+          } catch (InterruptedException  e) {
           }
         }
+        isClosed();
       }
     }
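
flushInternal() now queues any pending packet and then waits on the dataQueue monitor alone until both dataQueue and ackQueue are empty; whatever removes packets is expected to change the queues under that same monitor and notify it. A minimal self-contained sketch of that one-monitor drain pattern (generic names, not the DFSClient threads themselves):

import java.util.LinkedList;

public class SingleLockDrainDemo {
  // Both queues are guarded by the dataQueue monitor.
  private final LinkedList<Integer> dataQueue = new LinkedList<Integer>();
  private final LinkedList<Integer> ackQueue = new LinkedList<Integer>();

  void enqueue(int seqno) {
    synchronized (dataQueue) {
      dataQueue.addLast(seqno);
      dataQueue.notifyAll();
    }
  }

  // Consumer side: move a packet from dataQueue to ackQueue (send), or
  // drop it from ackQueue (ack); every change notifies the one monitor.
  void sendOne() {
    synchronized (dataQueue) {
      if (!dataQueue.isEmpty()) {
        ackQueue.addLast(dataQueue.removeFirst());
        dataQueue.notifyAll();
      }
    }
  }

  void ackOne() {
    synchronized (dataQueue) {
      if (!ackQueue.isEmpty()) {
        ackQueue.removeFirst();
        dataQueue.notifyAll();
      }
    }
  }

  // Writer side: park until everything queued so far has been acked,
  // the same shape as the rewritten flushInternal().
  void awaitDrain() throws InterruptedException {
    synchronized (dataQueue) {
      while (dataQueue.size() + ackQueue.size() > 0) {
        dataQueue.wait();
      }
    }
  }

  public static void main(String[] args) throws InterruptedException {
    SingleLockDrainDemo d = new SingleLockDrainDemo();
    d.enqueue(1);
    d.sendOne();
    d.ackOne();
    d.awaitDrain();              // returns immediately: both queues drained
    System.out.println("drained");
  }
}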
-  
-    /**
-     * Closes this output stream and releases any system 
-     * resources associated with this stream.
-     */
-    @Override
-    public void close() throws IOException {
-      if(closed)
-        return;
-      closeInternal();
-      leasechecker.remove(src);
-      
-      if (s != null) {
-        s.close();
-        s = null;
-      }
-    }
- 
+
     // shutdown datastreamer and responseprocessor threads.
     private void closeThreads() throws IOException {
       try {
         streamer.close();
         streamer.join();
-        
-        // shutdown response after streamer has exited.
-        if (response != null) {
-          response.close();
-          response.join();
-          response = null;
+        if (s != null) {
+          s.close();
         }
       } catch (InterruptedException e) {
-        throw new IOException("Failed to shutdown response thread");
+        throw new IOException("Failed to shutdown streamer");
+      } finally {
+        streamer = null;
+        s = null;
+        closed = true;
       }
     }
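
The reworked closeThreads() asks the DataStreamer to stop, joins it so no thread is still using the socket, then closes the socket, and the finally block nulls the references and marks the stream closed whether or not the join succeeded. A tiny self-contained sketch of that close-then-join shape with a generic worker thread (not the DataStreamer itself):

public class WorkerShutdownDemo {
  static class Worker extends Thread {
    private volatile boolean running = true;

    void shutdown() {
      running = false;
      interrupt();             // wake the thread if it is sleeping or waiting
    }

    @Override
    public void run() {
      while (running) {
        try {
          Thread.sleep(100);   // stands in for streaming work
        } catch (InterruptedException e) {
          // loop re-checks the running flag
        }
      }
    }
  }

  public static void main(String[] args) throws Exception {
    Worker worker = new Worker();
    worker.start();
    try {
      worker.shutdown();       // analogous to streamer.close()
      worker.join();           // analogous to streamer.join()
    } finally {
      // analogous to the finally block above: the stream is considered
      // closed no matter how shutdown went.
      System.out.println("worker stopped, stream marked closed");
    }
  }
}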
     
@@ -3192,65 +3170,56 @@
      * Closes this output stream and releases any system 
      * resources associated with this stream.
      */
-    private synchronized void closeInternal() throws IOException {
-      checkOpen();
-      isClosed();
+    @Override
+    public synchronized void close() throws IOException {
+      if (closed) {
+        IOException e = lastException;
+        if (e == null)
+          return;
+        else
+          throw e;
+      }
 
       try {
-          flushBuffer();       // flush from all upper layers
-      
-          // Mark that this packet is the last packet in block.
-          // If there are no outstanding packets and the last packet
-          // was not the last one in the current block, then create a
-          // packet with empty payload.
-          synchronized (dataQueue) {
-            if (currentPacket == null && bytesCurBlock != 0) {
-              currentPacket = new Packet(packetSize, chunksPerPacket,
-                                         bytesCurBlock);
-            }
-            if (currentPacket != null) { 
-              currentPacket.lastPacketInBlock = true;
-            }
-          }
-
-        flushInternal();             // flush all data to Datanodes
-        isClosed(); // check to see if flushInternal had any exceptions
-        closed = true; // allow closeThreads() to shutdown threads
+        flushBuffer();       // flush from all upper layers
 
-        closeThreads();
-        
-        synchronized (dataQueue) {
-          if (blockStream != null) {
-            blockStream.writeInt(0); // indicate end-of-block to datanode
-            blockStream.close();
-            blockReplyStream.close();
-          }
-          if (s != null) {
-            s.close();
-            s = null;
-          }
+        // Mark that this packet is the last packet in block.
+        // If there are no outstanding packets and the last packet
+        // was not the last one in the current block, then create a
+        // packet with empty payload.
+        if (currentPacket == null && bytesCurBlock != 0) {
+          currentPacket = new Packet(packetSize, chunksPerPacket,
+              bytesCurBlock);
+        }
+        if (currentPacket != null) { 
+          currentPacket.lastPacketInBlock = true;
         }
 
-        streamer = null;
-        blockStream = null;
-        blockReplyStream = null;
+        flushInternal();             // flush all data to Datanodes
+        closeThreads();
+        completeFile();
+        leasechecker.remove(src);
+      } finally {
+        closed = true;
+      }
+    }
 
-        long localstart = System.currentTimeMillis();
-        boolean fileComplete = false;
-        while (!fileComplete) {
-          fileComplete = namenode.complete(src, clientName);
-          if (!fileComplete) {
-            try {
-              Thread.sleep(400);
-              if (System.currentTimeMillis() - localstart > 5000) {
-                LOG.info("Could not complete file " + src + " retrying...");
-              }
-            } catch (InterruptedException ie) {
+    // should be called holding (this) lock since setTestFilename() may 
+    // be called during unit tests
+    private void completeFile() throws IOException {
+      long localstart = System.currentTimeMillis();
+      boolean fileComplete = false;
+      while (!fileComplete) {
+        fileComplete = namenode.complete(src, clientName);
+        if (!fileComplete) {
+          try {
+            Thread.sleep(400);
+            if (System.currentTimeMillis() - localstart > 5000) {
+              LOG.info("Could not complete file " + src + " retrying...");
             }
+          } catch (InterruptedException ie) {
           }
         }
-      } finally {
-        closed = true;
       }
     }
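
completeFile() keeps calling namenode.complete(src, clientName) every 400 ms until the NameNode reports the file closed, logging once the wait exceeds 5 seconds. A small self-contained sketch of that poll-until-true shape with the same timing constants, using a toy condition in place of the NameNode RPC:

import java.util.concurrent.Callable;

public class PollUntilCompleteDemo {
  // Poll a condition every 400 ms, printing once it has taken longer than
  // 5 seconds -- the same shape as completeFile() above.
  static void pollUntilComplete(Callable<Boolean> condition) throws Exception {
    long localstart = System.currentTimeMillis();
    boolean fileComplete = false;
    while (!fileComplete) {
      fileComplete = condition.call();
      if (!fileComplete) {
        try {
          Thread.sleep(400);
          if (System.currentTimeMillis() - localstart > 5000) {
            System.out.println("still waiting, retrying...");
          }
        } catch (InterruptedException ie) {
          // ignored; the loop re-checks the condition
        }
      }
    }
  }

  public static void main(String[] args) throws Exception {
    final long deadline = System.currentTimeMillis() + 1000;
    pollUntilComplete(new Callable<Boolean>() {
      public Boolean call() {
        return System.currentTimeMillis() >= deadline;   // completes after ~1s
      }
    });
    System.out.println("complete");
  }
}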
 


