hadoop-common-commits mailing list archives

From: ste...@apache.org
Subject: svn commit: r783055 [4/6] - in /hadoop/core/branches/HADOOP-3628-2: ./ .eclipse.templates/ ivy/ lib/ lib/jsp-2.1/ src/contrib/ src/contrib/capacity-scheduler/ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-schedu...
Date: Tue, 09 Jun 2009 16:11:23 GMT
Modified: hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Tue Jun  9 16:11:19 2009
@@ -45,6 +45,7 @@
 import java.net.*;
 import java.util.*;
 import java.util.zip.CRC32;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.ConcurrentHashMap;
 import java.nio.BufferOverflowException;
@@ -219,10 +220,9 @@
   
       // close connections to the namenode
       RPC.stopProxy(rpcNamenode);
-      if (LOG.isDebugEnabled()) {
-        closedLocation = new IOException("Filesystem closed");
-        LOG.debug("Closing filesystem", closedLocation);
-      }
+      //note where the location was closed
+      closedLocation = new IOException("Filesystem closed");
+      LOG.debug("Closing filesystem", closedLocation);
     }
   }
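
The hunk above drops the isDebugEnabled() guard so that the close site is always recorded: constructing an IOException captures the calling stack trace even when it is never thrown, and that trace can later explain an unexpected "Filesystem closed" failure. A minimal sketch of the pattern, using hypothetical names rather than the DFSClient fields themselves:

    import java.io.IOException;

    public class CloseSiteTracker {
      // Stack trace of whoever called close(); null while still open.
      private volatile IOException closedLocation;

      public void close() {
        // Creating the exception is enough to capture the current stack trace;
        // it is stored only so it can be chained onto later errors.
        closedLocation = new IOException("Filesystem closed");
      }

      public void checkOpen() throws IOException {
        if (closedLocation != null) {
          // Report the late access with the original close site as its cause.
          throw new IOException("Filesystem closed", closedLocation);
        }
      }
    }
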
 
@@ -2096,46 +2096,31 @@
    * starts sending packets from the dataQueue.
   ****************************************************************/
   class DFSOutputStream extends FSOutputSummer implements Syncable {
+    private static final int MAX_PACKETS = 80; // each packet 64K, total 5MB
     private Socket s;
     // closed is accessed by different threads under different locks.
-    volatile boolean closed = false;
+    private volatile boolean closed = false;
   
     private String src;
-    private DataOutputStream blockStream;
-    private DataInputStream blockReplyStream;
-    private Block block;
-    private AccessToken accessToken;
-    final private long blockSize;
-    private DataChecksum checksum;
-    private LinkedList<Packet> dataQueue = new LinkedList<Packet>();
-    private LinkedList<Packet> ackQueue = new LinkedList<Packet>();
+    private final long blockSize;
+    private final DataChecksum checksum;
+    // both dataQueue and ackQueue are protected by dataQueue lock
+    private final LinkedList<Packet> dataQueue = new LinkedList<Packet>();
+    private final LinkedList<Packet> ackQueue = new LinkedList<Packet>();
     private Packet currentPacket = null;
-    private int maxPackets = 80; // each packet 64K, total 5MB
-    // private int maxPackets = 1000; // each packet 64K, total 64MB
-    private DataStreamer streamer = new DataStreamer();;
-    private ResponseProcessor response = null;
+    private DataStreamer streamer = new DataStreamer();
     private long currentSeqno = 0;
     private long bytesCurBlock = 0; // bytes written in current block
     private int packetSize = 0; // write packet size, including the header.
     private int chunksPerPacket = 0;
-    private DatanodeInfo[] nodes = null; // list of targets for current block
-    private volatile boolean hasError = false;
-    private volatile int errorIndex = 0;
     private volatile IOException lastException = null;
     private long artificialSlowdown = 0;
     private long lastFlushOffset = -1; // offset when flush was invoked
-    //volatile: written holding dataQueue and read holding DFSOutputStream 
-    private volatile boolean persistBlocks = false;//persist blocks on namenode
-    private int recoveryErrorCount = 0; // number of times block recovery failed
-    private int maxRecoveryErrorCount = 5; // try block recovery 5 times
+    //persist blocks on namenode
+    private final AtomicBoolean persistBlocks = new AtomicBoolean(false);
     private volatile boolean appendChunk = false;   // appending to existing partial block
     private long initialFileSize = 0; // at time of file open
-
-    private void setLastException(IOException e) {
-      if (lastException == null) {
-        lastException = e;
-      }
-    }
+    private Progressable progress;
     
     private class Packet {
       ByteBuffer buffer;           // only one of buf and buffer is non-null
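
The comment on dataQueue and ackQueue above states the central locking rule of this rewrite: both lists are guarded by the dataQueue monitor instead of two separate locks. With a single lock, a packet can move from the data queue to the ack queue atomically, so no other thread ever observes it in neither list or in both. A self-contained sketch of that discipline, with hypothetical names:

    import java.util.LinkedList;

    class PacketQueues<P> {
      private final LinkedList<P> dataQueue = new LinkedList<P>();  // shared monitor
      private final LinkedList<P> ackQueue = new LinkedList<P>();

      // Writer side: add a packet and wake the streamer thread.
      void enqueue(P packet) {
        synchronized (dataQueue) {
          dataQueue.addLast(packet);
          dataQueue.notifyAll();
        }
      }

      // Streamer side: take the oldest packet and record it as awaiting an ack,
      // in one atomic step under the single lock.
      P takeForSend() throws InterruptedException {
        synchronized (dataQueue) {
          while (dataQueue.isEmpty()) {
            dataQueue.wait(1000);
          }
          P packet = dataQueue.removeFirst();
          ackQueue.addLast(packet);
          dataQueue.notifyAll();
          return packet;
        }
      }

      // Responder side: the oldest outstanding packet has been acknowledged.
      void ackOldest() {
        synchronized (dataQueue) {
          if (!ackQueue.isEmpty()) {
            ackQueue.removeFirst();
          }
          dataQueue.notifyAll();   // a writer may be waiting on queue size
        }
      }
    }
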
@@ -2241,11 +2226,24 @@
     // of them are received, the DataStreamer closes the current block.
     //
     private class DataStreamer extends Daemon {
-
-      private volatile boolean closed = false;
-  
+      private static final int MAX_RECOVERY_ERROR_COUNT = 5; // try block recovery 5 times
+      private int recoveryErrorCount = 0; // number of times block recovery failed
+      private volatile boolean streamerClosed = false;
+      private Block block;
+      private AccessToken accessToken;
+      private DataOutputStream blockStream;
+      private DataInputStream blockReplyStream;
+      private ResponseProcessor response = null;
+      private volatile DatanodeInfo[] nodes = null; // list of targets for current block
+      private volatile boolean hasError = false;
+      private volatile int errorIndex = 0;
+  
+      /*
+       * streamer thread is the only thread that opens streams to datanode, 
+       * and closes them. Any error recovery is also done by this thread.
+       */
       public void run() {
-        while (!closed && clientRunning) {
+        while (!streamerClosed && clientRunning) {
 
           // if the Responder encountered an error, shutdown Responder
           if (hasError && response != null) {
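
The new comment introduced above run() documents a thread-confinement rule: blockStream and blockReplyStream belong to the streamer thread, which alone opens, writes, recovers, and closes them; other threads only request shutdown by setting a volatile flag and interrupting, as the close() method added later in this patch does. A compact sketch of that ownership pattern (hypothetical class, not the committed code):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    class SingleOwnerStreamer extends Thread {
      private volatile boolean streamerClosed = false;
      // Confined to this thread: only run() ever touches the stream.
      private DataOutputStream blockStream;

      public void run() {
        try {
          // Stand-in for opening a socket stream to the first datanode.
          blockStream = new DataOutputStream(new ByteArrayOutputStream());
          while (!streamerClosed) {
            blockStream.writeInt(0);   // placeholder for sending one packet
            Thread.sleep(100);
          }
        } catch (IOException e) {
          // error recovery would also run here, on the owning thread
        } catch (InterruptedException e) {
          // requestClose() interrupted us; fall through to cleanup
        } finally {
          closeStreamQuietly();        // only the owner closes the stream
        }
      }

      private void closeStreamQuietly() {
        if (blockStream != null) {
          try { blockStream.close(); } catch (IOException e) { }
          blockStream = null;
        }
      }

      // Called from other threads: never touches the stream directly.
      void requestClose() {
        streamerClosed = true;
        this.interrupt();
      }
    }
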
@@ -2258,120 +2256,104 @@
           }
 
           Packet one = null;
-          synchronized (dataQueue) {
 
-            // process IO errors if any
-            boolean doSleep = processDatanodeError(hasError, false);
+          // process IO errors if any
+          boolean doSleep = processDatanodeError(hasError, false);
 
+          synchronized (dataQueue) {
             // wait for a packet to be sent.
-            while ((!closed && !hasError && clientRunning 
-                   && dataQueue.size() == 0) || doSleep) {
+            while ((!streamerClosed && !hasError && clientRunning 
+                && dataQueue.size() == 0) || doSleep) {
               try {
                 dataQueue.wait(1000);
               } catch (InterruptedException  e) {
               }
               doSleep = false;
             }
-            if (closed || hasError || dataQueue.size() == 0 || !clientRunning) {
+            if (streamerClosed || hasError || dataQueue.size() == 0 || !clientRunning) {
               continue;
             }
+            // get packet to be sent.
+            one = dataQueue.getFirst();
+          }
 
-            try {
-              // get packet to be sent.
-              one = dataQueue.getFirst();
-              long offsetInBlock = one.offsetInBlock;
-  
-              // get new block from namenode.
-              if (blockStream == null) {
-                LOG.debug("Allocating new block");
-                nodes = nextBlockOutputStream(src); 
-                this.setName("DataStreamer for file " + src +
-                             " block " + block);
-                response = new ResponseProcessor(nodes);
-                response.start();
-              }
+          try {
+            long offsetInBlock = one.offsetInBlock;
 
-              if (offsetInBlock >= blockSize) {
-                throw new IOException("BlockSize " + blockSize +
-                                      " is smaller than data size. " +
-                                      " Offset of packet in block " + 
-                                      offsetInBlock +
-                                      " Aborting file " + src);
-              }
+            // get new block from namenode.
+            if (blockStream == null) {
+              LOG.debug("Allocating new block");
+              nodes = nextBlockOutputStream(src); 
+              this.setName("DataStreamer for file " + src +
+                  " block " + block);
+              response = new ResponseProcessor(nodes);
+              response.start();
+            }
+
+            if (offsetInBlock >= blockSize) {
+              throw new IOException("BlockSize " + blockSize +
+                  " is smaller than data size. " +
+                  " Offset of packet in block " + 
+                  offsetInBlock +
+                  " Aborting file " + src);
+            }
 
-              ByteBuffer buf = one.getBuffer();
-              
+            ByteBuffer buf = one.getBuffer();
+
+            synchronized (dataQueue) {
               // move packet from dataQueue to ackQueue
               dataQueue.removeFirst();
+              ackQueue.addLast(one);
               dataQueue.notifyAll();
-              synchronized (ackQueue) {
-                ackQueue.addLast(one);
-                ackQueue.notifyAll();
-              } 
-              
-              // write out data to remote datanode
-              blockStream.write(buf.array(), buf.position(), buf.remaining());
-              
-              if (one.lastPacketInBlock) {
-                blockStream.writeInt(0); // indicate end-of-block 
-              }
-              blockStream.flush();
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("DataStreamer block " + block +
-                          " wrote packet seqno:" + one.seqno +
-                          " size:" + buf.remaining() +
-                          " offsetInBlock:" + one.offsetInBlock + 
-                          " lastPacketInBlock:" + one.lastPacketInBlock);
-              }
-            } catch (Throwable e) {
-              LOG.warn("DataStreamer Exception: " + 
-                       StringUtils.stringifyException(e));
-              if (e instanceof IOException) {
-                setLastException((IOException)e);
-              }
-              hasError = true;
             }
+
+            // write out data to remote datanode
+            blockStream.write(buf.array(), buf.position(), buf.remaining());
+
+            if (one.lastPacketInBlock) {
+              blockStream.writeInt(0); // indicate end-of-block 
+            }
+            blockStream.flush();
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("DataStreamer block " + block +
+                  " wrote packet seqno:" + one.seqno +
+                  " size:" + buf.remaining() +
+                  " offsetInBlock:" + one.offsetInBlock + 
+                  " lastPacketInBlock:" + one.lastPacketInBlock);
+            }
+          } catch (Throwable e) {
+            LOG.warn("DataStreamer Exception: " + 
+                StringUtils.stringifyException(e));
+            if (e instanceof IOException) {
+              setLastException((IOException)e);
+            }
+            hasError = true;
           }
 
-          if (closed || hasError || !clientRunning) {
+
+          if (streamerClosed || hasError || !clientRunning) {
             continue;
           }
 
           // Is this block full?
           if (one.lastPacketInBlock) {
-            synchronized (ackQueue) {
-              while (!hasError && ackQueue.size() != 0 && clientRunning) {
+            synchronized (dataQueue) {
+              while (!streamerClosed && !hasError && ackQueue.size() != 0 && clientRunning) {
                 try {
-                  ackQueue.wait();   // wait for acks to arrive from datanodes
+                  dataQueue.wait(1000);   // wait for acks to arrive from datanodes
                 } catch (InterruptedException  e) {
                 }
               }
             }
-            LOG.debug("Closing old block " + block);
-            this.setName("DataStreamer for file " + src);
-
-            response.close();        // ignore all errors in Response
-            try {
-              response.join();
-              response = null;
-            } catch (InterruptedException  e) {
-            }
-
-            if (closed || hasError || !clientRunning) {
+            if (streamerClosed || hasError || !clientRunning) {
               continue;
             }
 
-            synchronized (dataQueue) {
-              try {
-                blockStream.close();
-                blockReplyStream.close();
-              } catch (IOException e) {
-              }
-              nodes = null;
-              response = null;
-              blockStream = null;
-              blockReplyStream = null;
-            }
+            LOG.debug("Closing old block " + block);
+            this.setName("DataStreamer for file " + src);
+            closeResponder();
+            closeStream();
+            nodes = null;
           }
           if (progress != null) { progress.progress(); }
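
Two smaller changes in the loop above are worth noting: error processing now happens before the dataQueue lock is taken, and the end-of-block wait parks on the dataQueue monitor with a one-second timeout instead of on a separate ackQueue lock, because the ResponseProcessor signals every acknowledgement through dataQueue.notifyAll(). The wait/notify pairing, reduced to an isolated sketch with hypothetical names:

    import java.util.LinkedList;

    class AckDrain {
      private final LinkedList<Long> dataQueue = new LinkedList<Long>();  // shared monitor
      private final LinkedList<Long> ackQueue = new LinkedList<Long>();
      private volatile boolean closed = false;
      private volatile boolean hasError = false;

      // Streamer side: before closing a block, wait until every packet is acked.
      void waitForAllAcks() {
        synchronized (dataQueue) {
          while (!closed && !hasError && !ackQueue.isEmpty()) {
            try {
              dataQueue.wait(1000);   // bounded wait so closed/hasError get rechecked
            } catch (InterruptedException e) {
            }
          }
        }
      }

      // Responder side: drop the acknowledged packet and wake any waiter.
      void packetAcked() {
        synchronized (dataQueue) {
          if (!ackQueue.isEmpty()) {
            ackQueue.removeFirst();
          }
          dataQueue.notifyAll();
        }
      }
    }
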
 
@@ -2382,288 +2364,545 @@
             } catch (InterruptedException e) {}
           }
         }
+        closeInternal();
       }
 
-      // shutdown thread
-      void close() {
+      private void closeInternal() {
+        closeResponder();
+        closeStream();
+        streamerClosed = true;
         closed = true;
         synchronized (dataQueue) {
           dataQueue.notifyAll();
         }
-        synchronized (ackQueue) {
-          ackQueue.notifyAll();
-        }
+      }
+
+      /*
+       * close both streamer and DFSOutputStream, should be called only 
+       * by an external thread and only after all data to be sent has 
+       * been flushed to datanode.
+       */
+      void close() {
+        streamerClosed = true;
         this.interrupt();
       }
-    }
-                  
-    //
-    // Processes reponses from the datanodes.  A packet is removed 
-    // from the ackQueue when its response arrives.
-    //
-    private class ResponseProcessor extends Thread {
 
-      private volatile boolean closed = false;
-      private DatanodeInfo[] targets = null;
-      private boolean lastPacketInBlock = false;
+      private void closeResponder() {
+        if (response != null) {
+          try {
+            response.close();
+            response.join();
+          } catch (InterruptedException  e) {
+          } finally {
+            response = null;
+          }
+        }
+      }
 
-      ResponseProcessor (DatanodeInfo[] targets) {
-        this.targets = targets;
+      private void closeStream() {
+        if (blockStream != null) {
+          try {
+            blockStream.close();
+          } catch (IOException e) {
+          } finally {
+            blockStream = null;
+          }
+        }
+        if (blockReplyStream != null) {
+          try {
+            blockReplyStream.close();
+          } catch (IOException e) {
+          } finally {
+            blockReplyStream = null;
+          }
+        }
       }
 
-      public void run() {
+      //
+      // Processes responses from the datanodes.  A packet is removed 
+      // from the ackQueue when its response arrives.
+      //
+      private class ResponseProcessor extends Daemon {
 
-        this.setName("ResponseProcessor for block " + block);
-  
-        while (!closed && clientRunning && !lastPacketInBlock) {
-          // process responses from datanodes.
-          try {
-            // verify seqno from datanode
-            long seqno = blockReplyStream.readLong();
-            LOG.debug("DFSClient received ack for seqno " + seqno);
-            if (seqno == -1) {
-              continue;
-            } else if (seqno == -2) {
-              // no nothing
-            } else {
-              Packet one = null;
-              synchronized (ackQueue) {
-                one = ackQueue.getFirst();
+        private volatile boolean responderClosed = false;
+        private DatanodeInfo[] targets = null;
+        private boolean isLastPacketInBlock = false;
+
+        ResponseProcessor (DatanodeInfo[] targets) {
+          this.targets = targets;
+        }
+
+        public void run() {
+
+          this.setName("ResponseProcessor for block " + block);
+
+          while (!responderClosed && clientRunning && !isLastPacketInBlock) {
+            // process responses from datanodes.
+            try {
+              // verify seqno from datanode
+              long seqno = blockReplyStream.readLong();
+              LOG.debug("DFSClient received ack for seqno " + seqno);
+              if (seqno == -1) {
+                continue;
+              } else if (seqno == -2) {
+                // do nothing
+              } else {
+                Packet one = null;
+                synchronized (dataQueue) {
+                  one = ackQueue.getFirst();
+                }
+                if (one.seqno != seqno) {
+                  throw new IOException("Responseprocessor: Expecting seqno " + 
+                      one.seqno + " for block " + block +
+                      " but received " + seqno);
+                }
+                isLastPacketInBlock = one.lastPacketInBlock;
               }
-              if (one.seqno != seqno) {
-                throw new IOException("Responseprocessor: Expecting seqno " + 
-                                      " for block " + block +
-                                      one.seqno + " but received " + seqno);
+
+              // processes response status from all datanodes.
+              for (int i = 0; i < targets.length && clientRunning; i++) {
+                short reply = blockReplyStream.readShort();
+                if (reply != DataTransferProtocol.OP_STATUS_SUCCESS) {
+                  errorIndex = i; // first bad datanode
+                  throw new IOException("Bad response " + reply +
+                      " for block " + block +
+                      " from datanode " + 
+                      targets[i].getName());
+                }
               }
-              lastPacketInBlock = one.lastPacketInBlock;
-            }
 
-            // processes response status from all datanodes.
-            for (int i = 0; i < targets.length && clientRunning; i++) {
-              short reply = blockReplyStream.readShort();
-              if (reply != DataTransferProtocol.OP_STATUS_SUCCESS) {
-                errorIndex = i; // first bad datanode
-                throw new IOException("Bad response " + reply +
-                                      " for block " + block +
-                                      " from datanode " + 
-                                      targets[i].getName());
+              synchronized (dataQueue) {
+                ackQueue.removeFirst();
+                dataQueue.notifyAll();
+              }
+            } catch (Exception e) {
+              if (!responderClosed) {
+                if (e instanceof IOException) {
+                  setLastException((IOException)e);
+                }
+                hasError = true;
+                synchronized (dataQueue) {
+                  dataQueue.notifyAll();
+                }
+                LOG.warn("DFSOutputStream ResponseProcessor exception " + 
+                    " for block " + block +
+                    StringUtils.stringifyException(e));
+                responderClosed = true;
               }
             }
+          }
+        }
 
-            synchronized (ackQueue) {
-              ackQueue.removeFirst();
-              ackQueue.notifyAll();
-            }
-          } catch (Exception e) {
-            if (!closed) {
-              hasError = true;
-              if (e instanceof IOException) {
-                setLastException((IOException)e);
-              }
-              LOG.warn("DFSOutputStream ResponseProcessor exception " + 
-                       " for block " + block +
-                        StringUtils.stringifyException(e));
-              closed = true;
+        void close() {
+          responderClosed = true;
+          this.interrupt();
+        }
+      }
+
+      // If this stream has encountered any errors so far, shutdown 
+      // threads and mark stream as closed. Returns true if we should
+      // sleep for a while after returning from this call.
+      //
+      private boolean processDatanodeError(boolean error, boolean isAppend) {
+        if (!error) {
+          return false;
+        }
+        if (response != null) {
+          LOG.info("Error Recovery for block " + block +
+          " waiting for responder to exit. ");
+          return true;
+        }
+        if (errorIndex >= 0) {
+          LOG.warn("Error Recovery for block " + block
+              + " bad datanode[" + errorIndex + "] "
+              + (nodes == null? "nodes == null": nodes[errorIndex].getName()));
+        }
+
+        closeStream();
+
+        // move packets from ack queue to front of the data queue
+        synchronized (dataQueue) {
+          dataQueue.addAll(0, ackQueue);
+          ackQueue.clear();
+        }
+
+        boolean success = false;
+        while (!success && !streamerClosed && clientRunning) {
+          DatanodeInfo[] newnodes = null;
+          if (nodes == null) {
+            String msg = "Could not get block locations. " + "Source file \""
+                + src + "\" - Aborting...";
+            LOG.warn(msg);
+            setLastException(new IOException(msg));
+            streamerClosed = true;
+            return false;
+          }
+          StringBuilder pipelineMsg = new StringBuilder();
+          for (int j = 0; j < nodes.length; j++) {
+            pipelineMsg.append(nodes[j].getName());
+            if (j < nodes.length - 1) {
+              pipelineMsg.append(", ");
             }
           }
+          // remove bad datanode from list of datanodes.
+          // If errorIndex was not set (i.e. appends), then do not remove 
+          // any datanodes
+          // 
+          if (errorIndex < 0) {
+            newnodes = nodes;
+          } else {
+            if (nodes.length <= 1) {
+              lastException = new IOException("All datanodes " + pipelineMsg
+                  + " are bad. Aborting...");
+              streamerClosed = true;
+              return false;
+            }
+            LOG.warn("Error Recovery for block " + block +
+                " in pipeline " + pipelineMsg + 
+                ": bad datanode " + nodes[errorIndex].getName());
+            newnodes =  new DatanodeInfo[nodes.length-1];
+            System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
+            System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,
+                newnodes.length-errorIndex);
+          }
 
-          synchronized (dataQueue) {
-            dataQueue.notifyAll();
+          // Tell the primary datanode to do error recovery 
+          // by stamping appropriate generation stamps.
+          //
+          LocatedBlock newBlock = null;
+          ClientDatanodeProtocol primary =  null;
+          DatanodeInfo primaryNode = null;
+          try {
+            // Pick the "least" datanode as the primary datanode to avoid deadlock.
+            primaryNode = Collections.min(Arrays.asList(newnodes));
+            primary = createClientDatanodeProtocolProxy(primaryNode, conf);
+            newBlock = primary.recoverBlock(block, isAppend, newnodes);
+          } catch (IOException e) {
+            recoveryErrorCount++;
+            if (recoveryErrorCount > MAX_RECOVERY_ERROR_COUNT) {
+              if (nodes.length > 1) {
+                // if the primary datanode failed, remove it from the list.
+                // The original bad datanode is left in the list because it is
+                // conservative to remove only one datanode in one iteration.
+                for (int j = 0; j < nodes.length; j++) {
+                  if (nodes[j].equals(primaryNode)) {
+                    errorIndex = j; // forget original bad node.
+                  }
+                }
+                // remove primary node from list
+                newnodes =  new DatanodeInfo[nodes.length-1];
+                System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
+                System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,
+                    newnodes.length-errorIndex);
+                nodes = newnodes;
+                LOG.warn("Error Recovery for block " + block + " failed "
+                    + " because recovery from primary datanode " + primaryNode
+                    + " failed " + recoveryErrorCount + " times. "
+                    + " Pipeline was " + pipelineMsg
+                    + ". Marking primary datanode as bad.");
+                recoveryErrorCount = 0; 
+                errorIndex = -1;
+                return true;          // sleep when we return from here
+              }
+              String emsg = "Error Recovery for block " + block + " failed "
+                  + " because recovery from primary datanode " + primaryNode
+                  + " failed " + recoveryErrorCount + " times. "
+                  + " Pipeline was " + pipelineMsg + ". Aborting...";
+              LOG.warn(emsg);
+              lastException = new IOException(emsg);
+              streamerClosed = true;
+              return false;       // abort with IOexception
+            } 
+            LOG.warn("Error Recovery for block " + block + " failed "
+                + " because recovery from primary datanode " + primaryNode
+                + " failed " + recoveryErrorCount + " times. "
+                + " Pipeline was " + pipelineMsg + ". Will retry...");
+            return true;          // sleep when we return from here
+          } finally {
+            RPC.stopProxy(primary);
           }
-          synchronized (ackQueue) {
-            ackQueue.notifyAll();
+          recoveryErrorCount = 0; // block recovery successful
+
+          // If the block recovery generated a new generation stamp, use that
+          // from now on.  Also, setup new pipeline
+          //
+          if (newBlock != null) {
+            block = newBlock.getBlock();
+            accessToken = newBlock.getAccessToken();
+            nodes = newBlock.getLocations();
           }
+
+          this.hasError = false;
+          lastException = null;
+          errorIndex = 0;
+          success = createBlockOutputStream(nodes, clientName, true);
         }
-      }
 
-      void close() {
-        closed = true;
-        this.interrupt();
+        if (!streamerClosed && clientRunning) {
+          response = new ResponseProcessor(nodes);
+          response.start();
+        }
+        return false; // do not sleep, continue processing
       }
-    }
 
-    // If this stream has encountered any errors so far, shutdown 
-    // threads and mark stream as closed. Returns true if we should
-    // sleep for a while after returning from this call.
-    //
-    private boolean processDatanodeError(boolean hasError, boolean isAppend) {
-      if (!hasError) {
-        return false;
-      }
-      if (response != null) {
-        LOG.info("Error Recovery for block " + block +
-                 " waiting for responder to exit. ");
-        return true;
-      }
-      if (errorIndex >= 0) {
-        LOG.warn("Error Recovery for block " + block
-            + " bad datanode[" + errorIndex + "] "
-            + (nodes == null? "nodes == null": nodes[errorIndex].getName()));
-      }
+      /**
+       * Open a DataOutputStream to a DataNode so that it can be written to.
+       * This happens when a file is created and each time a new block is allocated.
+       * Must get block ID and the IDs of the destinations from the namenode.
+       * Returns the list of target datanodes.
+       */
+      private DatanodeInfo[] nextBlockOutputStream(String client) throws IOException {
+        LocatedBlock lb = null;
+        boolean retry = false;
+        DatanodeInfo[] nodes = null;
+        int count = conf.getInt("dfs.client.block.write.retries", 3);
+        boolean success = false;
+        do {
+          hasError = false;
+          lastException = null;
+          errorIndex = 0;
+          retry = false;
+          success = false;
+
+          long startTime = System.currentTimeMillis();
+          lb = locateFollowingBlock(startTime);
+          block = lb.getBlock();
+          accessToken = lb.getAccessToken();
+          nodes = lb.getLocations();
 
-      if (blockStream != null) {
-        try {
-          blockStream.close();
-          blockReplyStream.close();
-        } catch (IOException e) {
+          //
+          // Connect to first DataNode in the list.
+          //
+          success = createBlockOutputStream(nodes, clientName, false);
+
+          if (!success) {
+            LOG.info("Abandoning block " + block);
+            namenode.abandonBlock(block, src, clientName);
+
+            // Connection failed.  Let's wait a little bit and retry
+            retry = true;
+            try {
+              if (System.currentTimeMillis() - startTime > 5000) {
+                LOG.info("Waiting to find target node: " + nodes[0].getName());
+              }
+              Thread.sleep(6000);
+            } catch (InterruptedException iex) {
+            }
+          }
+        } while (retry && --count >= 0);
+
+        if (!success) {
+          throw new IOException("Unable to create new block.");
         }
+        return nodes;
       }
-      blockStream = null;
-      blockReplyStream = null;
 
-      // move packets from ack queue to front of the data queue
-      synchronized (ackQueue) {
-        dataQueue.addAll(0, ackQueue);
-        ackQueue.clear();
-      }
-
-      boolean success = false;
-      while (!success && clientRunning) {
-        DatanodeInfo[] newnodes = null;
-        if (nodes == null) {
-          String msg = "Could not get block locations. " +
-                                          "Source file \"" + src
-                                          + "\" - Aborting...";
-          LOG.warn(msg);
-          setLastException(new IOException(msg));
-          closed = true;
-          if (streamer != null) streamer.close();
-          return false;
-        }
-        StringBuilder pipelineMsg = new StringBuilder();
-        for (int j = 0; j < nodes.length; j++) {
-          pipelineMsg.append(nodes[j].getName());
-          if (j < nodes.length - 1) {
-            pipelineMsg.append(", ");
+      // connects to the first datanode in the pipeline
+      // Returns true if success, otherwise return failure.
+      //
+      private boolean createBlockOutputStream(DatanodeInfo[] nodes, String client,
+          boolean recoveryFlag) {
+        String firstBadLink = "";
+        if (LOG.isDebugEnabled()) {
+          for (int i = 0; i < nodes.length; i++) {
+            LOG.debug("pipeline = " + nodes[i].getName());
           }
         }
-        // remove bad datanode from list of datanodes.
-        // If errorIndex was not set (i.e. appends), then do not remove 
-        // any datanodes
-        // 
-        if (errorIndex < 0) {
-          newnodes = nodes;
-        } else {
-          if (nodes.length <= 1) {
-            lastException = new IOException("All datanodes " + pipelineMsg + 
-                                            " are bad. Aborting...");
-            closed = true;
-            if (streamer != null) streamer.close();
-            return false;
+
+        // persist blocks on namenode on next flush
+        persistBlocks.set(true);
+
+        try {
+          LOG.debug("Connecting to " + nodes[0].getName());
+          InetSocketAddress target = NetUtils.createSocketAddr(nodes[0].getName());
+          s = socketFactory.createSocket();
+          int timeoutValue = (socketTimeout > 0) ? (3000 * nodes.length + socketTimeout) : 0;
+          NetUtils.connect(s, target, timeoutValue);
+          s.setSoTimeout(timeoutValue);
+          s.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
+          LOG.debug("Send buf size " + s.getSendBufferSize());
+          long writeTimeout = (datanodeWriteTimeout > 0) ? 
+              (HdfsConstants.WRITE_TIMEOUT_EXTENSION * nodes.length +
+                  datanodeWriteTimeout) : 0;
+
+          //
+          // Xmit header info to datanode
+          //
+          DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
+              NetUtils.getOutputStream(s, writeTimeout),
+              DataNode.SMALL_BUFFER_SIZE));
+          blockReplyStream = new DataInputStream(NetUtils.getInputStream(s));
+
+          out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
+          out.write(DataTransferProtocol.OP_WRITE_BLOCK);
+          out.writeLong(block.getBlockId());
+          out.writeLong(block.getGenerationStamp());
+          out.writeInt(nodes.length);
+          out.writeBoolean(recoveryFlag); // recovery flag
+          Text.writeString(out, client);
+          out.writeBoolean(false); // Not sending src node information
+          out.writeInt(nodes.length - 1);
+          for (int i = 1; i < nodes.length; i++) {
+            nodes[i].write(out);
           }
-          LOG.warn("Error Recovery for block " + block +
-                   " in pipeline " + pipelineMsg + 
-                   ": bad datanode " + nodes[errorIndex].getName());
-          newnodes =  new DatanodeInfo[nodes.length-1];
-          System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
-          System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,
-              newnodes.length-errorIndex);
+          accessToken.write(out);
+          checksum.writeHeader(out);
+          out.flush();
+
+          // receive ack for connect
+          firstBadLink = Text.readString(blockReplyStream);
+          if (firstBadLink.length() != 0) {
+            throw new IOException("Bad connect ack with firstBadLink "
+                + firstBadLink);
+          }
+
+          blockStream = out;
+          return true; // success
+
+        } catch (IOException ie) {
+
+          LOG.info("Exception in createBlockOutputStream " + ie);
+
+          // find the datanode that matches
+          if (firstBadLink.length() != 0) {
+            for (int i = 0; i < nodes.length; i++) {
+              if (nodes[i].getName().equals(firstBadLink)) {
+                errorIndex = i;
+                break;
+              }
+            }
+          }
+          hasError = true;
+          setLastException(ie);
+          blockReplyStream = null;
+          return false;  // error
         }
+      }
 
-        // Tell the primary datanode to do error recovery 
-        // by stamping appropriate generation stamps.
-        //
-        LocatedBlock newBlock = null;
-        ClientDatanodeProtocol primary =  null;
-        DatanodeInfo primaryNode = null;
-        try {
-          // Pick the "least" datanode as the primary datanode to avoid deadlock.
-          primaryNode = Collections.min(Arrays.asList(newnodes));
-          primary = createClientDatanodeProtocolProxy(primaryNode, conf);
-          newBlock = primary.recoverBlock(block, isAppend, newnodes);
-        } catch (IOException e) {
-          recoveryErrorCount++;
-          if (recoveryErrorCount > maxRecoveryErrorCount) {
-            if (nodes.length > 1) {
-              // if the primary datanode failed, remove it from the list.
-              // The original bad datanode is left in the list because it is
-              // conservative to remove only one datanode in one iteration.
-              for (int j = 0; j < nodes.length; j++) {
-                if (nodes[j].equals(primaryNode)) {
-                  errorIndex = j; // forget original bad node.
-                }
+      private LocatedBlock locateFollowingBlock(long start) throws IOException {
+        int retries = 5;
+        long sleeptime = 400;
+        while (true) {
+          long localstart = System.currentTimeMillis();
+          while (true) {
+            try {
+              return namenode.addBlock(src, clientName);
+            } catch (RemoteException e) {
+              IOException ue = 
+                e.unwrapRemoteException(FileNotFoundException.class,
+                    AccessControlException.class,
+                    QuotaExceededException.class);
+              if (ue != e) { 
+                throw ue; // no need to retry these exceptions
               }
-              // remove primary node from list
-              newnodes =  new DatanodeInfo[nodes.length-1];
-              System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
-              System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,
-                               newnodes.length-errorIndex);
-              nodes = newnodes;
-              LOG.warn("Error Recovery for block " + block + " failed " +
-                       " because recovery from primary datanode " +
-                       primaryNode + " failed " + recoveryErrorCount +
-                       " times. " + " Pipeline was " + pipelineMsg +
-                       ". Marking primary datanode as bad.");
-              recoveryErrorCount = 0; 
-              errorIndex = -1;
-              return true;          // sleep when we return from here
-            }
-            String emsg = "Error Recovery for block " + block + " failed " +
-                          " because recovery from primary datanode " +
-                          primaryNode + " failed " + recoveryErrorCount + 
-                          " times. "  + " Pipeline was " + pipelineMsg +
-                          ". Aborting...";
-            LOG.warn(emsg);
-            lastException = new IOException(emsg);
-            closed = true;
-            if (streamer != null) streamer.close();
-            return false;       // abort with IOexception
-          } 
-          LOG.warn("Error Recovery for block " + block + " failed " +
-                   " because recovery from primary datanode " +
-                   primaryNode + " failed " + recoveryErrorCount +
-                   " times. "  + " Pipeline was " + pipelineMsg +
-                   ". Will retry...");
-          return true;          // sleep when we return from here
-        } finally {
-          RPC.stopProxy(primary);
+
+              if (--retries == 0 && 
+                  !NotReplicatedYetException.class.getName().
+                  equals(e.getClassName())) {
+                throw e;
+              } else {
+                LOG.info(StringUtils.stringifyException(e));
+                if (System.currentTimeMillis() - localstart > 5000) {
+                  LOG.info("Waiting for replication for "
+                      + (System.currentTimeMillis() - localstart) / 1000
+                      + " seconds");
+                }
+                try {
+                  LOG.warn("NotReplicatedYetException sleeping " + src
+                      + " retries left " + retries);
+                  Thread.sleep(sleeptime);
+                  sleeptime *= 2;
+                } catch (InterruptedException ie) {
+                }
+              }                
+            }
+          }
+        } 
+      }
+
+      void initAppend(LocatedBlock lastBlock, FileStatus stat,
+          int bytesPerChecksum) throws IOException {
+        block = lastBlock.getBlock();
+        long usedInLastBlock = stat.getLen() % blockSize;
+        int freeInLastBlock = (int)(blockSize - usedInLastBlock);
+
+        // calculate the amount of free space in the pre-existing 
+        // last crc chunk
+        int usedInCksum = (int)(stat.getLen() % bytesPerChecksum);
+        int freeInCksum = bytesPerChecksum - usedInCksum;
+
+        // if there is space in the last block, then we have to 
+        // append to that block
+        if (freeInLastBlock > blockSize) {
+          throw new IOException("The last block for file " + 
+              src + " is full.");
         }
-        recoveryErrorCount = 0; // block recovery successful
 
-        // If the block recovery generated a new generation stamp, use that
-        // from now on.  Also, setup new pipeline
-        //
-        if (newBlock != null) {
-          block = newBlock.getBlock();
-          accessToken = newBlock.getAccessToken();
-          nodes = newBlock.getLocations();
+        if (usedInCksum > 0 && freeInCksum > 0) {
+          // if there is space in the last partial chunk, then 
+          // setup in such a way that the next packet will have only 
+          // one chunk that fills up the partial chunk.
+          //
+          computePacketChunkSize(0, freeInCksum);
+          resetChecksumChunk(freeInCksum);
+          appendChunk = true;
+        } else {
+          // if the remaining space in the block is smaller than 
+          // the expected size of a packet, then create a 
+          // smaller packet.
+          //
+          computePacketChunkSize(Math.min(writePacketSize, freeInLastBlock), 
+              bytesPerChecksum);
+        }
+
+        // setup pipeline to append to the last block XXX retries??
+        nodes = lastBlock.getLocations();
+        errorIndex = -1;   // no errors yet.
+        if (nodes.length < 1) {
+          throw new IOException("Unable to retrieve block locations" +
+              " for last block " + block +
+              " of file " + src);
+
         }
+        processDatanodeError(true, true);
+      }
 
-        this.hasError = false;
-        lastException = null;
-        errorIndex = 0;
-        success = createBlockOutputStream(nodes, clientName, true);
+      DatanodeInfo[] getNodes() {
+        return nodes;
       }
 
-      response = new ResponseProcessor(nodes);
-      response.start();
-      return false; // do not sleep, continue processing
+      private void setLastException(IOException e) {
+        if (lastException == null) {
+          lastException = e;
+        }
+      }
     }
 
     private void isClosed() throws IOException {
-      if (closed && lastException != null) {
-          throw lastException;
+      if (closed) {
+        IOException e = lastException;
+        throw e != null ? e : new IOException("DFSOutputStream is closed");
       }
     }
 
     //
     // returns the list of targets, if any, that is being currently used.
     //
-    DatanodeInfo[] getPipeline() {
-      synchronized (dataQueue) {
-        if (nodes == null) {
-          return null;
-        }
-        DatanodeInfo[] value = new DatanodeInfo[nodes.length];
-        for (int i = 0; i < nodes.length; i++) {
-          value[i] = nodes[i];
-        }
-        return value;
+    synchronized DatanodeInfo[] getPipeline() {
+      if (streamer == null) {
+        return null;
+      }
+      DatanodeInfo[] currentNodes = streamer.getNodes();
+      if (currentNodes == null) {
+        return null;
+      }
+      DatanodeInfo[] value = new DatanodeInfo[currentNodes.length];
+      for (int i = 0; i < currentNodes.length; i++) {
+        value[i] = currentNodes[i];
       }
+      return value;
     }
 
-    private Progressable progress;
-
     private DFSOutputStream(String src, long blockSize, Progressable progress,
         int bytesPerChecksum) throws IOException {
       super(new CRC32(), bytesPerChecksum, 4);
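
Of the methods folded into DataStreamer above, locateFollowingBlock is a textbook bounded retry with exponential backoff: unwrapped FileNotFound, AccessControl and QuotaExceeded errors are rethrown immediately, NotReplicatedYetException is retried indefinitely, and other remote errors are retried at most five times, with a sleep that starts at 400 ms and doubles on each attempt. The same shape, reduced to a generic helper (hypothetical names, not the HDFS RPC interface):

    import java.io.IOException;
    import java.util.concurrent.Callable;

    class RetryWithBackoff {
      // Run `call`, retrying retryable failures up to `retries` times and
      // doubling the sleep after each failed attempt.
      static <T> T run(Callable<T> call, int retries, long initialSleepMs)
          throws Exception {
        long sleep = initialSleepMs;
        while (true) {
          try {
            return call.call();
          } catch (IOException e) {        // stand-in for the retryable exception class
            if (--retries <= 0) {
              throw e;                     // budget exhausted: give up
            }
            try {
              Thread.sleep(sleep);
            } catch (InterruptedException ie) {
              Thread.currentThread().interrupt();
              throw e;
            }
            sleep *= 2;                    // exponential backoff: 400, 800, 1600, ...
          }
        }
      }
    }

A call such as RetryWithBackoff.run(addBlockCall, 5, 400) would mirror the five-attempt, 400 ms schedule above, minus the special-casing of NotReplicatedYetException that the committed method keeps.
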
@@ -2720,58 +2959,13 @@
       // The last partial block of the file has to be filled.
       //
       if (lastBlock != null) {
-        block = lastBlock.getBlock();
-        long usedInLastBlock = stat.getLen() % blockSize;
-        int freeInLastBlock = (int)(blockSize - usedInLastBlock);
-
-        // calculate the amount of free space in the pre-existing 
-        // last crc chunk
-        int usedInCksum = (int)(stat.getLen() % bytesPerChecksum);
-        int freeInCksum = bytesPerChecksum - usedInCksum;
-
-        // if there is space in the last block, then we have to 
-        // append to that block
-        if (freeInLastBlock > blockSize) {
-          throw new IOException("The last block for file " + 
-                                src + " is full.");
-        }
-
         // indicate that we are appending to an existing block
         bytesCurBlock = lastBlock.getBlockSize();
-
-        if (usedInCksum > 0 && freeInCksum > 0) {
-          // if there is space in the last partial chunk, then 
-          // setup in such a way that the next packet will have only 
-          // one chunk that fills up the partial chunk.
-          //
-          computePacketChunkSize(0, freeInCksum);
-          resetChecksumChunk(freeInCksum);
-          this.appendChunk = true;
-        } else {
-          // if the remaining space in the block is smaller than 
-          // that expected size of of a packet, then create 
-          // smaller size packet.
-          //
-          computePacketChunkSize(Math.min(writePacketSize, freeInLastBlock), 
-                                 bytesPerChecksum);
-        }
-
-        // setup pipeline to append to the last block XXX retries??
-        nodes = lastBlock.getLocations();
-        errorIndex = -1;   // no errors yet.
-        if (nodes.length < 1) {
-          throw new IOException("Unable to retrieve blocks locations " +
-                                " for last block " + block +
-                                "of file " + src);
-                        
-        }
-        processDatanodeError(true, true);
-        streamer.start();
-      }
-      else {
+        streamer.initAppend(lastBlock, stat, bytesPerChecksum);
+      } else {
         computePacketChunkSize(writePacketSize, bytesPerChecksum);
-        streamer.start();
       }
+      streamer.start();
     }
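
The append path now delegates to streamer.initAppend, but its arithmetic is unchanged: the tail of the existing file decides whether the first new packet must carry a single short chunk that tops up a partial checksum chunk. As a worked example under assumed values (bytesPerChecksum = 512, an existing file of 1,000,000 bytes, 64 MB blocks): usedInCksum = 1,000,000 % 512 = 64, freeInCksum = 512 - 64 = 448, so the first appended packet holds one 448-byte chunk; full 512-byte chunks resume afterwards. The same computation as a standalone snippet:

    public class AppendChunkMath {
      public static void main(String[] args) {
        long fileLen = 1000000L;            // assumed length of the existing file
        int bytesPerChecksum = 512;         // assumed io.bytes.per.checksum
        long blockSize = 64L * 1024 * 1024; // assumed dfs.block.size

        long usedInLastBlock = fileLen % blockSize;            // 1000000
        int freeInLastBlock = (int) (blockSize - usedInLastBlock);

        int usedInCksum = (int) (fileLen % bytesPerChecksum);  // 64
        int freeInCksum = bytesPerChecksum - usedInCksum;      // 448

        if (usedInCksum > 0 && freeInCksum > 0) {
          // Next packet carries exactly one chunk that fills the partial chunk.
          System.out.println("first append chunk size = " + freeInCksum);
        } else {
          // The last chunk was already full: resume with normal-sized packets.
          System.out.println("no partial chunk to fill");
        }
        System.out.println("space left in last block = " + freeInLastBlock);
      }
    }
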
 
     private void computePacketChunkSize(int psize, int csize) {
@@ -2786,184 +2980,28 @@
                   ", packetSize=" + packetSize);
       }
     }
-
-    /**
-     * Open a DataOutputStream to a DataNode so that it can be written to.
-     * This happens when a file is created and each time a new block is allocated.
-     * Must get block ID and the IDs of the destinations from the namenode.
-     * Returns the list of target datanodes.
-     */
-    private DatanodeInfo[] nextBlockOutputStream(String client) throws IOException {
-      LocatedBlock lb = null;
-      boolean retry = false;
-      DatanodeInfo[] nodes;
-      int count = conf.getInt("dfs.client.block.write.retries", 3);
-      boolean success;
-      do {
-        hasError = false;
-        lastException = null;
-        errorIndex = 0;
-        retry = false;
-        nodes = null;
-        success = false;
-                
-        long startTime = System.currentTimeMillis();
-        lb = locateFollowingBlock(startTime);
-        block = lb.getBlock();
-        accessToken = lb.getAccessToken();
-        nodes = lb.getLocations();
   
-        //
-        // Connect to first DataNode in the list.
-        //
-        success = createBlockOutputStream(nodes, clientName, false);
-
-        if (!success) {
-          LOG.info("Abandoning block " + block);
-          namenode.abandonBlock(block, src, clientName);
-
-          // Connection failed.  Let's wait a little bit and retry
-          retry = true;
-          try {
-            if (System.currentTimeMillis() - startTime > 5000) {
-              LOG.info("Waiting to find target node: " + nodes[0].getName());
-            }
-            Thread.sleep(6000);
-          } catch (InterruptedException iex) {
-          }
-        }
-      } while (retry && --count >= 0);
-
-      if (!success) {
-        throw new IOException("Unable to create new block.");
+    private void queuePacket(Packet packet) {
+      synchronized (dataQueue) {
+        dataQueue.addLast(packet);
+        dataQueue.notifyAll();
       }
-      return nodes;
     }
 
-    // connects to the first datanode in the pipeline
-    // Returns true if success, otherwise return failure.
-    //
-    private boolean createBlockOutputStream(DatanodeInfo[] nodes, String client,
-                    boolean recoveryFlag) {
-      String firstBadLink = "";
-      if (LOG.isDebugEnabled()) {
-        for (int i = 0; i < nodes.length; i++) {
-          LOG.debug("pipeline = " + nodes[i].getName());
-        }
-      }
-
-      // persist blocks on namenode on next flush
-      persistBlocks = true;
-
-      try {
-        LOG.debug("Connecting to " + nodes[0].getName());
-        InetSocketAddress target = NetUtils.createSocketAddr(nodes[0].getName());
-        s = socketFactory.createSocket();
-        int timeoutValue = (socketTimeout > 0) ? 
-                           (3000 * nodes.length + socketTimeout) : 0;
-        NetUtils.connect(s, target, timeoutValue);
-        s.setSoTimeout(timeoutValue);
-        s.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
-        LOG.debug("Send buf size " + s.getSendBufferSize());
-        long writeTimeout = (datanodeWriteTimeout > 0) ? 
-             (HdfsConstants.WRITE_TIMEOUT_EXTENSION * nodes.length +
-              datanodeWriteTimeout) : 0;
-
-        //
-        // Xmit header info to datanode
-        //
-        DataOutputStream out = new DataOutputStream(
-            new BufferedOutputStream(NetUtils.getOutputStream(s, writeTimeout), 
-                                     DataNode.SMALL_BUFFER_SIZE));
-        blockReplyStream = new DataInputStream(NetUtils.getInputStream(s));
-
-        out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );
-        out.write( DataTransferProtocol.OP_WRITE_BLOCK );
-        out.writeLong( block.getBlockId() );
-        out.writeLong( block.getGenerationStamp() );
-        out.writeInt( nodes.length );
-        out.writeBoolean( recoveryFlag );       // recovery flag
-        Text.writeString( out, client );
-        out.writeBoolean(false); // Not sending src node information
-        out.writeInt( nodes.length - 1 );
-        for (int i = 1; i < nodes.length; i++) {
-          nodes[i].write(out);
-        }
-        accessToken.write(out);
-        checksum.writeHeader( out );
-        out.flush();
-
-        // receive ack for connect
-        firstBadLink = Text.readString(blockReplyStream);
-        if (firstBadLink.length() != 0) {
-          throw new IOException("Bad connect ack with firstBadLink " + firstBadLink);
-        }
-
-        blockStream = out;
-        return true;     // success
-
-      } catch (IOException ie) {
-
-        LOG.info("Exception in createBlockOutputStream " + ie);
-
-        // find the datanode that matches
-        if (firstBadLink.length() != 0) {
-          for (int i = 0; i < nodes.length; i++) {
-            if (nodes[i].getName().equals(firstBadLink)) {
-              errorIndex = i;
-              break;
-            }
-          }
-        }
-        hasError = true;
-        setLastException(ie);
-        blockReplyStream = null;
-        return false;  // error
-      }
-    }
-  
-    private LocatedBlock locateFollowingBlock(long start
-                                              ) throws IOException {     
-      int retries = 5;
-      long sleeptime = 400;
-      while (true) {
-        long localstart = System.currentTimeMillis();
-        while (true) {
+    private void waitAndQueuePacket(Packet packet) throws IOException {
+      synchronized (dataQueue) {
+        // If queue is full, then wait till we have enough space
+        while (!closed && dataQueue.size() + ackQueue.size()  > MAX_PACKETS) {
           try {
-            return namenode.addBlock(src, clientName);
-          } catch (RemoteException e) {
-            IOException ue = 
-              e.unwrapRemoteException(FileNotFoundException.class,
-                                      AccessControlException.class,
-                                      QuotaExceededException.class);
-            if (ue != e) { 
-              throw ue; // no need to retry these exceptions
-            }
-            
-            if (--retries == 0 && 
-                !NotReplicatedYetException.class.getName().
-                equals(e.getClassName())) {
-              throw e;
-            } else {
-              LOG.info(StringUtils.stringifyException(e));
-              if (System.currentTimeMillis() - localstart > 5000) {
-                LOG.info("Waiting for replication for " + 
-                         (System.currentTimeMillis() - localstart)/1000 + 
-                         " seconds");
-              }
-              try {
-                LOG.warn("NotReplicatedYetException sleeping " + src +
-                          " retries left " + retries);
-                Thread.sleep(sleeptime);
-                sleeptime *= 2;
-              } catch (InterruptedException ie) {
-              }
-            }                
+            dataQueue.wait();
+          } catch (InterruptedException  e) {
           }
         }
-      } 
+        isClosed();
+        queuePacket(packet);
+      }
     }
-  
+
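
waitAndQueuePacket above is the writer-side flow control: the thread calling writeChunk blocks while the number of unsent plus unacknowledged packets exceeds MAX_PACKETS (80 packets of roughly 64 KB, about 5 MB in flight). Two plain lists under one monitor are kept instead of a capacity-bounded BlockingQueue because error recovery must be able to move packets from the ack list back to the front of the data list under that same lock. A standalone sketch of the bounded hand-off, with hypothetical names:

    import java.io.IOException;
    import java.util.LinkedList;

    class BoundedPacketWriter<P> {
      private static final int MAX_PACKETS = 80;   // ~64 KB each, ~5 MB in flight
      private final LinkedList<P> dataQueue = new LinkedList<P>();
      private final LinkedList<P> ackQueue = new LinkedList<P>();
      private volatile boolean closed = false;

      // Writer side: block until there is room, then enqueue and wake the streamer.
      void waitAndQueuePacket(P packet) throws IOException {
        synchronized (dataQueue) {
          while (!closed && dataQueue.size() + ackQueue.size() > MAX_PACKETS) {
            try {
              dataQueue.wait();   // streamer/responder notify as packets drain
            } catch (InterruptedException e) {
            }
          }
          if (closed) {
            throw new IOException("stream is closed");
          }
          dataQueue.addLast(packet);
          dataQueue.notifyAll();
        }
      }
    }
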
     // @see FSOutputSummer#writeChunk()
     @Override
     protected synchronized void writeChunk(byte[] b, int offset, int len, byte[] checksum) 
@@ -2983,74 +3021,59 @@
                               this.checksum.getChecksumSize() + 
                               " but found to be " + checksum.length);
       }
-
-      synchronized (dataQueue) {
   
-        // If queue is full, then wait till we can create  enough space
-        while (!closed && dataQueue.size() + ackQueue.size()  > maxPackets) {
-          try {
-            dataQueue.wait();
-          } catch (InterruptedException  e) {
-          }
-        }
-        isClosed();
-  
-        if (currentPacket == null) {
-          currentPacket = new Packet(packetSize, chunksPerPacket, 
-                                     bytesCurBlock);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("DFSClient writeChunk allocating new packet seqno=" + 
-                      currentPacket.seqno +
-                      ", src=" + src +
-                      ", packetSize=" + packetSize +
-                      ", chunksPerPacket=" + chunksPerPacket +
-                      ", bytesCurBlock=" + bytesCurBlock);
-          }
+      if (currentPacket == null) {
+        currentPacket = new Packet(packetSize, chunksPerPacket, 
+            bytesCurBlock);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("DFSClient writeChunk allocating new packet seqno=" + 
+              currentPacket.seqno +
+              ", src=" + src +
+              ", packetSize=" + packetSize +
+              ", chunksPerPacket=" + chunksPerPacket +
+              ", bytesCurBlock=" + bytesCurBlock);
         }
+      }
 
-        currentPacket.writeChecksum(checksum, 0, cklen);
-        currentPacket.writeData(b, offset, len);
-        currentPacket.numChunks++;
-        bytesCurBlock += len;
+      currentPacket.writeChecksum(checksum, 0, cklen);
+      currentPacket.writeData(b, offset, len);
+      currentPacket.numChunks++;
+      bytesCurBlock += len;
 
-        // If packet is full, enqueue it for transmission
+      // If packet is full, enqueue it for transmission
+      //
+      if (currentPacket.numChunks == currentPacket.maxChunks ||
+          bytesCurBlock == blockSize) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("DFSClient writeChunk packet full seqno=" +
+              currentPacket.seqno +
+              ", src=" + src +
+              ", bytesCurBlock=" + bytesCurBlock +
+              ", blockSize=" + blockSize +
+              ", appendChunk=" + appendChunk);
+        }
         //
-        if (currentPacket.numChunks == currentPacket.maxChunks ||
-            bytesCurBlock == blockSize) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("DFSClient writeChunk packet full seqno=" +
-                      currentPacket.seqno +
-                      ", src=" + src +
-                      ", bytesCurBlock=" + bytesCurBlock +
-                      ", blockSize=" + blockSize +
-                      ", appendChunk=" + appendChunk);
-          }
-          //
-          // if we allocated a new packet because we encountered a block
-          // boundary, reset bytesCurBlock.
-          //
-          if (bytesCurBlock == blockSize) {
-            currentPacket.lastPacketInBlock = true;
-            bytesCurBlock = 0;
-            lastFlushOffset = -1;
-          }
-          dataQueue.addLast(currentPacket);
-          dataQueue.notifyAll();
-          currentPacket = null;
- 
-          // If this was the first write after reopening a file, then the above
-          // write filled up any partial chunk. Tell the summer to generate full 
-          // crc chunks from now on.
-          if (appendChunk) {
-            appendChunk = false;
-            resetChecksumChunk(bytesPerChecksum);
-          }
-          int psize = Math.min((int)(blockSize-bytesCurBlock), writePacketSize);
-          computePacketChunkSize(psize, bytesPerChecksum);
+        // if we allocated a new packet because we encountered a block
+        // boundary, reset bytesCurBlock.
+        //
+        if (bytesCurBlock == blockSize) {
+          currentPacket.lastPacketInBlock = true;
+          bytesCurBlock = 0;
+          lastFlushOffset = -1;
+        }
+        waitAndQueuePacket(currentPacket);
+        currentPacket = null;
+
+        // If this was the first write after reopening a file, then the above
+        // write filled up any partial chunk. Tell the summer to generate full 
+        // crc chunks from now on.
+        if (appendChunk) {
+          appendChunk = false;
+          resetChecksumChunk(bytesPerChecksum);
         }
+        int psize = Math.min((int)(blockSize-bytesCurBlock), writePacketSize);
+        computePacketChunkSize(psize, bytesPerChecksum);
       }
-      //LOG.debug("DFSClient writeChunk done length " + len +
-      //          " checksum length " + cklen);
     }
   
     /**
@@ -3059,6 +3082,8 @@
      * datanode. Block allocations are persisted on namenode.
      */
     public synchronized void sync() throws IOException {
+      checkOpen();
+      isClosed();
       try {
         /* Record current blockOffset. This might be changed inside
          * flushBuffer() where a partial checksum chunk might be flushed.
@@ -3095,13 +3120,11 @@
         // If any new blocks were allocated since the last flush, 
         // then persist block locations on namenode. 
         //
-        if (persistBlocks) {
+        if (persistBlocks.getAndSet(false)) {
           namenode.fsync(src, clientName);
-          persistBlocks = false;
         }
       } catch (IOException e) {
           lastException = new IOException("IOException flush:" + e);
-          closed = true;
           closeThreads();
           throw e;
       }
@@ -3114,85 +3137,39 @@
     private synchronized void flushInternal() throws IOException {
       checkOpen();
       isClosed();
+      //
+      // If there is data in the current buffer, send it across
+      //
+      if (currentPacket != null) {
+        queuePacket(currentPacket);
+        currentPacket = null;
+      }
 
-      while (!closed) {
-        synchronized (dataQueue) {
-          isClosed();
-          //
-          // If there is data in the current buffer, send it across
-          //
-          if (currentPacket != null) {
-            dataQueue.addLast(currentPacket);
-            dataQueue.notifyAll();
-            currentPacket = null;
-          }
-
-          // wait for all buffers to be flushed to datanodes
-          if (!closed && dataQueue.size() != 0) {
-            try {
-              dataQueue.wait();
-            } catch (InterruptedException e) {
-            }
-            continue;
-          }
-        }
-
-        // wait for all acks to be received back from datanodes
-        synchronized (ackQueue) {
-          if (!closed && ackQueue.size() != 0) {
-            try {
-              ackQueue.wait();
-            } catch (InterruptedException e) {
-            }
-            continue;
-          }
-        }
-
-        // acquire both the locks and verify that we are
-        // *really done*. In the case of error recovery, 
-        // packets might move back from ackQueue to dataQueue.
-        //
-        synchronized (dataQueue) {
-          synchronized (ackQueue) {
-            if (dataQueue.size() + ackQueue.size() == 0) {
-              break;       // we are done
-            }
+      synchronized (dataQueue) {
+        while (!closed && dataQueue.size() + ackQueue.size() > 0) {
+          try {
+            dataQueue.wait();
+          } catch (InterruptedException  e) {
           }
         }
+        isClosed();
       }
     }
-  
-    /**
-     * Closes this output stream and releases any system 
-     * resources associated with this stream.
-     */
-    @Override
-    public void close() throws IOException {
-      if(closed)
-        return;
-      closeInternal();
-      leasechecker.remove(src);
-      
-      if (s != null) {
-        s.close();
-        s = null;
-      }
-    }
- 
+
     // shutdown datastreamer and responseprocessor threads.
     private void closeThreads() throws IOException {
       try {
         streamer.close();
         streamer.join();
-        
-        // shutdown response after streamer has exited.
-        if (response != null) {
-          response.close();
-          response.join();
-          response = null;
+        if (s != null) {
+          s.close();
         }
       } catch (InterruptedException e) {
-        throw new IOException("Failed to shutdown response thread");
+        throw new IOException("Failed to shutdown streamer");
+      } finally {
+        streamer = null;
+        s = null;
+        closed = true;
       }
     }
     
@@ -3200,65 +3177,56 @@
      * Closes this output stream and releases any system 
      * resources associated with this stream.
      */
-    private synchronized void closeInternal() throws IOException {
-      checkOpen();
-      isClosed();
+    @Override
+    public synchronized void close() throws IOException {
+      if (closed) {
+        IOException e = lastException;
+        if (e == null)
+          return;
+        else
+          throw e;
+      }
 
       try {
-          flushBuffer();       // flush from all upper layers
-      
-          // Mark that this packet is the last packet in block.
-          // If there are no outstanding packets and the last packet
-          // was not the last one in the current block, then create a
-          // packet with empty payload.
-          synchronized (dataQueue) {
-            if (currentPacket == null && bytesCurBlock != 0) {
-              currentPacket = new Packet(packetSize, chunksPerPacket,
-                                         bytesCurBlock);
-            }
-            if (currentPacket != null) { 
-              currentPacket.lastPacketInBlock = true;
-            }
-          }
-
-        flushInternal();             // flush all data to Datanodes
-        isClosed(); // check to see if flushInternal had any exceptions
-        closed = true; // allow closeThreads() to showdown threads
+        flushBuffer();       // flush from all upper layers
 
-        closeThreads();
-        
-        synchronized (dataQueue) {
-          if (blockStream != null) {
-            blockStream.writeInt(0); // indicate end-of-block to datanode
-            blockStream.close();
-            blockReplyStream.close();
-          }
-          if (s != null) {
-            s.close();
-            s = null;
-          }
+        // Mark that this packet is the last packet in block.
+        // If there are no outstanding packets and the last packet
+        // was not the last one in the current block, then create a
+        // packet with empty payload.
+        if (currentPacket == null && bytesCurBlock != 0) {
+          currentPacket = new Packet(packetSize, chunksPerPacket,
+              bytesCurBlock);
+        }
+        if (currentPacket != null) { 
+          currentPacket.lastPacketInBlock = true;
         }
 
-        streamer = null;
-        blockStream = null;
-        blockReplyStream = null;
+        flushInternal();             // flush all data to Datanodes
+        closeThreads();
+        completeFile();
+        leasechecker.remove(src);
+      } finally {
+        closed = true;
+      }
+    }
 
-        long localstart = System.currentTimeMillis();
-        boolean fileComplete = false;
-        while (!fileComplete) {
-          fileComplete = namenode.complete(src, clientName);
-          if (!fileComplete) {
-            try {
-              Thread.sleep(400);
-              if (System.currentTimeMillis() - localstart > 5000) {
-                LOG.info("Could not complete file " + src + " retrying...");
-              }
-            } catch (InterruptedException ie) {
+    // should be called holding (this) lock since setTestFilename() may 
+    // be called during unit tests
+    private void completeFile() throws IOException {
+      long localstart = System.currentTimeMillis();
+      boolean fileComplete = false;
+      while (!fileComplete) {
+        fileComplete = namenode.complete(src, clientName);
+        if (!fileComplete) {
+          try {
+            Thread.sleep(400);
+            if (System.currentTimeMillis() - localstart > 5000) {
+              LOG.info("Could not complete file " + src + " retrying...");
             }
+          } catch (InterruptedException ie) {
           }
         }
-      } finally {
-        closed = true;
       }
     }
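
The rewritten writeChunk() and flushInternal() above hand packets to waitAndQueuePacket() and queuePacket(), neither of which appears in this hunk. A minimal sketch of what those helpers presumably look like, reconstructed from the wait/notify logic the old writeChunk() and flushInternal() carried inline; the method names, locking granularity and exact placement are assumptions, not taken from the commit:

    // Enqueue without blocking; used by flushInternal(), which should not wait
    // for queue space while draining the buffered packet.
    private void queuePacket(Packet packet) {
      synchronized (dataQueue) {
        dataQueue.addLast(packet);
        dataQueue.notifyAll();   // wake the DataStreamer
      }
    }

    // Back-pressure for writeChunk(): block while dataQueue + ackQueue already
    // hold MAX_PACKETS packets, then enqueue.
    private void waitAndQueuePacket(Packet packet) throws IOException {
      synchronized (dataQueue) {
        while (!closed && dataQueue.size() + ackQueue.size() > MAX_PACKETS) {
          try {
            dataQueue.wait();    // notified as packets are acked or on error
          } catch (InterruptedException e) {
          }
        }
        isClosed();              // surface any failure recorded by the streamer
        queuePacket(packet);
      }
    }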
 

Modified: hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/common/StorageInfo.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/common/StorageInfo.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/common/StorageInfo.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/common/StorageInfo.java Tue Jun  9 16:11:19 2009
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.hdfs.server.common;
 
 import java.io.DataInput;

Modified: hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java Tue Jun  9 16:11:19 2009
@@ -346,7 +346,10 @@
     info.lastLogTime = now;
     LogFileHandler log = verificationLog;
     if (log != null) {
-      log.appendLine(LogEntry.newEnry(block, now));
+      log.appendLine("date=\"" + dateFormat.format(new Date(now)) + "\"\t " +
+          "time=\"" + now + "\"\t " +
+          "genstamp=\"" + block.getGenerationStamp() + "\"\t " +
+          "id=\"" + block.getBlockId() +"\"");
     }
   }
   
@@ -381,13 +384,6 @@
     private static Pattern entryPattern = 
       Pattern.compile("\\G\\s*([^=\\p{Space}]+)=\"(.*?)\"\\s*");
     
-    static String newEnry(Block block, long time) {
-      return "date=\"" + dateFormat.format(new Date(time)) + "\"\t " +
-             "time=\"" + time + "\"\t " +
-             "genstamp=\"" + block.getGenerationStamp() + "\"\t " +
-             "id=\"" + block.getBlockId() +"\"";
-    }
-    
     static LogEntry parseEntry(String line) {
       LogEntry entry = new LogEntry();
       

Modified: hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Jun  9 16:11:19 2009
@@ -92,6 +92,7 @@
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.ServicePlugin;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
@@ -1379,6 +1380,10 @@
                                       Configuration conf) throws IOException {
     if (conf == null)
       conf = new Configuration();
+    // parse generic hadoop options
+    GenericOptionsParser hParser = new GenericOptionsParser(conf, args);
+    args = hParser.getRemainingArgs();
+    
     if (!parseArguments(args, conf)) {
       printUsage();
       return null;
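
Running the arguments through GenericOptionsParser before parseArguments() means the datanode now honours the standard generic Hadoop options (for example a -D key=value override on the command line), and only the remaining arguments reach its own parser; the exact set of supported options is whatever GenericOptionsParser accepts, not something this hunk defines.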

Modified: hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataStorage.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataStorage.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataStorage.java Tue Jun  9 16:11:19 2009
@@ -363,8 +363,17 @@
   static void linkBlocks(File from, File to, int oldLV) throws IOException {
     if (!from.isDirectory()) {
       if (from.getName().startsWith(COPY_FILE_PREFIX)) {
-        IOUtils.copyBytes(new FileInputStream(from), 
-                          new FileOutputStream(to), 16*1024, true);
+        FileInputStream in = new FileInputStream(from);
+        try {
+          FileOutputStream out = new FileOutputStream(to);
+          try {
+            IOUtils.copyBytes(in, out, 16*1024);
+          } finally {
+            out.close();
+          }
+        } finally {
+          in.close();
+        }
       } else {
         
         //check if we are upgrading from pre-generation stamp version.
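
This change, and the matching one in DatanodeBlockInfo.java below, replaces the single IOUtils.copyBytes(in, out, 16*1024, true) call with explicit nested try/finally blocks. The practical difference, as far as these hunks show: if the FileOutputStream constructor throws, the already-opened FileInputStream is still closed by the outer finally, whereas previously a stream was only closed once copyBytes() actually ran with it.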

Modified: hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DatanodeBlockInfo.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DatanodeBlockInfo.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DatanodeBlockInfo.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DatanodeBlockInfo.java Tue Jun  9 16:11:19 2009
@@ -86,9 +86,17 @@
   private void detachFile(File file, Block b) throws IOException {
     File tmpFile = volume.createDetachFile(b, file.getName());
     try {
-      IOUtils.copyBytes(new FileInputStream(file),
-                        new FileOutputStream(tmpFile),
-                        16*1024, true);
+      FileInputStream in = new FileInputStream(file);
+      try {
+        FileOutputStream out = new FileOutputStream(tmpFile);
+        try {
+          IOUtils.copyBytes(in, out, 16*1024);
+        } finally {
+          out.close();
+        }
+      } finally {
+        in.close();
+      }
       if (file.length() != tmpFile.length()) {
         throw new IOException("Copy of file " + file + " size " + file.length()+
                               " into file " + tmpFile +

Modified: hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockManager.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockManager.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockManager.java Tue Jun  9 16:11:19 2009
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;

Modified: hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Tue Jun  9 16:11:19 2009
@@ -262,13 +262,12 @@
       ArrayList<EditLogOutputStream> errorStreams,
       boolean propagate) {
     
-    String lsd = fsimage.listStorageDirectories();
-    FSNamesystem.LOG.info("current list of storage dirs:" + lsd);
-    
     if (errorStreams == null || errorStreams.size() == 0) {
       return;                       // nothing to do
     }
 
+    String lsd = fsimage.listStorageDirectories();
+    FSNamesystem.LOG.info("current list of storage dirs:" + lsd);
     //EditLogOutputStream
     if (editStreams == null || editStreams.size() <= 1) {
       FSNamesystem.LOG.fatal(

Modified: hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue Jun  9 16:11:19 2009
@@ -3162,7 +3162,7 @@
           return leaveMsg + " upon completion of " + 
             "the distributed upgrade: upgrade progress = " + 
             getDistributedUpgradeStatus() + "%";
-        leaveMsg = "Use \"hadoop dfs -safemode leave\" to turn safe mode off";
+        leaveMsg = "Use \"hdfs dfsadmin -safemode leave\" to turn safe mode off";
       }
       if(blockTotal < 0)
         return leaveMsg + ".";

Modified: hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INode.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INode.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INode.java Tue Jun  9 16:11:19 2009
@@ -309,15 +309,11 @@
 
   /**
    * Breaks file path into components.
-   * @param path an absolute path on the file system
+   * @param path
    * @return array of byte arrays each of which represents 
    * a single path component.
    */
   static byte[][] getPathComponents(String path) {
-    if (!path.startsWith("/")) {
-      throw new IllegalArgumentException(
-        "Must pass an absolute path to getPathComponents");
-    }
     return getPathComponents(getPathNames(path));
   }
 

Modified: hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java Tue Jun  9 16:11:19 2009
@@ -160,7 +160,8 @@
    */
   int getExistingPathINodes(byte[][] components, INode[] existing) {
     assert compareBytes(this.name, components[0]) == 0 :
-      "Incorrect name " + getLocalName() + " expected " + components[0];
+      "Incorrect name " + getLocalName() + " expected " + 
+      bytes2String(components[0]);
 
     INode curNode = this;
     int count = 0;

Modified: hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java Tue Jun  9 16:11:19 2009
@@ -280,7 +280,13 @@
     if (!FSConstants.HDFS_URI_SCHEME.equalsIgnoreCase(fsName.getScheme())) {
       throw new IOException("This is not a DFS");
     }
-    return conf.get("dfs.http.address", "0.0.0.0:50070");
+    String configuredAddress = conf.get("dfs.http.address", "0.0.0.0:50070");
+    InetSocketAddress sockAddr = NetUtils.createSocketAddr(configuredAddress);
+    if (sockAddr.getAddress().isAnyLocalAddress()) {
+      return fsName.getHost() + ":" + sockAddr.getPort();
+    } else {
+      return configuredAddress;
+    }
   }
 
   /**
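
An illustrative reading of the new wildcard check (host names here are made up, not from the commit): with dfs.http.address left at the 0.0.0.0:50070 default and the default filesystem set to hdfs://nn.example.com:8020, the address returned here is now nn.example.com:50070 rather than the unusable wildcard address; an explicitly configured, non-wildcard address is returned unchanged.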

Modified: hadoop/core/branches/HADOOP-3628-2/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/mapred/mapred-default.xml?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/mapred/mapred-default.xml (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/mapred/mapred-default.xml Tue Jun  9 16:11:19 2009
@@ -406,6 +406,16 @@
 </property>
 
 <property>
+  <name>mapred.child.env</name>
+  <value></value>
+  <description>User-added environment variables for the task tracker child 
+  processes. Examples:
+  1) A=foo   This will set the env variable A to foo.
+  2) B=$B:c  This will inherit the tasktracker's B env variable.
+  </description>
+</property>
+
+<property>
   <name>mapred.child.ulimit</name>
   <value></value>
   <description>The maximum virtual memory, in KB, of a process launched by the 
@@ -654,6 +664,15 @@
 </property>
 
 <property>
+  <name>mapred.heartbeats.in.second</name>
+  <value>100</value>
+  <description>Expert: Approximate number of heartbeats that could arrive 
+               at the JobTracker in a second. Assuming each RPC can be 
+               processed in 10 msec, the default is 100 RPCs per second.
+  </description>
+</property> 
+
+<property>
   <name>mapred.max.tracker.blacklists</name>
   <value>4</value>
   <description>The number of blacklists for a taskTracker by various jobs
@@ -932,4 +951,32 @@
   </description>
 </property>
 
+<property>
+  <name>mapred.max.maps.per.node</name>
+  <value>-1</value>
+  <description>Per-node limit on running map tasks for the job. A value
+    of -1 signifies no limit.</description>
+</property>
+
+<property>
+  <name>mapred.max.reduces.per.node</name>
+  <value>-1</value>
+  <description>Per-node limit on running reduce tasks for the job. A value
+    of -1 signifies no limit.</description>
+</property>
+
+<property>
+  <name>mapred.running.map.limit</name>
+  <value>-1</value>
+  <description>Cluster-wide limit on running map tasks for the job. A value
+    of -1 signifies no limit.</description>
+</property>
+
+<property>
+  <name>mapred.running.reduce.limit</name>
+  <value>-1</value>
+  <description>Cluster-wide limit on running reduce tasks for the job. A value
+    of -1 signifies no limit.</description>
+</property>
+
 </configuration>
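
The mapred.child.env description above gives single-variable examples only; how several variables are combined in one value (presumably a comma-separated list such as A=foo,B=$B:c, though the separator is not stated in this hunk) is an assumption to verify against the task-launch code. The four mapred.max.*.per.node and mapred.running.*.limit properties below it are the XML counterparts of the JobConf accessors added later in this commit.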

Modified: hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JobClient.java Tue Jun  9 16:11:19 2009
@@ -613,7 +613,7 @@
       for (String tmpjars: libjarsArr) {
         Path tmp = new Path(tmpjars);
         Path newPath = copyRemoteFiles(fs, libjarsDir, tmp, job, replication);
-        DistributedCache.addArchiveToClassPath(newPath, job);
+        DistributedCache.addFileToClassPath(newPath, job);
       }
     }
     
@@ -936,19 +936,16 @@
   throws IOException {
     FileStatus[] contents = fs.listStatus(jobDirPath);
     int matchCount = 0;
-    if (contents != null && contents.length >=3) {
+    if (contents != null && contents.length >=2) {
       for (FileStatus status : contents) {
         if ("job.xml".equals(status.getPath().getName())) {
           ++matchCount;
         }
-        if ("job.jar".equals(status.getPath().getName())) {
-          ++matchCount;
-        }
         if ("job.split".equals(status.getPath().getName())) {
           ++matchCount;
         }
       }
-      if (matchCount == 3) {
+      if (matchCount == 2) {
         return true;
       }
     }
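
The relaxed check now treats a staging directory as a job directory when it contains job.xml and job.split alone; requiring job.jar as well is dropped, presumably because not every submitted job ships a user jar.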

Modified: hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JobConf.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JobConf.java Tue Jun  9 16:11:19 2009
@@ -1430,6 +1430,78 @@
   }
   
   /**
+   * Get the per-node limit on running maps for the job
+   * 
+   * @return per-node running map limit
+   */
+  public int getMaxMapsPerNode() {
+    return getInt("mapred.max.maps.per.node", -1);
+  }
+  
+  /**
+   * Set the per-node limit on running maps for the job
+   * 
+   * @param limit per-node running map limit
+   */
+  public void setMaxMapsPerNode(int limit) {
+    setInt("mapred.max.maps.per.node", limit);
+  }
+  
+  /**
+   * Get the per-node limit on running reduces for the job
+   * 
+   * @return per-node running reduce limit
+   */
+  public int getMaxReducesPerNode() {
+    return getInt("mapred.max.reduces.per.node", -1);
+  }
+  
+  /**
+   * Set the per-node limit on running reduces for the job
+   * 
+   * @param limit per-node running reduce limit
+   */
+  public void setMaxReducesPerNode(int limit) {
+    setInt("mapred.max.reduces.per.node", limit);
+  }
+  
+  /**
+   * Get the cluster-wide limit on running maps for the job
+   * 
+   * @return cluster-wide running map limit
+   */
+  public int getRunningMapLimit() {
+    return getInt("mapred.running.map.limit", -1);
+  }
+  
+  /**
+   * Set the cluster-wide limit on running maps for the job
+   * 
+   * @param limit cluster-wide running map limit
+   */
+  public void setRunningMapLimit(int limit) {
+    setInt("mapred.running.map.limit", limit);
+  }
+  
+  /**
+   * Get the cluster-wide limit on running reduces for the job
+   * 
+   * @return cluster-wide running reduce limit
+   */
+  public int getRunningReduceLimit() {
+    return getInt("mapred.running.reduce.limit", -1);
+  }
+  
+  /**
+   * Set the cluster-wide limit on running reduces for the job
+   * 
+   * @param limit cluster-wide running reduce limit
+   */
+  public void setRunningReduceLimit(int limit) {
+    setInt("mapred.running.reduce.limit", limit);
+  }
+  
+  /**
    * Normalize the negative values in configuration
    * 
    * @param val
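
The new accessors pair with the mapred-default.xml properties added earlier in this commit. A minimal usage sketch on the submitting side (the driver class and the limit values are illustrative only):

    JobConf job = new JobConf(MyDriver.class);  // MyDriver is a placeholder driver class
    job.setMaxMapsPerNode(2);        // mapred.max.maps.per.node
    job.setMaxReducesPerNode(1);     // mapred.max.reduces.per.node
    job.setRunningMapLimit(50);      // mapred.running.map.limit
    job.setRunningReduceLimit(10);   // mapred.running.reduce.limit
    // the default of -1 for any of these means "no limit"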

Modified: hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JobHistory.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JobHistory.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JobHistory.java Tue Jun  9 16:11:19 2009
@@ -875,6 +875,11 @@
         String logFileName = null;
         if (restarted) {
           logFileName = getJobHistoryFileName(jobConf, jobId);
+          if (logFileName == null) {
+            logFileName =
+              encodeJobHistoryFileName(getNewJobHistoryFileName(jobConf, jobId));
+            
+          }
         } else {
           logFileName = 
             encodeJobHistoryFileName(getNewJobHistoryFileName(jobConf, jobId));
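
The added null check covers a restarted JobTracker that finds no existing history file for the job: instead of proceeding with a null name, it falls back to creating a fresh history file name, the same path taken in the non-restart branch.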


