hadoop-common-commits mailing list archives

From: dhr...@apache.org
Subject: svn commit: r612903 [1/2] - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/util/ src/test/org/apache/hadoop/dfs/
Date: Thu, 17 Jan 2008 18:11:42 GMT
Author: dhruba
Date: Thu Jan 17 10:11:35 2008
New Revision: 612903

URL: http://svn.apache.org/viewvc?rev=612903&view=rev
Log:
HADOOP-1707. The DFSClient no longer uses a local disk file to cache
writes to an HDFS file. Changed Data Transfer Version from 7 to 8.
(dhruba)
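
[Editorial note] The patch replaces the client-side backup file with an in-memory packet pipeline. Below is a minimal sketch of the per-packet wire framing that DataStreamer.run() writes to the first datanode; the helper class and method names are illustrative, not part of the patch, but the field order is taken from the diff.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Editorial sketch: frames one packet the way DataStreamer.run() writes
// it. The payload already contains the per-chunk length/data/checksum
// triples assembled by writeChunk().
public class PacketFrameSketch {
  static byte[] frame(long offsetInBlock, long seqno,
                      boolean lastPacketInBlock,
                      byte[] payload) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bos);
    out.writeInt(payload.length);        // size of this packet
    out.writeLong(offsetInBlock);        // data offset in block
    out.writeLong(seqno);                // sequence number of packet
    out.writeBoolean(lastPacketInBlock); // end-of-block marker
    out.write(payload);
    out.flush();
    return bos.toByteArray();
  }
}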


Added:
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDatanodeDeath.java   (with props)
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataChecksum.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDatasetInterface.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/util/Daemon.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/SimulatedFSDataset.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFileLimit.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSetrepIncreasing.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSimulatedFSDataset.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=612903&r1=612902&r2=612903&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Jan 17 10:11:35 2008
@@ -56,6 +56,10 @@
     HADOOP-2012. Periodic data verification on Datanodes.
     (Raghu Angadi via dhruba)
 
+    HADOOP-1707. The DFSClient no longer uses a local disk file to cache
+    writes to an HDFS file. Changed Data Transfer Version from 7 to 8.
+    (dhruba)
+
   NEW FEATURES
 
     HADOOP-1857.  Ability to run a script when a task fails to capture stack

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=612903&r1=612902&r2=612903&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Thu Jan 17 10:11:35 2008
@@ -39,6 +39,7 @@
 import java.util.zip.CRC32;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.ConcurrentHashMap;
+import java.nio.ByteBuffer;
 
 import javax.net.SocketFactory;
 import javax.security.auth.login.LoginException;
@@ -59,15 +60,15 @@
   static final int MAX_BLOCK_ACQUIRE_FAILURES = 3;
   private static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
   ClientProtocol namenode;
-  boolean running = true;
+  volatile boolean clientRunning = true;
   Random r = new Random();
   String clientName;
   Daemon leaseChecker;
   private Configuration conf;
   private long defaultBlockSize;
   private short defaultReplication;
-  private LocalDirAllocator dirAllocator;
   private SocketFactory socketFactory;
+  private int socketTimeout;
     
   /**
    * A map from name -> DFSOutputStream of files that are currently being
@@ -136,6 +137,8 @@
   public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf)
     throws IOException {
     this.conf = conf;
+    this.socketTimeout = conf.getInt("dfs.socket.timeout", 
+                                     FSConstants.READ_TIMEOUT);
     this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
     this.namenode = createNamenode(nameNodeAddr, conf);
     String taskId = conf.get("mapred.task.id");
@@ -146,13 +149,12 @@
     }
     defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
     defaultReplication = (short) conf.getInt("dfs.replication", 3);
-    dirAllocator = new LocalDirAllocator("dfs.client.buffer.dir");
     this.leaseChecker = new Daemon(new LeaseChecker());
     this.leaseChecker.start();
   }
 
   private void checkOpen() throws IOException {
-    if (!running) {
+    if (!clientRunning) {
       IOException result = new IOException("Filesystem closed");
       throw result;
     }
@@ -179,7 +181,7 @@
         }
         pendingCreates.clear();
       }
-      this.running = false;
+      this.clientRunning = false;
       try {
         leaseChecker.join();
       } catch (InterruptedException ie) {
@@ -579,7 +581,7 @@
      */
     public void run() {
       long lastRenewed = 0;
-      while (running) {
+      while (clientRunning) {
         if (System.currentTimeMillis() - lastRenewed > (LEASE_SOFTLIMIT_PERIOD / 2)) {
           try {
             if (pendingCreates.size() > 0)
@@ -727,7 +729,17 @@
         throw new IOException("Mismatch in pos : " + pos + " + " + 
                               firstChunkOffset + " != " + chunkOffset);
       }
-      
+
+      // The chunk is transmitted as one packet. Read packet headers.
+      int packetLen = in.readInt();
+      long offsetInBlock = in.readLong();
+      long seqno = in.readLong();
+      boolean lastPacketInBlock = in.readBoolean();
+      LOG.debug("DFSClient readChunk got seqno " + seqno +
+                " offsetInBlock " + offsetInBlock +
+                " lastPacketInBlock " + lastPacketInBlock +
+                " packetLen " + packetLen);
+
       int chunkLen = in.readInt();
       
       // Sanity check the lengths
@@ -806,7 +818,9 @@
                    new BufferedInputStream(sock.getInputStream(), bufferSize));
       
       if ( in.readShort() != OP_STATUS_SUCCESS ) {
-        throw new IOException("Got error in response to OP_READ_BLOCK");
+        throw new IOException("Got error in response to OP_READ_BLOCK " +
+                              "for file " + file + 
+                              " for block " + blockId);
       }
       DataChecksum checksum = DataChecksum.newDataChecksum( in );
       //Warning when we get CHECKSUM_NULL?
@@ -818,7 +832,7 @@
           firstChunkOffset >= (startOffset + checksum.getBytesPerChecksum())) {
         throw new IOException("BlockReader: error in first chunk offset (" +
                               firstChunkOffset + ") startOffset is " + 
-                              startOffset + "for file XXX");
+                              startOffset + " for file " + file);
       }
 
       return new BlockReader( file, blockId, in, checksum,
@@ -1046,8 +1060,8 @@
 
         try {
           s = socketFactory.createSocket();
-          s.connect(targetAddr, READ_TIMEOUT);
-          s.setSoTimeout(READ_TIMEOUT);
+          s.connect(targetAddr, socketTimeout);
+          s.setSoTimeout(socketTimeout);
           Block blk = targetBlock.getBlock();
           
           blockReader = BlockReader.newBlockReader(s, src, blk.getBlockId(), 
@@ -1227,8 +1241,8 @@
             
         try {
           dn = socketFactory.createSocket();
-          dn.connect(targetAddr, READ_TIMEOUT);
-          dn.setSoTimeout(READ_TIMEOUT);
+          dn.connect(targetAddr, socketTimeout);
+          dn.setSoTimeout(socketTimeout);
               
           int len = (int) (end - start + 1);
               
@@ -1249,8 +1263,10 @@
           reportChecksumFailure(src, block.getBlock(), chosenNode);
         } catch (IOException e) {
           ioe = e;
-          LOG.warn("Failed to connect to " + targetAddr + ":" 
-                    + StringUtils.stringifyException(e));
+          LOG.warn("Failed to connect to " + targetAddr + 
+                   " for file " + src + 
+                   " for block " + block.getBlock().getBlockId() + ":"  +
+                   StringUtils.stringifyException(e));
         } 
         // Put chosen node into dead list, continue
         addToDeadNodes(chosenNode);
@@ -1440,23 +1456,428 @@
 
   /****************************************************************
    * DFSOutputStream creates files from a stream of bytes.
-   ****************************************************************/
+   *
+   * The client application writes data that is cached internally by
+   * this stream. Data is broken up into packets, each packet is
+   * typically 64K in size. A packet comprises chunks. Each chunk
+   * is typically 512 bytes and has an associated checksum.
+   *
+   * When a client application fills up the currentPacket, it is
+   * enqueued into the dataQueue.  The DataStreamer thread picks up
+   * packets from the dataQueue, sends each one to the first datanode
+   * in the pipeline and moves it from the dataQueue to the ackQueue.
+   * The ResponseProcessor receives acks from the datanodes. When a
+   * successful ack for a packet is received from all datanodes, the
+   * ResponseProcessor removes the corresponding packet from the
+   * ackQueue.
+   *
+   * In case of error, all outstanding packets are moved from the
+   * ackQueue back to the dataQueue. A new pipeline is set up by
+   * eliminating the bad datanode from the original pipeline. The
+   * DataStreamer then resumes sending packets from the dataQueue.
+   ****************************************************************/
   class DFSOutputStream extends FSOutputSummer {
     private Socket s;
     boolean closed = false;
-
+  
     private String src;
-    private short replication;
     private DataOutputStream blockStream;
     private DataInputStream blockReplyStream;
-    private File backupFile;
-    private OutputStream backupStream;
     private Block block;
-    private long filePos = 0;
-    private long bytesWrittenToBlock = 0;
     private long blockSize;
     private int buffersize;
     private DataChecksum checksum;
+    private LinkedList<Packet> dataQueue = new LinkedList<Packet>();
+    private LinkedList<Packet> ackQueue = new LinkedList<Packet>();
+    private Packet currentPacket = null;
+    private int maxPackets = 80; // each packet 64K, total 5MB
+    // private int maxPackets = 1000; // each packet 64K, total 64MB
+    private DataStreamer streamer;
+    private ResponseProcessor response = null;
+    private long currentSeqno = 0;
+    private long bytesCurBlock = 0; // bytes written in current block
+    private int packetSize = 0;
+    private int chunksPerPacket = 0;
+    private int chunksPerBlock = 0;
+    private int chunkSize = 0;
+    private DatanodeInfo[] nodes = null; // list of targets for current block
+    private volatile boolean hasError = false;
+    private volatile int errorIndex = 0;
+    private IOException lastException = new IOException("Stream closed.");
+    private long artificialSlowdown = 0;
+
+    private class Packet {
+      ByteBuffer buffer;
+      long    seqno;               // sequence number of buffer in block
+      long    offsetInBlock;       // offset in block
+      boolean lastPacketInBlock;   // is this the last packet in block?
+      int     numChunks;           // number of chunks currently in packet
+  
+      // create a new packet
+      Packet(int size, long offsetInBlock) {
+        buffer = ByteBuffer.allocate(size);
+        buffer.clear();
+        this.lastPacketInBlock = false;
+        this.numChunks = 0;
+        this.offsetInBlock = offsetInBlock;
+        this.seqno = currentSeqno;
+        currentSeqno++;
+      }
+  
+      // writes len bytes from offset off in inarray into
+      // this packet.
+      // 
+      void write(byte[] inarray, int off, int len) {
+        buffer.put(inarray, off, len);
+      }
+  
+      // writes an integer into this packet. 
+      //
+      void writeInt(int value) {
+        buffer.putInt(value);
+      }
+    }
+  
+    //
+    // The DataStreamer class is responsible for sending data packets to the
+    // datanodes in the pipeline. It retrieves a new blockid and block locations
+    // from the namenode, and starts streaming packets to the pipeline of
+    // Datanodes. Every packet has a sequence number associated with
+    // it. When all the packets for a block are sent out and acks for each
+    // of them are received, the DataStreamer closes the current block.
+    //
+    private class DataStreamer extends Thread {
+
+      private volatile boolean closed = false;
+  
+      public void run() {
+
+        while (!closed && clientRunning) {
+
+          // if the Responder encountered an error, shutdown Responder
+          if (hasError && response != null) {
+            try {
+              response.close();
+              response.join();
+              response = null;
+            } catch (InterruptedException  e) {
+            }
+          }
+
+          Packet one = null;
+          synchronized (dataQueue) {
+
+            // process IO errors if any
+            processDatanodeError();
+
+            // wait for a packet to be sent.
+            while (!closed && !hasError && clientRunning 
+                   && dataQueue.size() == 0) {
+              try {
+                dataQueue.wait(1000);
+              } catch (InterruptedException  e) {
+              }
+            }
+            if (closed || hasError || dataQueue.size() == 0 || !clientRunning) {
+              continue;
+            }
+
+            try {
+              // get packet to be sent.
+              one = dataQueue.getFirst();
+              int len = one.buffer.limit();
+  
+              // get new block from namenode.
+              if (blockStream == null) {
+                LOG.info("Allocating new block");
+                nodes = nextBlockOutputStream(src); 
+                this.setName("DataStreamer for file " + src +
+                             " block " + block);
+                response = new ResponseProcessor(nodes);
+                response.start();
+              }
+
+              // user bytes from 'position' to 'limit'.
+              byte[] arr = one.buffer.array();
+              if (one.offsetInBlock >= blockSize) {
+                throw new IOException("BlockSize " + blockSize +
+                                      " is smaller than data size. " +
+                                      " Offset of packet in block " + 
+                                      one.offsetInBlock +
+                                      " Aborting file " + src);
+              }
+
+              // move packet from dataQueue to ackQueue
+              dataQueue.removeFirst();
+              dataQueue.notifyAll();
+              synchronized (ackQueue) {
+                ackQueue.addLast(one);
+                ackQueue.notifyAll();
+              } 
+  
+              // write out data to remote datanode
+              blockStream.writeInt(len); // size of this packet
+              blockStream.writeLong(one.offsetInBlock); // data offset in block
+              blockStream.writeLong(one.seqno); // sequence num of packet
+              blockStream.writeBoolean(one.lastPacketInBlock); 
+              blockStream.write(arr, 0, len);
+              if (one.lastPacketInBlock) {
+                blockStream.writeInt(0); // indicate end-of-block 
+              }
+              blockStream.flush();
+              LOG.debug("DataStreamer block " + block +
+                        " wrote packet seqno:" + one.seqno +
+                        " size:" + len + 
+                        " offsetInBlock:" + one.offsetInBlock + 
+                        " lastPacketInBlock:" + one.lastPacketInBlock);
+            } catch (IOException e) {
+              LOG.warn("DataStreamer Exception: " + e);
+              hasError = true;
+            }
+          }
+
+          if (closed || hasError || !clientRunning) {
+            continue;
+          }
+
+          // Is this block full?
+          if (one.lastPacketInBlock) {
+            synchronized (ackQueue) {
+              while (!hasError && ackQueue.size() != 0 && clientRunning) {
+                try {
+                  ackQueue.wait();   // wait for acks to arrive from datanodes
+                } catch (InterruptedException  e) {
+                }
+              }
+            }
+            LOG.info("Closing old block " + block);
+            this.setName("DataStreamer for file " + src);
+
+            response.close();        // ignore all errors in Response
+            try {
+              response.join();
+              response = null;
+            } catch (InterruptedException  e) {
+            }
+
+            if (closed || hasError || !clientRunning) {
+              continue;
+            }
+
+            synchronized (dataQueue) {
+              try {
+                blockStream.close();
+                blockReplyStream.close();
+              } catch (IOException e) {
+              }
+              nodes = null;
+              response = null;
+              blockStream = null;
+              blockReplyStream = null;
+            }
+          }
+          if (progress != null) { progress.progress(); }
+
+          // This is used by unit tests to trigger race conditions.
+          if (artificialSlowdown != 0 && clientRunning) {
+            try { 
+              Thread.sleep(artificialSlowdown); 
+            } catch (InterruptedException e) {}
+          }
+        }
+      }
+
+      // shutdown thread
+      void close() {
+        closed = true;
+        synchronized (dataQueue) {
+          dataQueue.notifyAll();
+        }
+        synchronized (ackQueue) {
+          ackQueue.notifyAll();
+        }
+        this.interrupt();
+      }
+    }
+
+    //
+    // Processes responses from the datanodes.  A packet is removed
+    // from the ackQueue when its response arrives.
+    //
+    private class ResponseProcessor extends Thread {
+
+      private volatile boolean closed = false;
+      private DatanodeInfo[] targets = null;
+      private boolean lastPacketInBlock = false;
+
+      ResponseProcessor (DatanodeInfo[] targets) {
+        this.targets = targets;
+      }
+
+      public void run() {
+
+        this.setName("ResponseProcessor for block " + block);
+  
+        while (!closed && clientRunning && !lastPacketInBlock) {
+            // process responses from datanodes.
+            try {
+              // verify seqno from datanode
+              int numTargets = -1;
+              long seqno = blockReplyStream.readLong();
+              LOG.debug("DFSClient received ack for seqno " + seqno);
+              if (seqno == -1) {
+                continue;
+              } else if (seqno == -2) {
+                // do nothing
+              } else {
+                Packet one = null;
+                synchronized (ackQueue) {
+                  one = ackQueue.getFirst();
+                }
+                if (one.seqno != seqno) {
+                  throw new IOException("ResponseProcessor: Expecting seqno " +
+                                        one.seqno + " for block " + block +
+                                        " but received " + seqno);
+                }
+                lastPacketInBlock = one.lastPacketInBlock;
+              }
+
+              // processes response status from all datanodes.
+              for (int i = 0; i < targets.length && clientRunning; i++) {
+                short reply = blockReplyStream.readShort();
+                if (reply != OP_STATUS_SUCCESS) {
+                  errorIndex = i; // first bad datanode
+                  throw new IOException("Bad response " + reply +
+                                        " for block " + block +
+                                        " from datanode " + 
+                                        targets[i].getName());
+                }
+              }
+
+              synchronized (ackQueue) {
+                ackQueue.removeFirst();
+                ackQueue.notifyAll();
+              }
+            } catch (Exception e) {
+              if (!closed) {
+                hasError = true;
+                LOG.warn("DFSOutputStream ResponseProcessor exception " +
+                         "for block " + block + " " +
+                         StringUtils.stringifyException(e));
+                closed = true;
+              }
+            }
+
+            synchronized (dataQueue) {
+              dataQueue.notifyAll();
+            }
+            synchronized (ackQueue) {
+              ackQueue.notifyAll();
+            }
+        }
+      }
+
+      void close() {
+        closed = true;
+        this.interrupt();
+      }
+    }
+
+    // If this stream has encountered any errors, shut down any alive
+    // threads, rebuild the pipeline without the bad datanode, and
+    // resend the pending packets. The stream is marked closed only if
+    // recovery is impossible.
+    //
+    private void processDatanodeError() {
+      if (!hasError) {
+        return;
+      }
+      if (response != null) {
+        LOG.info("Error Recovery for block " + block +
+                 " waiting for responder to exit. ");
+        return;
+      }
+      String msg = "Error Recovery for block " + block +
+                   " bad datanode[" + errorIndex + "]";
+      if (nodes != null) {
+        msg += " " + nodes[errorIndex].getName();
+      }
+      LOG.warn(msg);
+
+      if (blockStream != null) {
+        try {
+          blockStream.close();
+          blockReplyStream.close();
+        } catch (IOException e) {
+        }
+      }
+      blockStream = null;
+      blockReplyStream = null;
+
+      // move packets from ack queue to front of the data queue
+      synchronized (ackQueue) {
+        dataQueue.addAll(0, ackQueue);
+        ackQueue.clear();
+      }
+
+      boolean success = false;
+      while (!success && clientRunning) {
+        if (nodes == null) {
+          lastException = new IOException("Could not get block locations. " +
+                                          "Aborting...");
+          closed = true;
+          streamer.close();
+          return;
+        }
+        if (nodes.length <= 1) {
+          lastException = new IOException("All datanodes are bad. Aborting...");
+          closed = true;
+          streamer.close();
+          return;
+        }
+        LOG.warn("Error Recovery for block " + block +
+                 " bad datanode " + nodes[errorIndex].getName());
+
+        // remove bad datanode from list of datanodes.
+        //
+        DatanodeInfo[] newnodes =  new DatanodeInfo[nodes.length-1];
+        for (int i = 0; i < errorIndex; i++) {
+          newnodes[i] = nodes[i];
+        }
+        for (int i = errorIndex; i < (nodes.length-1); i++) {
+          newnodes[i] = nodes[i+1];
+        }
+        nodes = newnodes;
+
+        // setup new pipeline
+        success = createBlockOutputStream(nodes, src, true);
+        hasError = false;
+        errorIndex = 0;
+      }
+
+      response = new ResponseProcessor(nodes);
+      response.start();
+    }
+
+    private void isClosed() throws IOException {
+      if (closed) {
+        throw lastException;
+      }
+    }
+
+    //
+    // returns the list of targets, if any, that are currently being used.
+    //
+    DatanodeInfo[] getPipeline() {
+      synchronized (dataQueue) {
+        if (nodes == null) {
+          return null;
+        }
+        DatanodeInfo[] value = new DatanodeInfo[nodes.length];
+        for (int i = 0; i < nodes.length; i++) {
+          value[i] = nodes[i];
+        }
+        return value;
+      }
+    }
 
     private Progressable progress;
     /**
@@ -1471,7 +1892,6 @@
                            ) throws IOException {
       super(new CRC32(), conf.getInt("io.bytes.per.checksum", 512), 4);
       this.src = src;
-      this.replication = replication;
       this.blockSize = blockSize;
       this.buffersize = buffersize;
       this.progress = progress;
@@ -1487,110 +1907,137 @@
                               "multiple of io.bytes.per.checksum");
                               
       }
-      
       checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 
                                               bytesPerChecksum);
+      // A maximum of 128 chunks per packet, i.e. 64K packet size.
+      chunkSize = bytesPerChecksum + 2 * SIZE_OF_INTEGER; // user data & checksum
+      chunksPerBlock = (int)(blockSize / bytesPerChecksum);
+      chunksPerPacket = Math.min(chunksPerBlock, 128);
+      packetSize = chunkSize * chunksPerPacket;
+
       namenode.create(
           src, masked, clientName, overwrite, replication, blockSize);
+      streamer = new DataStreamer();
+      streamer.start();
     }
-
-    private void openBackupStream() throws IOException {
-      File tmpFile = newBackupFile();
-      backupStream = new BufferedOutputStream(new FileOutputStream(tmpFile),
-                                              buffersize);
-      backupFile = tmpFile;
-    }
-    
-    /* Wrapper for closing backupStream. This sets backupStream to null so
-     * that we do not attempt to write to backupStream that could be
-     * invalid in subsequent writes. Otherwise we might end trying to write
-     * filedescriptor that we don't own.
-     */
-    private void closeBackupStream() throws IOException {
-      if (backupStream != null) {
-        backupStream.flush();
-        OutputStream stream = backupStream;
-        backupStream = null;
-        stream.close();
-      }   
-    }
-    /* Similar to closeBackupStream(). Theoritically deleting a file
-     * twice could result in deleting a file that we should not.
-     */
-    private void deleteBackupFile() {
-      if (backupFile != null) {
-        File file = backupFile;
-        backupFile = null;
-        file.delete();
-      }
-    }
-        
-    private File newBackupFile() throws IOException {
-      String name = "tmp" + File.separator +
-                     "client-" + Math.abs(r.nextLong());
-      File result = dirAllocator.createTmpFileForWrite(name, 
-                                                       2 * blockSize, 
-                                                       conf);
-      return result;
-    }
-
+  
     /**
      * Open a DataOutputStream to a DataNode so that it can be written to.
      * This happens when a file is created and each time a new block is allocated.
      * Must get block ID and the IDs of the destinations from the namenode.
+     * Returns the list of target datanodes.
      */
-    private synchronized void nextBlockOutputStream() throws IOException {
+    private DatanodeInfo[] nextBlockOutputStream(String client) throws IOException {
+      LocatedBlock lb = null;
       boolean retry = false;
-      long startTime = System.currentTimeMillis();
+      DatanodeInfo[] nodes;
+      int count = conf.getInt("dfs.client.block.write.retries", 3);
+      boolean success;
       do {
+        hasError = false;
+        errorIndex = 0;
         retry = false;
+        nodes = null;
+        success = false;
                 
-        LocatedBlock lb = locateFollowingBlock(startTime);
+        long startTime = System.currentTimeMillis();
+        lb = locateFollowingBlock(startTime);
         block = lb.getBlock();
-        if (block.getNumBytes() < bytesWrittenToBlock) {
-          block.setNumBytes(bytesWrittenToBlock);
-        }
-        DatanodeInfo nodes[] = lb.getLocations();
-
+        nodes = lb.getLocations();
+  
         //
-        // Connect to first DataNode in the list.  Abort if this fails.
+        // Connect to first DataNode in the list.
         //
-        InetSocketAddress target = NetUtils.createSocketAddr(nodes[0].getName());
-        try {
-          s = socketFactory.createSocket();
-          s.connect(target, READ_TIMEOUT);
-          s.setSoTimeout(replication * READ_TIMEOUT);
-        } catch (IOException ie) {
+        success = createBlockOutputStream(nodes, client, false);
+
+        if (!success) {
+          LOG.info("Abandoning block " + block);
+          namenode.abandonBlock(block, src, clientName);
+
           // Connection failed.  Let's wait a little bit and retry
+          retry = true;
           try {
             if (System.currentTimeMillis() - startTime > 5000) {
-              LOG.info("Waiting to find target node: " + target);
+              LOG.info("Waiting to find target node: " + nodes[0].getName());
             }
             Thread.sleep(6000);
           } catch (InterruptedException iex) {
           }
-          namenode.abandonBlock(block, src, clientName);
-          retry = true;
-          continue;
         }
+      } while (retry && --count >= 0);
+
+      if (!success) {
+        throw new IOException("Unable to create new block.");
+      }
+      return nodes;
+    }
+
+    // connects to the first datanode in the pipeline
+    // Returns true on success, false otherwise.
+    //
+    private boolean createBlockOutputStream(DatanodeInfo[] nodes, String client,
+                    boolean recoveryFlag) {
+      String firstBadLink = "";
+      for (int i = 0; i < nodes.length; i++) {
+        LOG.info("pipeline = " + nodes[i].getName());
+      }
+      try {
+        LOG.info("Connecting to " + nodes[0].getName());
+        InetSocketAddress target = NetUtils.createSocketAddr(nodes[0].getName());
+        s = socketFactory.createSocket();
+        int timeoutValue = 3000 * nodes.length + socketTimeout;
+        s.connect(target, timeoutValue);
+        s.setSoTimeout(timeoutValue);
+        s.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
+        LOG.debug("Send buf size " + s.getSendBufferSize());
 
         //
         // Xmit header info to datanode
         //
         DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream(), buffersize));
+        blockReplyStream = new DataInputStream(s.getInputStream());
+
         out.writeShort( DATA_TRANFER_VERSION );
         out.write( OP_WRITE_BLOCK );
         out.writeLong( block.getBlockId() );
+        out.writeInt( nodes.length );
+        out.writeBoolean( recoveryFlag );       // recovery flag
+        Text.writeString( out, client );
         out.writeInt( nodes.length - 1 );
         for (int i = 1; i < nodes.length; i++) {
           nodes[i].write(out);
         }
         checksum.writeHeader( out );
+        out.flush();
+
+        // receive ack for connect
+        firstBadLink = Text.readString(blockReplyStream);
+        if (firstBadLink.length() != 0) {
+          throw new IOException("Bad connect ack with firstBadLink " + firstBadLink);
+        }
+
         blockStream = out;
-        blockReplyStream = new DataInputStream(s.getInputStream());
-      } while (retry);
-    }
+        return true;     // success
+
+      } catch (IOException ie) {
 
+        LOG.info("Exception in createBlockOutputStream " + ie);
+
+        // find the datanode that matches
+        if (firstBadLink.length() != 0) {
+          for (int i = 0; i < nodes.length; i++) {
+            if (nodes[i].getName().equals(firstBadLink)) {
+              errorIndex = i;
+              break;
+            }
+          }
+        }
+        hasError = true;
+        blockReplyStream = null;
+        return false;  // error
+      }
+    }
+  
     private LocatedBlock locateFollowingBlock(long start
                                               ) throws IOException {     
       int retries = 5;
@@ -1601,7 +2048,7 @@
           try {
             return namenode.addBlock(src.toString(), clientName);
           } catch (RemoteException e) {
-            if (--retries == 0 || 
+            if (--retries == 0 && 
                 !NotReplicatedYetException.class.getName().
                 equals(e.getClassName())) {
               throw e;
@@ -1624,144 +2071,134 @@
         }
       } 
     }
-
+  
     // @see FSOutputSummer#writeChunk()
     @Override
     protected void writeChunk(byte[] b, int offset, int len, byte[] checksum) 
                                                           throws IOException {
       checkOpen();
+      isClosed();
+  
+      int cklen = checksum.length;
       int bytesPerChecksum = this.checksum.getBytesPerChecksum(); 
-      if (len > bytesPerChecksum || (len + bytesWrittenToBlock) > blockSize) {
-        // should never happen
-        throw new IOException("Mismatch in writeChunk() args");
-      }
-      
-      if ( backupFile == null ) {
-        openBackupStream();
-      }
-      
-      backupStream.write(b, offset, len);
-      backupStream.write(checksum);
-      
-      bytesWrittenToBlock += len;
-      filePos += len;
-      
-      if ( bytesWrittenToBlock >= blockSize ) {
-        endBlock();
+      if (len > bytesPerChecksum) {
+        throw new IOException("writeChunk() buffer size " + len +
+                              " is larger than supported bytesPerChecksum " +
+                              bytesPerChecksum);
+      }
+      if (checksum.length != this.checksum.getChecksumSize()) {
+        throw new IOException("writeChunk() checksum size is supposed to be " +
+                              this.checksum.getChecksumSize() + 
+                              " but found to be " + checksum.length);
+      }
+      if (len + cklen + SIZE_OF_INTEGER > chunkSize) {
+        throw new IOException("writeChunk() found data of size " +
+                              (len + cklen + SIZE_OF_INTEGER) +
+                              " that cannot be larger than chunkSize " +
+                              chunkSize);
       }
 
-    }
-
-    /**
-     * We're done writing to the current block.
-     */
-    private synchronized void endBlock() throws IOException {
-      long sleeptime = 400;
-      //
-      // Done with local copy
-      //
-      closeBackupStream();
-
-      //
-      // Send it to datanode
-      //
-      boolean sentOk = false;
-      int remainingAttempts = 
-        conf.getInt("dfs.client.block.write.retries", 3);
-      int numSuccessfulWrites = 0;
-            
-      while (!sentOk) {
-        nextBlockOutputStream();
-
-        long bytesLeft = bytesWrittenToBlock;
-        int bytesPerChecksum = checksum.getBytesPerChecksum();
-        int checksumSize = checksum.getChecksumSize(); 
-        byte buf[] = new byte[ bytesPerChecksum + checksumSize ];
-
-        InputStream in = null;
-        if ( bytesLeft > 0 ) { 
-          in = new BufferedInputStream(new FileInputStream(backupFile),
-                                       buffersize);
+      synchronized (dataQueue) {
+  
+        // If the queue is full, wait until there is enough space
+        while (!closed && dataQueue.size() + ackQueue.size() > maxPackets) {
+          try {
+            dataQueue.wait();
+          } catch (InterruptedException  e) {
+          }
+        }
+        isClosed();
+  
+        if (currentPacket == null) {
+          currentPacket = new Packet(packetSize, bytesCurBlock);
         }
-        
-        try {
-
-          while ( bytesLeft > 0 ) {
-            int len = (int) Math.min( bytesLeft, bytesPerChecksum );
-            IOUtils.readFully( in, buf, 0, len + checksumSize);
-
-            blockStream.writeInt( len );
-            blockStream.write( buf, 0, len + checksumSize );
 
-            bytesLeft -= len;
+        currentPacket.writeInt(len);
+        currentPacket.write(b, offset, len);
+        currentPacket.write(checksum, 0, cklen);
+        currentPacket.numChunks++;
+        bytesCurBlock += len;
 
-            if (progress != null) { progress.progress(); }
+        // If packet is full, enqueue it for transmission
+        //
+        if (currentPacket.numChunks == chunksPerPacket ||
+            bytesCurBlock == chunksPerBlock * bytesPerChecksum) {
+          LOG.debug("DFSClient writeChunk packet full seqno " + currentPacket.seqno);
+          currentPacket.buffer.flip();
+          //
+          // if we allocated a new packet because we encountered a block
+          // boundary, reset bytesCurBlock.
+          //
+          if (bytesCurBlock == chunksPerBlock * bytesPerChecksum) {
+            currentPacket.lastPacketInBlock = true;
+            bytesCurBlock = 0;
           }
-
-          // write 0 to mark the end of a block
-          blockStream.writeInt(0);
-          blockStream.flush();
-          
-          numSuccessfulWrites++;
-
-          //We should wait for response from the receiver.
-          int reply = blockReplyStream.readShort();
-          if ( reply == OP_STATUS_SUCCESS ||
-              ( reply == OP_STATUS_ERROR_EXISTS &&
-                  numSuccessfulWrites > 1 ) ) {
-            s.close();
-            s = null;
-            sentOk = true;
-          } else {
-            throw new IOException( "Got error reply " + reply +
-                                   " while writting the block " 
-                                   + block );
+          dataQueue.addLast(currentPacket);
+          dataQueue.notifyAll();
+          currentPacket = null;
+        }
+      }
+      //LOG.debug("DFSClient writeChunk with length " + len +
+      //          " checksum length " + cklen);
+    }
+  
+    /**
+     * Waits till all existing data is flushed and
+     * confirmations received from datanodes.
+     */
+    @Override
+    public synchronized void flush() throws IOException {
+      checkOpen();
+      isClosed();
+  
+      while (!closed) {
+        synchronized (dataQueue) {
+          isClosed();
+          //
+          // if there is data in the current buffer, send it across
+          //
+          if (currentPacket != null) {
+            currentPacket.buffer.flip();
+            dataQueue.addLast(currentPacket);
+            dataQueue.notifyAll();
+            currentPacket = null;
           }
 
-        } catch (IOException ie) {
-          /*
-           * The error could be OP_STATUS_ERROR_EXISTS.
-           * We are not handling it properly here yet.
-           * We should try to read a byte from blockReplyStream
-           * wihtout blocking. 
-           */
-          handleSocketException(ie);
-          remainingAttempts -= 1;
-          if (remainingAttempts == 0) {
-            throw ie;
-          }
-          try {
-            Thread.sleep(sleeptime);
-          } catch (InterruptedException e) {
-          }
-        } finally {
-          if (in != null) {
-            in.close();
+          // wait for all buffers to be flushed to datanodes
+          if (!closed && dataQueue.size() != 0) {
+            try {
+              dataQueue.wait();
+            } catch (InterruptedException e) {
+            }
+            continue;
           }
         }
-      }
 
-      bytesWrittenToBlock = 0;
-      //
-      // Delete local backup.
-      //
-      deleteBackupFile();
-    }
+        // wait for all acks to be received back from datanodes
+        synchronized (ackQueue) {
+          if (!closed && ackQueue.size() != 0) {
+            try {
+              ackQueue.wait();
+            } catch (InterruptedException e) {
+            }
+            continue;
+          }
+        }
 
-    private void handleSocketException(IOException ie) throws IOException {
-      LOG.warn("Error while writing.", ie);
-      try {
-        if (s != null) {
-          s.close();
-          s = null;
+        // acquire both the locks and verify that we are
+        // *really done*. In the case of error recovery, 
+        // packets might move back from ackQueue to dataQueue.
+        //
+        synchronized (dataQueue) {
+          synchronized (ackQueue) {
+            if (dataQueue.size() + ackQueue.size() == 0) {
+              break;       // we are done
+            }
+          }
         }
-      } catch (IOException ie2) {
-        LOG.warn("Error closing socket.", ie2);
       }
-      //XXX Why are we abondoning the block? There could be retries left.
-      namenode.abandonBlock(block, src, clientName);
     }
-
+  
     private void internalClose() throws IOException {
       // Clean up any resources that might be held.
       closed = true;
@@ -1774,8 +2211,6 @@
         s.close();
         s = null;
       }
-      
-      closeBackupStream();
     }
     
     /**
@@ -1785,27 +2220,56 @@
     @Override
     public synchronized void close() throws IOException {
       checkOpen();
-      if (closed) {
-        throw new IOException("Stream closed");
-      }
-      
+      isClosed();
+
       try {
-        flushBuffer();
-        
-        if (filePos == 0 || bytesWrittenToBlock != 0) {
-          try {
-            endBlock();
-          } catch (IOException e) {
-            namenode.abandonFileInProgress(src.toString(), clientName);
-            throw e;
+        flushBuffer();       // flush from all upper layers
+      
+        // Mark that this packet is the last packet in block.
+        // If there are no outstanding packets and the last packet
+        // was not the last one in the current block, then create a
+        // packet with empty payload.
+        synchronized (dataQueue) {
+          if (currentPacket == null && bytesCurBlock != 0) {
+            currentPacket = new Packet(packetSize, bytesCurBlock);
+            currentPacket.writeInt(0); // one chunk with empty contents
+          }
+          if (currentPacket != null) { 
+            currentPacket.lastPacketInBlock = true;
           }
         }
 
-        if (s != null) {
-          s.close();
-          s = null;
+        flush();             // flush all data to Datanodes
+        closed = true;
+ 
+        // wait for threads to finish processing
+        streamer.close();
+
+        synchronized (dataQueue) {
+          if (response != null) {
+            response.close();
+          }
+          if (blockStream != null) {
+            blockStream.writeInt(0); // indicate end-of-block to datanode
+            blockStream.close();
+            blockReplyStream.close();
+          }
+          if (s != null) {
+            s.close();
+            s = null;
+          }
         }
 
+        // wait for threads to exit
+        if (response != null) {
+          response.join();
+        }
+        streamer.join();
+        streamer = null;
+        blockStream = null;
+        blockReplyStream = null;
+        response = null;
+
         long localstart = System.currentTimeMillis();
         boolean fileComplete = false;
         while (!fileComplete) {
@@ -1814,24 +2278,35 @@
             try {
               Thread.sleep(400);
               if (System.currentTimeMillis() - localstart > 5000) {
-                LOG.info("Could not complete file, retrying...");
+                LOG.info("Could not complete file " + src + " retrying...");
               }
             } catch (InterruptedException ie) {
             }
           }
         }
+      } catch (InterruptedException e) {
+        throw new IOException("Failed to shutdown response thread");
       } finally {
         internalClose();
       }
     }
+
+    void setArtificialSlowdown(long period) {
+      artificialSlowdown = period;
+    }
+
+    void setChunksPerPacket(int value) {
+      chunksPerPacket = Math.min(chunksPerPacket, value);
+      packetSize = chunkSize * chunksPerPacket;
+    }
   }
-  
+
   void reportChecksumFailure(String file, Block blk, DatanodeInfo dn) {
     DatanodeInfo [] dnArr = { dn };
     LocatedBlock [] lblocks = { new LocatedBlock(blk, dnArr) };
     reportChecksumFailure(file, lblocks);
   }
-  
+    
   // just reports checksum failure and ignores any exception during the report.
   void reportChecksumFailure(String file, LocatedBlock lblocks[]) {
     try {

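[Editorial note] The DFSOutputStream javadoc above describes a three-thread handoff: the writer fills packets into dataQueue, the DataStreamer moves each packet from dataQueue to ackQueue before transmitting it, and the ResponseProcessor drops it once every datanode has acked. A stripped-down sketch of that locking discipline follows; names and types are simplified and error recovery (which moves ackQueue back to dataQueue) is omitted.

import java.util.LinkedList;

// Editorial sketch of the dataQueue/ackQueue handoff used by
// DFSOutputStream above; not part of the patch.
class QueueHandoffSketch {
  private final LinkedList<byte[]> dataQueue = new LinkedList<byte[]>();
  private final LinkedList<byte[]> ackQueue = new LinkedList<byte[]>();
  private final int maxPackets = 80;   // each packet 64K, total 5MB
  private volatile boolean running = true;

  // client thread: enqueue, blocking while both queues together are full
  void enqueue(byte[] packet) throws InterruptedException {
    synchronized (dataQueue) {
      while (running && dataQueue.size() + ackQueue.size() > maxPackets) {
        dataQueue.wait();
      }
      dataQueue.addLast(packet);
      dataQueue.notifyAll();
    }
  }

  // streamer thread: move the packet to ackQueue, then transmit it
  byte[] nextToSend() throws InterruptedException {
    byte[] one;
    synchronized (dataQueue) {
      while (running && dataQueue.isEmpty()) {
        dataQueue.wait(1000);
      }
      if (dataQueue.isEmpty()) return null;
      one = dataQueue.removeFirst();
      dataQueue.notifyAll();
    }
    synchronized (ackQueue) {
      ackQueue.addLast(one);
      ackQueue.notifyAll();
    }
    return one;
  }

  // responder thread: drop the packet once all datanodes have acked it
  void acked() {
    synchronized (ackQueue) {
      ackQueue.removeFirst();
      ackQueue.notifyAll();
    }
  }
}
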
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataChecksum.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataChecksum.java?rev=612903&r1=612902&r2=612903&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataChecksum.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataChecksum.java Thu Jan 17 10:11:35 2008
@@ -205,6 +205,9 @@
   public int getNumBytesInSum() {
     return inSum;
   }
+  public int getChecksumHeaderSize() {
+    return 2 + 1 + 4; // version: short + type : byte + bytesPerChecksum : int
+  }
   //Checksum Interface. Just a wrapper around member summer.
   public long getValue() {
     return summer.getValue();

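[Editorial note] The new accessor sizes the checksum header that DFSClient and DataNode now exchange during connection setup. A sketch of the 7-byte layout implied by its comment; the writer shown is illustrative, not the class's actual writeHeader().

import java.io.DataOutputStream;
import java.io.IOException;

// Editorial sketch of the header measured by getChecksumHeaderSize():
// version (short) + type (byte) + bytesPerChecksum (int).
class ChecksumHeaderSketch {
  static void write(DataOutputStream out, short version, byte type,
                    int bytesPerChecksum) throws IOException {
    out.writeShort(version);        // 2 bytes
    out.writeByte(type);            // 1 byte
    out.writeInt(bytesPerChecksum); // 4 bytes, 7 total
  }
}
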
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=612903&r1=612902&r2=612903&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Thu Jan 17 10:11:35 2008
@@ -35,6 +35,7 @@
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.dfs.BlockCommand;
 import org.apache.hadoop.dfs.DatanodeProtocol;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.dfs.FSDatasetInterface.MetaDataInputStream;
 
 import java.io.*;
@@ -102,6 +103,7 @@
   final private static String EMPTY_DEL_HINT = "";
   int xmitsInProgress = 0;
   Daemon dataXceiveServer = null;
+  ThreadGroup threadGroup = null;
   long blockReportInterval;
   long lastBlockReport = 0;
   boolean resetBlockReportTime = true;
@@ -116,6 +118,7 @@
   private Thread dataNodeThread = null;
   String machineName;
   int defaultBytesPerChecksum = 512;
+  private int socketTimeout;
   
   private DataBlockScanner blockScanner;
   private Daemon blockScannerThread;
@@ -125,6 +128,10 @@
   private Semaphore balancingSem = new Semaphore(MAX_BALANCING_THREADS);
   long balanceBandwidth;
   private Throttler balancingThrottler;
+
+  // Record all sockets opened for data transfer
+  Map<Socket, Socket> childSockets = Collections.synchronizedMap(
+                                       new HashMap<Socket, Socket>());
   
   /**
    * Current system time.
@@ -258,7 +265,8 @@
     
     this.defaultBytesPerChecksum = 
        Math.max(conf.getInt("io.bytes.per.checksum", 512), 1); 
-    
+    this.socketTimeout =  conf.getInt("dfs.socket.timeout",
+                                      FSConstants.READ_TIMEOUT);
     String address = conf.get("dfs.datanode.bindAddress", "0.0.0.0:50010");
     InetSocketAddress socAddr = NetUtils.createSocketAddr(address);
     int tmpPort = socAddr.getPort();
@@ -303,12 +311,15 @@
       
     // find free port
     ServerSocket ss = new ServerSocket(tmpPort, 0, socAddr.getAddress());
+    ss.setReceiveBufferSize(DEFAULT_DATA_SOCKET_SIZE); 
     // adjust machine name with the actual port
     tmpPort = ss.getLocalPort();
     this.dnRegistration.setName(machineName + ":" + tmpPort);
     LOG.info("Opened server at " + tmpPort);
       
-    this.dataXceiveServer = new Daemon(new DataXceiveServer(ss));
+    this.threadGroup = new ThreadGroup("dataXceiveServer");
+    this.dataXceiveServer = new Daemon(threadGroup, new DataXceiveServer(ss));
+    this.threadGroup.setDaemon(true); // auto destroy when empty
 
     long blockReportIntervalBasis =
       conf.getLong("dfs.blockreport.intervalMsec", BLOCKREPORT_INTERVAL);
@@ -496,6 +507,16 @@
     }
   }
 
+  private void enumerateThreadGroup(ThreadGroup tg) {
+    int count = tg.activeCount();
+    Thread[] info = new Thread[count];
+    int num = tg.enumerate(info);
+    for (int i = 0; i < num; i++) {
+      System.out.print(info[i].getName() + " ");
+    }
+    System.out.println("");
+  }
+
   /**
    * Shut down this instance of the datanode.
    * Returns only after shutdown is complete.
@@ -511,6 +532,22 @@
     if (dataXceiveServer != null) {
       ((DataXceiveServer) this.dataXceiveServer.getRunnable()).kill();
       this.dataXceiveServer.interrupt();
+
+      // wait for all data receiver threads to exit
+      if (this.threadGroup != null) {
+        while (true) {
+          this.threadGroup.interrupt();
+          LOG.info("Waiting for threadgroup to exit, active thread count is " +
+                   this.threadGroup.activeCount());
+          if (this.threadGroup.isDestroyed() ||
+              this.threadGroup.activeCount() == 0) {
+            break;
+          }
+          try {
+            Thread.sleep(1000);
+          } catch (InterruptedException e) {}
+        }
+      }
     }
     if(upgradeManager != null)
       upgradeManager.shutdownUpgrade();
@@ -797,23 +834,25 @@
         break;
       }
       if (xferTargets[i].length > 0) {
-        LOG.info(dnRegistration + ":Starting thread to transfer block " + blocks[i] + " to " + xferTargets[i]);
+        LOG.info(dnRegistration + " Starting thread to transfer block " + blocks[i] + " to " + xferTargets[i][0].getName());
         new Daemon(new DataTransfer(xferTargets[i], blocks[i])).start();
       }
     }
   }
 
   /* utility function for receiving a response */
-  private static void receiveResponse(Socket s) throws IOException {
+  private static void receiveResponse(Socket s, int numTargets) throws IOException {
     // check the response
     DataInputStream reply = new DataInputStream(new BufferedInputStream(
         s.getInputStream(), BUFFER_SIZE));
     try {
-      short opStatus = reply.readShort();
-      if(opStatus != OP_STATUS_SUCCESS) {
-        throw new IOException("operation failed at "+
-            s.getInetAddress());
-      } 
+      for (int i = 0; i < numTargets; i++) {
+        short opStatus = reply.readShort();
+        if(opStatus != OP_STATUS_SUCCESS) {
+          throw new IOException("operation failed at "+
+              s.getInetAddress());
+        } 
+      }
     } finally {
       IOUtils.closeStream(reply);
     }
@@ -866,7 +905,8 @@
       try {
         while (shouldRun) {
           Socket s = ss.accept();
-          new Daemon(new DataXceiver(s)).start();
+          s.setTcpNoDelay(true);
+          new Daemon(threadGroup, new DataXceiver(s)).start();
         }
         ss.close();
       } catch (IOException ie) {
@@ -880,6 +920,18 @@
         this.ss.close();
       } catch (IOException iex) {
       }
+
+      // close all the sockets that were accepted earlier
+      synchronized (childSockets) {
+        for (Iterator it = childSockets.values().iterator();
+             it.hasNext();) {
+          Socket thissock = (Socket) it.next();
+          try {
+            thissock.close();
+          } catch (IOException e) {
+          }
+        }
+      }
     }
   }
 
@@ -890,6 +942,7 @@
     Socket s;
     public DataXceiver(Socket s) {
       this.s = s;
+      childSockets.put(s, s);
       LOG.debug("Number of active connections is: "+xceiverCount);
     }
 
@@ -925,14 +978,15 @@
           copyBlock(in);
           break;
         default:
-          throw new IOException("Unknown opcode " + op + "in data stream");
+          throw new IOException("Unknown opcode " + op + " in data stream");
         }
-       } catch (Throwable t) {
+      } catch (Throwable t) {
         LOG.error(dnRegistration + ":DataXceiver: " + StringUtils.stringifyException(t));
       } finally {
         LOG.debug(dnRegistration + ":Number of active connections is: "+xceiverCount);
         IOUtils.closeStream(in);
         IOUtils.closeSocket(s);
+        childSockets.remove(s);
       }
     }
 
@@ -981,7 +1035,7 @@
         
         myMetrics.readBytes((int) read);
         myMetrics.readBlocks(1);
-        LOG.info(dnRegistration + "Served block " + block + " to " + s.getInetAddress());
+        LOG.info(dnRegistration + " Served block " + block + " to " + s.getInetAddress());
       } catch ( SocketException ignored ) {
         // Its ok for remote side to close the connection anytime.
         myMetrics.readBlocks(1);
@@ -1008,10 +1062,16 @@
      */
     private void writeBlock(DataInputStream in) throws IOException {
       xceiverCount.incr();
+      LOG.debug("writeBlock receive buf size " + s.getReceiveBufferSize() +
+                " tcp no delay " + s.getTcpNoDelay());
       //
       // Read in the header
       //
       Block block = new Block(in.readLong(), 0);
+      LOG.info("Receiving block " + block + " from " + s.getInetAddress());
+      int pipelineSize = in.readInt(); // num of datanodes in entire pipeline
+      boolean isRecovery = in.readBoolean(); // is this part of recovery?
+      String client = Text.readString(in); // working on behalf of this client
       int numTargets = in.readInt();
       if (numTargets < 0) {
         throw new IOException("Mislabelled incoming datastream.");
@@ -1025,13 +1085,19 @@
 
       short opStatus = OP_STATUS_SUCCESS; // write operation status
       DataOutputStream mirrorOut = null;  // stream to next target
+      DataInputStream mirrorIn = null;    // reply from next target
+      DataOutputStream replyOut = null;   // stream to prev target
       Socket mirrorSock = null;           // socket to next target
       BlockReceiver blockReceiver = null; // responsible for data handling
       String mirrorNode = null;           // the name:port of next target
+      String firstBadLink = "";           // first datanode that failed in connection setup
       try {
         // open a block receiver and check if the block does not exist
         blockReceiver = new BlockReceiver(block, in, 
-            s.getInetAddress().toString());
+            s.getInetAddress().toString(), isRecovery, client);
+
+        // get a connection back to the previous target
+        replyOut = new DataOutputStream(s.getOutputStream());
 
         //
         // Open network conn to backup machine, if 
@@ -1044,68 +1110,84 @@
           mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
           mirrorSock = new Socket();
           try {
-            mirrorSock.connect(mirrorTarget, READ_TIMEOUT);
-            mirrorSock.setSoTimeout(numTargets*READ_TIMEOUT);
-            mirrorOut = new DataOutputStream( 
-                        new BufferedOutputStream(mirrorSock.getOutputStream(),
-                                                 BUFFER_SIZE));
+            int timeoutValue = 3000 * numTargets + socketTimeout;
+            mirrorSock.connect(mirrorTarget, timeoutValue);
+            mirrorSock.setSoTimeout(timeoutValue);
+            mirrorSock.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
+            mirrorOut = new DataOutputStream(mirrorSock.getOutputStream());
+            mirrorIn = new DataInputStream(mirrorSock.getInputStream());
+
             // Write header: Copied from DFSClient.java!
             mirrorOut.writeShort( DATA_TRANFER_VERSION );
             mirrorOut.write( OP_WRITE_BLOCK );
             mirrorOut.writeLong( block.getBlockId() );
+            mirrorOut.writeInt( pipelineSize );
+            mirrorOut.writeBoolean( isRecovery );
+            Text.writeString( mirrorOut, client );
             mirrorOut.writeInt( targets.length - 1 );
             for ( int i = 1; i < targets.length; i++ ) {
               targets[i].write( mirrorOut );
             }
+
+            blockReceiver.writeChecksumHeader(mirrorOut);
+            mirrorOut.flush();
+
+            // read connect ack
+            firstBadLink = Text.readString(mirrorIn);
+            LOG.info("Datanode " + targets.length +
+                     " got response for connect ack " +
+                     "from downstream datanode with firstbadlink as " +
+                     firstBadLink);
+
           } catch (IOException e) {
+            Text.writeString(replyOut, mirrorNode);
+            replyOut.flush();
             IOUtils.closeStream(mirrorOut);
             mirrorOut = null;
+            IOUtils.closeStream(mirrorIn);
+            mirrorIn = null;
             IOUtils.closeSocket(mirrorSock);
             mirrorSock = null;
+            throw e;
           }
         }
 
+        // send connect ack back to source
+        LOG.info("Datanode " + targets.length +
+                 " forwarding connect ack to upstream firstbadlink is " +
+                 firstBadLink);
+        Text.writeString(replyOut, firstBadLink);
+        replyOut.flush();
+
         // receive the block and mirror to the next target
         String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
+        blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
+                                   mirrorAddr, null, targets.length);
 
-        blockReceiver.receiveBlock(mirrorOut, mirrorAddr, null);
+        // if this write is for a replication request (and not
+        // from a client), then confirm block. For client-writes,
+        // the block is finalized in the PacketResponder.
+        if (client.length() == 0) {
+          notifyNamenodeReceivedBlock(block, EMPTY_DEL_HINT);
+          LOG.info("Received block " + block +
+                   " of size " + block.getNumBytes() +
+                   " from " + s.getInetAddress());
 
-        // notify name node
-        notifyNamenodeReceivedBlock(block, EMPTY_DEL_HINT);
+        }
 
         if (blockScanner != null) {
           blockScanner.addBlock(block);
         }
         
-        String msg = "Received block " + block + " from " +
-                     s.getInetAddress();
-
-        /* read response from next target in the pipeline. 
-         * ignore the response for now. Will fix it in HADOOP-1927
-         */
-        if( mirrorSock != null ) {
-          try {
-            receiveResponse(mirrorSock);
-          } catch (IOException ignored) {
-            msg += " and " +  ignored.getMessage();
-          } 
-        }
-
-        LOG.info(msg);
       } catch (IOException ioe) {
+        LOG.info("writeBlock " + block + " received exception " + ioe);
         opStatus = OP_STATUS_ERROR;
         throw ioe;
       } finally {
-        // send back reply
-        try {
-          sendResponse(s, opStatus);
-        } catch (IOException ioe) {
-          LOG.warn(dnRegistration +":Error writing reply of status " + opStatus +  " back to " + s.getInetAddress() +
-              " for writing block " + block +"\n" +
-              StringUtils.stringifyException(ioe));
-        }
         // close all opened streams
         IOUtils.closeStream(mirrorOut);
+        IOUtils.closeStream(mirrorIn);
+        IOUtils.closeStream(replyOut);
         IOUtils.closeSocket(mirrorSock);
         IOUtils.closeStream(blockReceiver);
         // decrement counter
@@ -1180,8 +1262,8 @@
         // get the output stream to the target
         InetSocketAddress targetAddr = NetUtils.createSocketAddr(target.getName());
         targetSock = new Socket();
-        targetSock.connect(targetAddr, READ_TIMEOUT);
-        targetSock.setSoTimeout(READ_TIMEOUT);
+        targetSock.connect(targetAddr, socketTimeout);
+        targetSock.setSoTimeout(socketTimeout);
 
         targetOut = new DataOutputStream(new BufferedOutputStream(
             targetSock.getOutputStream(), BUFFER_SIZE));
@@ -1200,7 +1282,7 @@
         myMetrics.readBlocks(1);
         
         // check the response from target
-        receiveResponse(targetSock);
+        receiveResponse(targetSock, 1);
 
         LOG.info("Copied block " + block + " to " + targetAddr);
       } catch (IOException ioe) {
@@ -1243,11 +1325,11 @@
       try {
         // open a block receiver and check if the block does not exist
          blockReceiver = new BlockReceiver(
-            block, in, s.getRemoteSocketAddress().toString());
+            block, in, s.getRemoteSocketAddress().toString(), false, "");
 
         // receive a block
-        blockReceiver.receiveBlock(null, null, balancingThrottler);
-        
+        blockReceiver.receiveBlock(null, null, null, null, balancingThrottler, -1);
+
         // notify name node
         notifyNamenodeReceivedBlock(block, sourceID);
 
@@ -1373,6 +1455,7 @@
     private int checksumSize; // checksum size
     private boolean corruptChecksumOk; // if need to verify checksum
     private boolean chunkOffsetOK; // if need to send chunk offset
+    private long seqno; // sequence number of packet
 
     private boolean blockReadFully; //set when the whole block is read
     private boolean verifyChecksum; //if true, check is verified while reading
@@ -1447,6 +1530,8 @@
             IOUtils.skipFully(checksumIn, checksumSkip);
           }
         }
+        seqno = 0;
+
         InputStream blockInStream = data.getBlockInputStream(block, offset); // seek to offset
         blockIn = new DataInputStream(new BufferedInputStream(blockInStream, BUFFER_SIZE));
       } catch (IOException ioe) {
@@ -1482,12 +1567,14 @@
         throw ioe;
       }
     }
+
     
     private int sendChunk()
         throws IOException {
       int len = (int) Math.min(endOffset - offset, bytesPerChecksum);
-      if (len == 0)
+      if (len == 0) {
         return 0;
+      }
       blockIn.readFully(buf, 0, len);
 
       if (checksumSize > 0 && checksumIn != null) {
@@ -1515,7 +1602,17 @@
           }
         }
       }
+      boolean lastPacketInBlock = false;
+      if (offset + len >= endOffset) {
+        lastPacketInBlock = true;
+      }
 
+      // write packet header
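+      // packet length = data bytes + checksum bytes + 4 bytes for the data-length field below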
+      out.writeInt(len + checksumSize + 4);
+      out.writeLong(offset);
+      out.writeLong(seqno);
+      out.writeBoolean(lastPacketInBlock);
+      
       out.writeInt(len);
       out.write(buf, 0, len + checksumSize);
 
@@ -1554,6 +1651,7 @@
           long len = sendChunk();
           offset += len;
           totalRead += len + checksumSize;
+          seqno++;
         }
         out.writeInt(0); // mark the end of block
         out.flush();
@@ -1571,12 +1669,277 @@
     }
   }
 
+  // This information is cached by the Datanode in the ackQueue
+  static private class Packet {
+    long seqno;
+    boolean lastPacketInBlock;
+
+    Packet(long seqno, boolean lastPacketInBlock) {
+      this.seqno = seqno;
+      this.lastPacketInBlock = lastPacketInBlock;
+    }
+  }
+
+  /**
+   * Processes responses from downstream datanodes in the pipeline
+   * and sends back replies to the originator.
+   */
+  class PacketResponder implements Runnable {
+    private LinkedList<Packet> ackQueue = new LinkedList<Packet>(); // packets waiting for ack
+    private volatile boolean running = true;
+    private Block block;
+    DataInputStream mirrorIn;   // input from downstream datanode
+    DataOutputStream replyOut;  // output to upstream datanode
+    private int numTargets;     // number of downstream datanodes
+    private String clientName;  // The name of the client (if any)
+    private BlockReceiver receiver; // The owner of this responder.
+
+    public String toString() {
+      return "PacketResponder " + numTargets + " for Block " + this.block;
+    }
+
+    PacketResponder(BlockReceiver receiver, Block b, DataInputStream in, 
+                    DataOutputStream out, int numTargets, String clientName) {
+      this.receiver = receiver;
+      this.block = b;
+      mirrorIn = in;
+      replyOut = out;
+      this.numTargets = numTargets;
+      this.clientName = clientName;
+    }
+
+    // enqueue the seqno that is still to be acked by the downstream datanode
+    synchronized void enqueue(long seqno, boolean lastPacketInBlock) {
+      if (running) {
+        LOG.debug("PacketResponder " + numTargets + " adding seqno " + seqno +
+                  " to ack queue.");
+        ackQueue.addLast(new Packet(seqno, lastPacketInBlock));
+        notifyAll();
+      }
+    }
+
+    // wait for all pending packets to be acked, then shut down the thread.
+    synchronized void close() {
+      while (running && ackQueue.size() != 0 && shouldRun) {
+        try {
+          wait();
+        } catch (InterruptedException e) {
+          running = false;
+        }
+      }
+      LOG.debug("PacketResponder " + numTargets +
+               " for block " + block + " Closing down.");
+      running = false;
+      notifyAll();
+    }
+
+    private synchronized void lastDataNodeRun() {
+      long lastHeartbeat = System.currentTimeMillis();
+      boolean lastPacket = false;
+
+      while (running && shouldRun && !lastPacket) {
+        long now = System.currentTimeMillis();
+        try {
+
+            // wait for a packet to show up in the ack queue
+            while (running && shouldRun && ackQueue.size() == 0) {
+              long idle = now - lastHeartbeat;
+              long timeout = (socketTimeout/2) - idle;
+              if (timeout < 0) {
+                timeout = 1000;
+              }
+              try {
+                wait(timeout);
+              } catch (InterruptedException e) {
+                if (running) {
+                  LOG.info("PacketResponder " + numTargets +
+                           " for block " + block + " Interrupted.");
+                  running = false;
+                }
+                break;
+              }
+          
+              // send a heartbeat if it is time.
+              now = System.currentTimeMillis();
+              if (now - lastHeartbeat > socketTimeout/2) {
+                replyOut.writeLong(-1); // send heartbeat
+                replyOut.flush();
+                lastHeartbeat = now;
+              }
+            }
+
+            if (!running || !shouldRun) {
+              break;
+            }
+            Packet pkt = ackQueue.removeFirst();
+            long expected = pkt.seqno;
+            notifyAll();
+            LOG.debug("PacketResponder " + numTargets +
+                      " for block " + block + 
+                      " acking for packet " + expected);
+
+            // If this is the last packet in block, then close block
+            // file and finalize the block before responding success
+            if (pkt.lastPacketInBlock) {
+              receiver.close();
+              block.setNumBytes(receiver.offsetInBlock);
+              data.finalizeBlock(block);
+              myMetrics.wroteBlocks(1);
+              notifyNamenodeReceivedBlock(block, EMPTY_DEL_HINT);
+              LOG.info("Received block " + block + 
+                       " of size " + block.getNumBytes() + 
+                       " from " + receiver.inAddr);
+              lastPacket = true;
+            }
+
+            replyOut.writeLong(expected);
+            replyOut.writeShort(OP_STATUS_SUCCESS);
+            replyOut.flush();
+        } catch (Exception e) {
+          if (running) {
+            LOG.info("PacketResponder " + block + " " + numTargets + 
+                     " Exception " + StringUtils.stringifyException(e));
+            running = false;
+          }
+        }
+      }
+      LOG.info("PacketResponder " + numTargets + 
+               " for block " + block + " terminating");
+    }
+
+    // Thread to process incoming acks
+    public void run() {
+
+      // If this is the last datanode in pipeline, then handle differently
+      if (numTargets == 0) {
+        lastDataNodeRun();
+        return;
+      }
+
+      boolean lastPacketInBlock = false;
+      while (running && shouldRun && !lastPacketInBlock) {
+
+        try {
+            short op = OP_STATUS_SUCCESS;
+            boolean didRead = false;
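+            // expected stays -2 if no seqno could be read from downstream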
+            long expected = -2;
+            try { 
+              // read seqno from downstream datanode
+              long seqno = mirrorIn.readLong();
+              didRead = true;
+              if (seqno == -1) {
+                replyOut.writeLong(-1); // send keepalive
+                replyOut.flush();
+                LOG.debug("PacketResponder " + numTargets + " got -1");
+                continue;
+              } else if (seqno == -2) {
+                LOG.debug("PacketResponder " + numTargets + " got -2");
+              } else {
+                LOG.debug("PacketResponder " + numTargets + " got seqno = " + seqno);
+                Packet pkt = null;
+                synchronized (this) {
+                  pkt = ackQueue.removeFirst();
+                  expected = pkt.seqno;
+                  notifyAll();
+                  LOG.debug("PacketResponder " + numTargets + " seqno = " + seqno);
+                  if (seqno != expected) {
+                    throw new IOException("PacketResponder " + numTargets +
+                                          " for block " + block +
+                                          " expected seqno:" + expected +
+                                          " received:" + seqno);
+                  }
+                  lastPacketInBlock = pkt.lastPacketInBlock;
+                }
+              }
+            } catch (Throwable e) {
+              if (running) {
+                LOG.info("PacketResponder " + block + " " + numTargets + 
+                         " Exception " + StringUtils.stringifyException(e));
+                running = false;
+                if (!didRead) {
+                  op = OP_STATUS_ERROR;
+                }
+              }
+            }
+
+            // If this is the last packet in block, then close block
+            // file and finalize the block before responding success
+            if (lastPacketInBlock) {
+              receiver.close();
+              block.setNumBytes(receiver.offsetInBlock);
+              data.finalizeBlock(block);
+              myMetrics.wroteBlocks(1);
+              notifyNamenodeReceivedBlock(block, EMPTY_DEL_HINT);
+              LOG.info("Received block " + block + 
+                       " of size " + block.getNumBytes() + 
+                       " from " + receiver.inAddr);
+            }
+
+            // send my status back to upstream datanode
+            replyOut.writeLong(expected); // send seqno upstream
+            replyOut.writeShort(OP_STATUS_SUCCESS);
+
+            LOG.debug("PacketResponder " + numTargets + 
+                      " for block " + block +
+                      " responded my status " +
+                      " for seqno " + expected);
+
+            // forward responses from downstream datanodes.
+            for (int i = 0; i < numTargets && shouldRun; i++) {
+              try {
+                if (op == OP_STATUS_SUCCESS) {
+                  op = mirrorIn.readShort();
+                  if (op != OP_STATUS_SUCCESS) {
+                    LOG.debug("PacketResponder for block " + block +
+                              ": error code received from downstream " +
+                              " datanode[" + i + "] " + op);
+                  }
+                }
+              } catch (Throwable e) {
+                op = OP_STATUS_ERROR;
+              }
+              replyOut.writeShort(op);
+            }
+            replyOut.flush();
+            LOG.debug("PacketResponder " + block + " " + numTargets + 
+                      " responded other status " + " for seqno " + expected);
+
+            // If we were unable to read the seqno from downstream, then stop.
+            if (expected == -2) {
+              running = false;
+            }
+            // If we forwarded an error response from a downstream datanode
+            // and we are acting on behalf of a client, then we quit. The 
+            // client will drive the recovery mechanism.
+            if (op == OP_STATUS_ERROR && clientName.length() > 0) {
+              running = false;
+            }
+        } catch (IOException e) {
+          if (running) {
+            LOG.info("PacketResponder " + block + " " + numTargets + 
+                     " Exception " + StringUtils.stringifyException(e));
+            running = false;
+          }
+        } catch (RuntimeException e) {
+          if (running) {
+            LOG.info("PacketResponder " + block + " " + numTargets + 
+                     " Exception " + StringUtils.stringifyException(e));
+            running = false;
+          }
+        }
+      }
+      LOG.info("PacketResponder " + numTargets + 
+               " for block " + block + " terminating");
+    }
+  }
+
  /* A class that receives a block and writes it to its own disk, while
   * possibly copying it to another site. If a throttler is provided,
   * streaming throttling is also supported.
   */
   private class BlockReceiver implements java.io.Closeable {
     private Block block; // the block to receive
+    private boolean finalized;
     private DataInputStream in = null; // from where data are read
     private DataChecksum checksum; // from where chunks of a block can be read
     private DataOutputStream out = null; // to block file at local disk
@@ -1584,15 +1947,22 @@
     private int bytesPerChecksum;
     private int checksumSize;
     private byte buf[];
-    private long offset;
+    private long offsetInBlock;
     final private String inAddr;
     private String mirrorAddr;
     private DataOutputStream mirrorOut;
+    private Daemon responder = null;
     private Throttler throttler;
     private int lastLen = -1;
     private int curLen = -1;
+    private FSDataset.BlockWriteStreams streams;
+    private boolean isRecovery = false;
+    private String clientName;
+    private Object currentWriteLock;
+    volatile private boolean currentWrite;
 
-    BlockReceiver(Block block, DataInputStream in, String inAddr)
+    BlockReceiver(Block block, DataInputStream in, String inAddr,
+                  boolean isRecovery, String clientName)
         throws IOException {
       try{
         this.block = block;
@@ -1602,15 +1972,20 @@
         this.bytesPerChecksum = checksum.getBytesPerChecksum();
         this.checksumSize = checksum.getChecksumSize();
         this.buf = new byte[bytesPerChecksum + checksumSize];
-
+        this.isRecovery = isRecovery;
+        this.clientName = clientName;
+        this.offsetInBlock = 0;
+        this.currentWriteLock = new Object();
+        this.currentWrite = false;
         //
         // Open local disk out
         //
-        FSDataset.BlockWriteStreams streams = data.writeToBlock(block);
-        this.out = new DataOutputStream(new BufferedOutputStream(
-          streams.dataOut, BUFFER_SIZE));
-        this.checksumOut = new DataOutputStream(new BufferedOutputStream(
-          streams.checksumOut, BUFFER_SIZE));
+        streams = data.writeToBlock(block, isRecovery);
+        this.finalized = data.isValidBlock(block);
+        if (streams != null) {
+          this.out = new DataOutputStream(streams.dataOut);
+          this.checksumOut = new DataOutputStream(streams.checksumOut);
+        }
       } catch(IOException ioe) {
         IOUtils.closeStream(this);
         throw ioe;
@@ -1619,6 +1994,17 @@
 
     // close files
     public void close() throws IOException {
+
+      try {
+        while (currentWrite) {
+          LOG.info("BlockReceiver for block " + block +
+                   " waiting for last write to drain.");
+          synchronized (currentWriteLock) {
+              currentWriteLock.wait();
+          }
+        }
+      } catch (InterruptedException e) {
+      }
       IOException ioe = null;
       // close checksum file
       try {
@@ -1644,12 +2030,12 @@
         throw ioe;
       }
     }
-    
+
     /* receive a chunk: write it to disk & mirror it to another stream */
     private void receiveChunk( int len ) throws IOException {
       if (len <= 0 || len > bytesPerChecksum) {
         throw new IOException("Got wrong length during writeBlock(" + block
-            + ") from " + inAddr + " at offset " + offset + ": " + len
+            + ") from " + inAddr + " at offset " + offsetInBlock + ": " + len
             + " expected <= " + bytesPerChecksum);
       }
 
@@ -1678,31 +2064,58 @@
 
       checksum.reset();
 
+      // record the fact that the current write is still in progress
+      synchronized (currentWriteLock) {
+        currentWrite = true;
+      }
+      offsetInBlock += len;
+
       // First write to remote node before writing locally.
       if (mirrorOut != null) {
         try {
           mirrorOut.writeInt(len);
           mirrorOut.write(buf, 0, len + checksumSize);
         } catch (IOException ioe) {
-          LOG.info(dnRegistration + ":Exception writing to mirror " + mirrorAddr + "\n"
-              + StringUtils.stringifyException(ioe));
+          LOG.info(dnRegistration + ":Exception writing block " +
+                   block + " to mirror " + mirrorAddr + "\n" +
+                   StringUtils.stringifyException(ioe));
+          mirrorOut = null;
+          offsetInBlock -= len;
+          synchronized (currentWriteLock) {
+            currentWrite = false;
+            currentWriteLock.notifyAll();
+          }
           //
           // If stream-copy fails, continue
-          // writing to disk. We shouldn't
-          // interrupt client write.
+          // writing to disk for replication requests. For client
+          // writes, return error so that the client can do error
+          // recovery.
           //
-          mirrorOut = null;
+          if (clientName.length() > 0) {
+            synchronized (currentWriteLock) {
+              currentWrite = false;
+              currentWriteLock.notifyAll();
+            }
+            throw ioe;
+          }
         }
       }
 
       try {
-        out.write(buf, 0, len);
-        // Write checksum
-        checksumOut.write(buf, len, checksumSize);
-        myMetrics.wroteBytes(len);
+        if (!finalized) {
+          out.write(buf, 0, len);
+          // Write checksum
+          checksumOut.write(buf, len, checksumSize);
+          myMetrics.wroteBytes(len);
+        }
       } catch (IOException iex) {
         checkDiskError(iex);
         throw iex;
+      } finally {
+        synchronized (currentWriteLock) {
+          currentWrite = false;
+          currentWriteLock.notifyAll();
+        }
       }
 
       if (throttler != null) { // throttle I/O
@@ -1710,8 +2123,100 @@
       }
     }
 
-    public void receiveBlock(DataOutputStream mirrorOut,
-        String mirrorAddr, Throttler throttler) throws IOException {
+    /*
+     * Receive and process a packet. A packet may contain one or more chunks.
+     */
+    private void receivePacket(int packetSize) throws IOException {
+
+      offsetInBlock = in.readLong(); // get offset of packet in block
+      long seqno = in.readLong();    // get seqno
+      boolean lastPacketInBlock = in.readBoolean();
+      int curPacketSize = 0;         
+      LOG.debug("Receiving one packet for block " + block +
+                " of size " + packetSize +
+                " seqno " + seqno +
+                " offsetInBlock " + offsetInBlock +
+                " lastPacketInBlock " + lastPacketInBlock);
+      setBlockPosition(offsetInBlock);
+
+      int len = in.readInt();
+      curPacketSize += 4;            // read an integer in previous line
+
+      // send packet header to next datanode in pipeline
+      if (mirrorOut != null) {
+        try {
+          mirrorOut.writeInt(packetSize);
+          mirrorOut.writeLong(offsetInBlock);
+          mirrorOut.writeLong(seqno);
+          mirrorOut.writeBoolean(lastPacketInBlock);
+        } catch (IOException e) {
+          LOG.info("Exception writing to mirror " + mirrorAddr + "\n"
+              + StringUtils.stringifyException(e));
+          mirrorOut = null;
+
+          // If stream-copy fails, continue
+          // writing to disk for replication requests. For client
+          // writes, return error so that the client can do error
+          // recovery.
+          //
+          if (clientName.length() > 0) {
+            throw e;
+          }
+        }
+        // first enqueue the ack packet to avoid a race with the response coming
+        // from downstream datanode.
+        if (responder != null) {
+          ((PacketResponder)responder.getRunnable()).enqueue(seqno, 
+                                          lastPacketInBlock); 
+        }
+      }
+
+      if (len == 0) {
+        LOG.info("Receiving empty packet for block " + block);
+        if (mirrorOut != null) {
+          mirrorOut.writeInt(len);
+          mirrorOut.flush();
+        }
+      }
+
+      while (len != 0) {
+        LOG.debug("Receiving one chunk for block " + block +
+                  " of size " + len);
+        receiveChunk( len );
+        curPacketSize += (len + checksumSize);
+        if (curPacketSize > packetSize) {
+          throw new IOException("Packet size for block " + block +
+                                " too long " + curPacketSize +
+                                " was expecting " + packetSize);
+        } 
+        if (curPacketSize == packetSize) {
+          if (mirrorOut != null) {
+            mirrorOut.flush();
+          }
+          break;
+        }
+        len = in.readInt();
+        curPacketSize += 4;
+      }
+
+      // put in queue for pending acks
+      if (responder != null && mirrorOut == null) {
+        ((PacketResponder)responder.getRunnable()).enqueue(seqno,
+                                        lastPacketInBlock); 
+      }
+    }
+
+    public void writeChecksumHeader(DataOutputStream mirrorOut) throws IOException {
+      checksum.writeHeader(mirrorOut);
+    }
+
+
+    public void receiveBlock(
+        DataOutputStream mirrorOut, // output to next datanode
+        DataInputStream mirrorIn,   // input from next datanode
+        DataOutputStream replyOut,  // output to previous datanode
+        String mirrorAddr, Throttler throttler,
+        int numTargets) throws IOException {
 
         this.mirrorOut = mirrorOut;
         this.mirrorAddr = mirrorAddr;
@@ -1728,36 +2233,110 @@
         // write data chunk header
         checksumOut.writeShort(FSDataset.METADATA_VERSION);
         checksum.writeHeader(checksumOut);
-        if (mirrorOut != null) {
-          checksum.writeHeader(mirrorOut);
-          this.mirrorAddr = mirrorAddr;
+        if (clientName.length() > 0) {
+          responder = new Daemon(threadGroup, 
+                                 new PacketResponder(this, block, mirrorIn, 
+                                                     replyOut, numTargets,
+                                                     clientName));
+          responder.start(); // start thread to process responses
         }
 
-        int len = in.readInt();
+        /*
+         * Receive packets until the zero-length packet that marks the
+         * end of the block. A response is needed for every packet.
+         */
+        int len = in.readInt(); // get packet size
         while (len != 0) {
-          receiveChunk( len );
-          offset += len;
-          len = in.readInt();
+          receivePacket(len);
+          len = in.readInt(); // get packet size
         }
 
         // flush the mirror out
         if (mirrorOut != null) {
-          mirrorOut.writeInt(0); // mark the end of the stream
+          mirrorOut.writeInt(0); // mark the end of the block
           mirrorOut.flush();
         }
 
-        // close the block/crc files
-        close();
+        // wait for all outstanding packet responses, and then
+        // tell the responder to shut down gracefully.
+        if (responder != null) {
+          ((PacketResponder)responder.getRunnable()).close();
+        }
+
+        // if this write is for a replication request (and not
+        // from a client), then finalize block. For client-writes, 
+        // the block is finalized in the PacketResponder.
+        if (clientName.length() == 0) {
+          // close the block/crc files
+          close();
+
+          // Finalize the block. Does this fsync()?
+          block.setNumBytes(offsetInBlock);
+          data.finalizeBlock(block);
+          myMetrics.wroteBlocks(1);
+        }
 
-        // Finalize the block. Does this fsync()?
-        block.setNumBytes(offset);
-        data.finalizeBlock(block);
-        myMetrics.wroteBlocks(1);
       } catch (IOException ioe) {
+        LOG.info("Exception in receiveBlock for block " + block + 
+                 " " + ioe);
         IOUtils.closeStream(this);
+        if (responder != null) {
+          responder.interrupt();
+        }
         throw ioe;
+      } finally {
+        if (responder != null) {
+          try {
+            responder.join();
+          } catch (InterruptedException e) {
+            throw new IOException("Interrupted receiveBlock");
+          }
+          responder = null;
+        }
       }
     }
+
+    /**
+     * Sets the file pointer in the local block file to the specified value.
+     */
+    private void setBlockPosition(long offsetInBlock) throws IOException {
+      if (finalized) {
+        if (!isRecovery) {
+          throw new IOException("Write to offset " + offsetInBlock +
+                                " of block " + block +
+                                " that is already finalized.");
+        }
+        if (offsetInBlock > data.getLength(block)) {
+          throw new IOException("Write to offset " + offsetInBlock +
+                                " of block " + block +
+                                " that is already finalized and is of size " +
+                                data.getLength(block));
+        }
+      }
+      if (data.getChannelPosition(block, streams) == offsetInBlock) {
+        return;                   // nothing to do 
+      }
+      if (offsetInBlock % bytesPerChecksum != 0) {
+        throw new IOException("setBlockPosition trying to set position to " +
+                              offsetInBlock +
+                              " which is not a multiple of bytesPerChecksum " +
+                               bytesPerChecksum);
+      }
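+      // corresponding meta-file position: checksum header, then one checksum entry per full chunk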
+      long offsetInChecksum = checksum.getChecksumHeaderSize() + 
+                              offsetInBlock / bytesPerChecksum * checksumSize;
+      if (out != null) {
+        out.flush();
+      }
+      if (checksumOut != null) {
+        checksumOut.flush();
+      }
+      LOG.info("Changing block file offset from " + 
+               data.getChannelPosition(block, streams) +
+               " to " + offsetInBlock +
+               " meta file offset to " + offsetInChecksum);
+
+      // set the position of the block file
+      data.setChannelPosition(block, streams, offsetInBlock, offsetInChecksum);
+    }
   }
 
   /**
@@ -1790,8 +2369,8 @@
         InetSocketAddress curTarget = 
           NetUtils.createSocketAddr(targets[0].getName());
         sock = new Socket();  
-        sock.connect(curTarget, READ_TIMEOUT);
-        sock.setSoTimeout(targets.length*READ_TIMEOUT);
+        sock.connect(curTarget, socketTimeout);
+        sock.setSoTimeout(targets.length * socketTimeout);
 
         out = new DataOutputStream(new BufferedOutputStream(
             sock.getOutputStream(), BUFFER_SIZE));
@@ -1803,6 +2382,9 @@
         out.writeShort(DATA_TRANFER_VERSION);
         out.writeByte(OP_WRITE_BLOCK);
         out.writeLong(b.getBlockId());
+        out.writeInt(0);           // no pipelining
+        out.writeBoolean(false);   // not part of recovery
+        Text.writeString(out, ""); // client
         // write targets
         out.writeInt(targets.length - 1);
         for (int i = 1; i < targets.length; i++) {
@@ -1811,9 +2393,7 @@
         // send data & checksum
         blockSender.sendBlock(out, null);
 
-        
-        // check the response
-        receiveResponse(sock);
+        // no response necessary
         LOG.info(dnRegistration + ":Transmitted block " + b + " to " + curTarget);
 
       } catch (IOException ie) {

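For readers tracking the protocol bump, the following is a minimal sketch of the version-8 OP_WRITE_BLOCK header as the hunks above construct it. It is illustrative rather than part of the commit: the class and method names are invented, the OP_WRITE_BLOCK byte value is assumed, and Hadoop's Text.writeString (a vint-length-prefixed UTF-8 codec) is approximated with a plain int length prefix.

    import java.io.DataOutputStream;
    import java.io.IOException;

    class WriteBlockHeaderSketch {
      static final int DATA_TRANFER_VERSION = 8;    // spelling as in FSConstants
      static final byte OP_WRITE_BLOCK = (byte) 80; // assumed value

      // Header written by the client, and mirrored datanode-to-datanode above.
      static void writeHeader(DataOutputStream out, long blockId,
                              int pipelineSize, boolean isRecovery,
                              String client, int numDownstreamTargets)
          throws IOException {
        out.writeShort(DATA_TRANFER_VERSION);
        out.writeByte(OP_WRITE_BLOCK);
        out.writeLong(blockId);
        out.writeInt(pipelineSize);         // new in version 8
        out.writeBoolean(isRecovery);       // new in version 8
        writeString(out, client);           // new in version 8; "" = replication
        out.writeInt(numDownstreamTargets); // targets.length - 1
        // ... followed by the remaining DatanodeInfos, the checksum header,
        // and then the data packets.
      }

      // Simplified stand-in for org.apache.hadoop.io.Text.writeString.
      static void writeString(DataOutputStream out, String s) throws IOException {
        byte[] b = s.getBytes("UTF-8");
        out.writeInt(b.length); // the real codec uses a variable-length int
        out.write(b);
      }
    }

After this header, each datanode relays a connect ack upstream as a single Text string: the address of the first bad link in the pipeline, or the empty string if every downstream connection succeeded.
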
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java?rev=612903&r1=612902&r2=612903&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java Thu Jan 17 10:11:35 2008
@@ -114,7 +114,7 @@
    *   OP_REPLACE_BLOCK BlockID(long) SourceID(UTF8) Block_Data_With_Crc
    *   return OP_STATUS_ERROR if any error occurs; OP_STATUS_SUCCESS otherwise
    */
-  public static final int DATA_TRANFER_VERSION = 7;
+  public static final int DATA_TRANFER_VERSION = 8;
 
   // Return codes for file create
   public static final int OPERATION_FAILED = 0;
@@ -142,6 +142,9 @@
   public static final int BUFFER_SIZE = new Configuration().getInt("io.file.buffer.size", 4096);
   //TODO mb@media-style.com: should be conf injected?
   public static final long DEFAULT_BLOCK_SIZE = 64 * 1024 * 1024;
+  public static final int DEFAULT_DATA_SOCKET_SIZE = 1024 * 1024;
+
+  public static final int SIZE_OF_INTEGER = Integer.SIZE / Byte.SIZE;
 
   // SafeMode actions
   public enum SafeModeAction{ SAFEMODE_LEAVE, SAFEMODE_ENTER, SAFEMODE_GET; }



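A companion sketch, under the same caveats, of the per-packet framing and the ack stream handled by BlockReceiver.receivePacket and PacketResponder above. It shows one chunk per packet, as BlockSender.sendChunk produces; a client may pack several chunks into one packet, each framed as a length, data bytes, and checksum bytes.

    import java.io.DataOutputStream;
    import java.io.IOException;

    class PacketFramingSketch {
      // One data packet, as written upstream and mirrored downstream above.
      static void writePacket(DataOutputStream out, long offsetInBlock,
                              long seqno, boolean lastPacketInBlock,
                              byte[] data, byte[] checksum) throws IOException {
        // packet length = data + checksum + 4 bytes for the data-length field
        out.writeInt(data.length + checksum.length + 4);
        out.writeLong(offsetInBlock);
        out.writeLong(seqno);
        out.writeBoolean(lastPacketInBlock);
        out.writeInt(data.length);
        out.write(data);
        out.write(checksum);
        // a zero-length packet (writeInt(0)) marks the end of the block
      }

      // One ack, as PacketResponder sends it upstream: the acked seqno,
      // then one OP_STATUS_* short per datanode in the pipeline (its own
      // status first, then the statuses forwarded from downstream).
      static void writeAck(DataOutputStream replyOut, long seqno,
                           short[] statuses) throws IOException {
        replyOut.writeLong(seqno); // -1 alone is a heartbeat/keepalive
        for (short status : statuses) {
          replyOut.writeShort(status);
        }
        replyOut.flush();
      }
    }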