hadoop-common-commits mailing list archives

From cutt...@apache.org
Subject svn commit: r556743 [2/3] - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/fs/ src/test/org/apache/hadoop/dfs/ src/test/org/apache/hadoop/mapred/
Date Mon, 16 Jul 2007 21:35:03 GMT
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?view=diff&rev=556743&r1=556742&r2=556743
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Mon Jul 16 14:34:59 2007
@@ -31,7 +31,9 @@
 import java.io.*;
 import java.net.*;
 import java.util.*;
+import java.util.zip.CRC32;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ConcurrentHashMap;
 
 /********************************************************
  * DFSClient can connect to a Hadoop Filesystem and 
@@ -48,7 +50,6 @@
   public static final Log LOG = LogFactory.getLog("org.apache.hadoop.fs.DFSClient");
   static final int MAX_BLOCK_ACQUIRE_FAILURES = 3;
   private static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
-  private static final long DEFAULT_BLOCK_SIZE = 64 * 1024 * 1024;
   ClientProtocol namenode;
   boolean running = true;
   Random r = new Random();
@@ -261,18 +262,15 @@
     return hints;
   }
 
+  public DFSInputStream open(UTF8 src) throws IOException {
+    return open(src, conf.getInt("io.file.buffer.size", 4096));
+  }
   /**
    * Create an input stream that obtains a nodelist from the
    * namenode, and then reads from all the right places.  Creates
    * inner subclass of InputStream that does the right out-of-band
    * work.
    */
-  public DFSInputStream open(UTF8 src) throws IOException {
-    checkOpen();
-    //    Get block info from namenode
-    return new DFSInputStream(src.toString());
-  }
-
   public DFSInputStream open(UTF8 src, int buffersize) throws IOException {
     checkOpen();
     //    Get block info from namenode
@@ -546,10 +544,12 @@
    * Pick the best node from which to stream the data.
    * Entries in <i>nodes</i> are already in the priority order
    */
-  private DatanodeInfo bestNode(DatanodeInfo nodes[], TreeSet deadNodes) throws IOException {
+  private DatanodeInfo bestNode(DatanodeInfo nodes[], 
+                                AbstractMap<DatanodeInfo, DatanodeInfo> deadNodes)
+                                throws IOException {
     if (nodes != null) { 
       for (int i = 0; i < nodes.length; i++) {
-        if (!deadNodes.contains(nodes[i])) {
+        if (!deadNodes.containsKey(nodes[i])) {
           return nodes[i];
         }
       }
@@ -596,6 +596,229 @@
     }
   }
 
+  /** This is a wrapper around the connection to a datanode
+   * and understands checksums, offsets, etc.
+   */
+  static class BlockReader extends FSInputChecker {
+
+    private DataInputStream in;
+    private DataChecksum checksum;
+    private long lastChunkOffset = -1;
+    private long lastChunkLen = -1;
+
+    private long startOffset;
+    private long firstChunkOffset;
+    private int bytesPerChecksum;
+    private int checksumSize;
+    private boolean gotEOS = false;
+    
+    byte[] skipBuf = null;
+    
+    /* FSInputChecker interface */
+    
+    /* Same interface as java.io.InputStream#read();
+     * used by DFSInputStream#read().
+     * This violates one rule when there is a checksum error:
+     * "Read should not modify user buffer before successful read",
+     * because it first reads the data into the user buffer and then checks
+     * the checksum.
+     */
+    public synchronized int read(byte[] buf, int off, int len) 
+                                 throws IOException {
+      
+      // For the first read, skip the extra bytes at the front.
+      if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) {
+        // Skip these bytes. But don't call this.skip()!
+        int toSkip = (int)(startOffset - firstChunkOffset);
+        if ( skipBuf == null ) {
+          skipBuf = new byte[bytesPerChecksum];
+        }
+        if ( super.read(skipBuf, 0, toSkip) != toSkip ) {
+          // should never happen
+          throw new IOException("Could not skip required number of bytes");
+        }
+      }
+      
+      return super.read(buf, off, len);
+    }
+
+    public synchronized long skip(long n) throws IOException {
+      /* How can we make sure we don't throw a ChecksumException, at least
+       * in the majority of cases? This one throws. */  
+      if ( skipBuf == null ) {
+        skipBuf = new byte[bytesPerChecksum]; 
+      }
+
+      long nSkipped = 0;
+      while ( nSkipped < n ) {
+        int toSkip = (int)Math.min(n-nSkipped, skipBuf.length);
+        int ret = read(skipBuf, 0, toSkip);
+        if ( ret <= 0 ) {
+          return nSkipped;
+        }
+        nSkipped += ret;
+      }
+      return nSkipped;
+    }
+
+    public int read() throws IOException {
+      throw new IOException("read() is not expected to be invoked. " +
+                            "Use read(buf, off, len) instead.");
+    }
+    
+    public boolean seekToNewSource(long targetPos) throws IOException {
+      /* Checksum errors are handled outside the BlockReader. 
+       * DFSInputStream does not always call 'seekToNewSource'. In the 
+       * case of pread(), it just tries a different replica without seeking.
+       */ 
+      return false;
+    }
+    
+    public void seek(long pos) throws IOException {
+      throw new IOException("seek() is not supported in BlockReader");
+    }
+
+    protected long getChunkPosition(long pos) {
+      throw new RuntimeException("getChunkPosition() is not supported, " +
+                                 "since seek is not required");
+    }
+    
+    protected synchronized int readChunk(long pos, byte[] buf, int offset, 
+                                         int len, byte[] checksumBuf) 
+                                         throws IOException {
+      // Read one chunk.
+      
+      if ( gotEOS ) {
+        if ( startOffset < 0 ) {
+          // This is mainly for debugging; can be removed.
+          throw new IOException( "BlockReader: already got EOS or an error" );
+        }
+        startOffset = -1;
+        return -1;
+      }
+      
+      // Read one DATA_CHUNK.
+      long chunkOffset = lastChunkOffset;
+      if ( lastChunkLen > 0 ) {
+        chunkOffset += lastChunkLen;
+      }
+      
+      if ( (pos + firstChunkOffset) != chunkOffset ) {
+        throw new IOException("Mismatch in pos : " + pos + " + " + 
+                              firstChunkOffset + " != " + chunkOffset);
+      }
+      
+      int chunkLen = (int) in.readInt();
+      
+      // Sanity check the lengths
+      if ( chunkLen < 0 || chunkLen > bytesPerChecksum ||
+          ( lastChunkLen >= 0 && // prev packet exists
+              ( (chunkLen > 0 && lastChunkLen != bytesPerChecksum) ||
+                  chunkOffset != (lastChunkOffset + lastChunkLen) ) ) ) {
+        throw new IOException("BlockReader: error in chunk's offset " +
+                              "or length (" + chunkOffset + ":" +
+                              chunkLen + ")");
+      }
+
+      if ( chunkLen > 0 ) {
+        // len should be >= chunkLen
+        FileUtil.readFully(in, buf, offset, chunkLen);
+      }
+      
+      if ( checksumSize > 0 ) {
+        FileUtil.readFully(in, checksumBuf, 0, checksumSize);
+      }
+
+      lastChunkOffset = chunkOffset;
+      lastChunkLen = chunkLen;
+      
+      if ( chunkLen == 0 ) {
+        gotEOS = true;
+        return -1;
+      }
+      
+      return chunkLen;
+    }
+    
+    private BlockReader( String file, long blockId, DataInputStream in, 
+                         DataChecksum checksum, long startOffset,
+                         long firstChunkOffset ) {
+      super(new Path("/blk_" + blockId + ":of:" + file)/*too non path-like?*/,
+            1, (checksum.getChecksumSize() > 0) ? checksum : null, 
+            checksum.getBytesPerChecksum(),
+            checksum.getChecksumSize());
+      
+      this.in = in;
+      this.checksum = checksum;
+      this.startOffset = Math.max( startOffset, 0 );
+
+      this.firstChunkOffset = firstChunkOffset;
+      lastChunkOffset = firstChunkOffset;
+      lastChunkLen = -1;
+
+      bytesPerChecksum = this.checksum.getBytesPerChecksum();
+      checksumSize = this.checksum.getChecksumSize();
+    }
+
+    /** Java Doc required */
+    static BlockReader newBlockReader( Socket sock, String file, long blockId, 
+                                       long startOffset, long len,
+                                       int bufferSize)
+                                       throws IOException {
+      
+      // in and out will be closed when sock is closed (by the caller)
+      DataOutputStream out = new DataOutputStream(
+                       new BufferedOutputStream(sock.getOutputStream()));
+
+      //write the header.
+      out.writeShort( DATA_TRANFER_VERSION );
+      out.write( OP_READ_BLOCK );
+      out.writeLong( blockId );
+      out.writeLong( startOffset );
+      out.writeLong( len );
+      out.flush();
+
+      //
+      // Get bytes in block, set streams
+      //
+
+      DataInputStream in = new DataInputStream(
+                   new BufferedInputStream(sock.getInputStream(), bufferSize));
+      
+      if ( in.readShort() != OP_STATUS_SUCCESS ) {
+        throw new IOException("Got error in response to OP_READ_BLOCK");
+      }
+      DataChecksum checksum = DataChecksum.newDataChecksum( in );
+      //Warning when we get CHECKSUM_NULL?
+      
+      // Read the first chunk offset.
+      long firstChunkOffset = in.readLong();
+      
+      if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
+          firstChunkOffset >= (startOffset + checksum.getBytesPerChecksum())) {
+        throw new IOException("BlockReader: error in first chunk offset (" +
+                              firstChunkOffset + ") startOffset is " + 
+                              startOffset + " for file XXX");
+      }
+
+      return new BlockReader( file, blockId, in, checksum,
+                              startOffset, firstChunkOffset );
+    }
+
+    public synchronized void close() throws IOException {
+      startOffset = -1;
+      checksum = null;
+      // in will be closed when its Socket is closed.
+    }
+    
+    /** Kind of like readFully(), but only reads as much as possible.
+     * Also allows use of the protected readFully().
+     */
+    int readAll(byte[] buf, int offset, int len) throws IOException {
+      return readFully(this, buf, offset, len);
+    }
+  }
+    
   /****************************************************************
    * DFSInputStream provides bytes from a named file.  It handles 
    * negotiation of the namenode and various datanodes as necessary.
@@ -606,27 +829,32 @@
 
     private String src;
     private long prefetchSize = 10 * defaultBlockSize;
-    private DataInputStream blockStream;
+    private BlockReader blockReader;
     private LocatedBlocks locatedBlocks = null;
     private DatanodeInfo currentNode = null;
     private Block currentBlock = null;
     private long pos = 0;
     private long blockEnd = -1;
-    private TreeSet<DatanodeInfo> deadNodes = new TreeSet<DatanodeInfo>();
-    private int buffersize;
-        
-    /**
-     */
-    public DFSInputStream(String src) throws IOException {
-      this(src, conf.getInt("io.file.buffer.size", 4096));
+    /* XXX Use of ConcurrentHashMap is a temporary fix. Need to fix 
+     * parallel accesses to DFSInputStream (through preads) properly. */
+    private ConcurrentHashMap<DatanodeInfo, DatanodeInfo> deadNodes = 
+               new ConcurrentHashMap<DatanodeInfo, DatanodeInfo>();
+    private int buffersize = 1;
+    
+    private byte[] oneByteBuf = new byte[1]; // used for 'int read()'
+    
+    void addToDeadNodes(DatanodeInfo dnInfo) {
+      deadNodes.put(dnInfo, dnInfo);
     }
     
+    /**
+     */
     public DFSInputStream(String src, int buffersize) throws IOException {
       this.buffersize = buffersize;
       this.src = src;
       prefetchSize = conf.getLong("dfs.read.prefetch.size", prefetchSize);
       openInfo();
-      this.blockStream = null;
+      blockReader = null;
     }
 
     /**
@@ -648,7 +876,7 @@
       this.currentNode = null;
     }
     
-    public long getFileLength() {
+    public synchronized long getFileLength() {
       return (locatedBlocks == null) ? 0 : locatedBlocks.getFileLength();
     }
 
@@ -710,7 +938,8 @@
      * @return consequent segment of located blocks
      * @throws IOException
      */
-    private List<LocatedBlock> getBlockRange(long offset, long length) 
+    private synchronized List<LocatedBlock> getBlockRange(long offset, 
+                                                          long length) 
                                                         throws IOException {
       assert (locatedBlocks != null) : "locatedBlocks is null";
       List<LocatedBlock> blockRange = new ArrayList<LocatedBlock>();
@@ -750,6 +979,11 @@
         throw new IOException("Attempted to read past end of file");
       }
 
+      if ( blockReader != null ) {
+        blockReader.close(); 
+        blockReader = null;
+      }
+      
       if (s != null) {
         s.close();
         s = null;
@@ -775,37 +1009,19 @@
           s = new Socket();
           s.connect(targetAddr, READ_TIMEOUT);
           s.setSoTimeout(READ_TIMEOUT);
-
-          //
-          // Xmit header info to datanode
-          //
-          Block block = targetBlock.getBlock();
-          DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
-          out.write(OP_READSKIP_BLOCK);
-          block.write(out);
-          out.writeLong(offsetIntoBlock);
-          out.flush();
-
-          //
-          // Get bytes in block, set streams
-          //
-          DataInputStream in = new DataInputStream(new BufferedInputStream(s.getInputStream(), buffersize));
-          long curBlockSize = in.readLong();
-          long amtSkipped = in.readLong();
-          if (curBlockSize != block.getNumBytes()) {
-            throw new IOException("Recorded block size is " + block.getNumBytes() + ", but datanode reports size of " + curBlockSize);
-          }
-          if (amtSkipped != offsetIntoBlock) {
-            throw new IOException("Asked for offset of " + offsetIntoBlock + ", but only received offset of " + amtSkipped);
-          }
-
-          this.blockStream = in;
+          Block blk = targetBlock.getBlock();
+          
+          blockReader = BlockReader.newBlockReader(s, src, blk.getBlockId(), 
+                                                   offsetIntoBlock,
+                                                   (blk.getNumBytes() - 
+                                                    offsetIntoBlock),
+                                                   buffersize);
           return chosenNode;
         } catch (IOException ex) {
           // Put chosen node into dead list, continue
           LOG.debug("Failed to connect to " + targetAddr + ":" 
                     + StringUtils.stringifyException(ex));
-          deadNodes.add(chosenNode);
+          addToDeadNodes(chosenNode);
           if (s != null) {
             try {
               s.close();
@@ -827,8 +1043,12 @@
         throw new IOException("Stream closed");
       }
 
+      if ( blockReader != null ) {
+        blockReader.close();
+        blockReader = null;
+      }
+      
       if (s != null) {
-        blockStream.close();
         s.close();
         s = null;
       }
@@ -836,25 +1056,39 @@
       closed = true;
     }
 
-    /**
-     * Basic read()
-     */
     public synchronized int read() throws IOException {
-      checkOpen();
-      if (closed) {
-        throw new IOException("Stream closed");
-      }
-      int result = -1;
-      if (pos < getFileLength()) {
-        if (pos > blockEnd) {
-          currentNode = blockSeekTo(pos);
-        }
-        result = blockStream.read();
-        if (result >= 0) {
-          pos++;
+      int ret = read( oneByteBuf, 0, 1 );
+      return ( ret <= 0 ) ? -1 : (oneByteBuf[0] & 0xff);
+    }
+
+    /* This is used by the regular read() and handles ChecksumExceptions.
+     * The name readBuffer() is chosen to imply similarity to readBuffer() in
+     * ChecksumFileSystem.
+     */ 
+    private synchronized int readBuffer(byte buf[], int off, int len) 
+                                                    throws IOException {
+      IOException ioe;
+ 
+      while (true) {
+        // retry as many times as seekToNewSource allows.
+        try {
+          return blockReader.read(buf, off, len);
+        } catch ( ChecksumException ce ) {
+          LOG.warn("Found Checksum error for " + currentBlock + " from " +
+                   currentNode.getName() + " at " + ce.getPos());          
+          reportChecksumFailure(src, currentBlock, currentNode);
+          ioe = ce;
+        } catch ( IOException e ) {
+          LOG.warn("Exception while reading from " + currentBlock +
+                   " of " + src + " from " + currentNode + ": " +
+                   StringUtils.stringifyException(e));
+          ioe = e;
+        }
+        addToDeadNodes(currentNode);
+        if (!seekToNewSource(pos)) {
+            throw ioe;
         }
       }
-      return result;
     }
 
     /**
@@ -873,17 +1107,23 @@
               currentNode = blockSeekTo(pos);
             }
             int realLen = Math.min(len, (int) (blockEnd - pos + 1));
-            int result = blockStream.read(buf, off, realLen);
+            int result = readBuffer(buf, off, realLen);
+            
             if (result >= 0) {
               pos += result;
+            } else {
+              // Got an EOS from the reader though we expect more data on it.
+              throw new IOException("Unexpected EOS from the reader");
             }
             return result;
+          } catch (ChecksumException ce) {
+            throw ce;            
           } catch (IOException e) {
             if (retries == 1) {
               LOG.warn("DFS Read: " + StringUtils.stringifyException(e));
             }
             blockEnd = -1;
-            if (currentNode != null) { deadNodes.add(currentNode); }
+            if (currentNode != null) { addToDeadNodes(currentNode); }
             if (--retries == 0) {
               throw e;
             }
@@ -931,7 +1171,10 @@
       // Connect to best DataNode for desired Block, with potential offset
       //
       Socket dn = null;
-      while (dn == null) {
+      int numAttempts = block.getLocations().length;
+      IOException ioe = null;
+      
+      while (dn == null && numAttempts-- > 0 ) {
         DNAddrPair retval = chooseDataNode(block);
         DatanodeInfo chosenNode = retval.info;
         InetSocketAddress targetAddr = retval.addr;
@@ -941,52 +1184,39 @@
           dn.connect(targetAddr, READ_TIMEOUT);
           dn.setSoTimeout(READ_TIMEOUT);
               
-          //
-          // Xmit header info to datanode
-          //
-          DataOutputStream out = new DataOutputStream(new BufferedOutputStream(dn.getOutputStream()));
-          out.write(OP_READ_RANGE_BLOCK);
-          block.getBlock().write(out);
-          out.writeLong(start);
-          out.writeLong(end);
-          out.flush();
+          int len = (int) (end - start + 1);
               
-          //
-          // Get bytes in block, set streams
-          //
-          DataInputStream in = new DataInputStream(new BufferedInputStream(dn.getInputStream()));
-          long curBlockSize = in.readLong();
-          long actualStart = in.readLong();
-          long actualEnd = in.readLong();
-          if (curBlockSize != block.getBlockSize()) {
-            throw new IOException("Recorded block size is " +
-                                  block.getBlockSize() + 
-                                  ", but datanode reports size of " +
-                                  curBlockSize);
-          }
-          if ((actualStart != start) || (actualEnd != end)) {
-            throw new IOException("Asked for byte range  " + start +
-                                  "-" + end + ", but only received range " + actualStart +
-                                  "-" + actualEnd);
-          }
-          int nread = in.read(buf, offset, (int)(end - start + 1));
-          assert nread == (int)(end - start + 1) : 
-            "Incorrect number of bytes read " + nread
-            + ". Expacted " + (int)(end - start + 1);
-        } catch (IOException ex) {
-          // Put chosen node into dead list, continue
-          LOG.debug("Failed to connect to " + targetAddr + ":" 
-                    + StringUtils.stringifyException(ex));
-          deadNodes.add(chosenNode);
-          if (dn != null) {
-            try {
-              dn.close();
-            } catch (IOException iex) {
-            }
+          BlockReader reader = 
+            BlockReader.newBlockReader(dn, src, block.getBlock().getBlockId(),
+                                       start, len, buffersize);
+          int nread = reader.readAll(buf, offset, len);
+          if (nread != len) {
+            throw new IOException("truncated return from reader.read(): " +
+                                  "expected " + len + ", got " + nread);
+          }
+          return;
+        } catch (ChecksumException e) {
+          ioe = e;
+          LOG.warn("fetchBlockByteRange(). Got a checksum exception for " +
+                   src + " at " + block.getBlock() + ":" + 
+                   e.getPos() + " from " + chosenNode.getName());
+          reportChecksumFailure(src, block.getBlock(), chosenNode);
+        } catch (IOException e) {
+          ioe = e;
+          LOG.warn("Failed to connect to " + targetAddr + ":" 
+                    + StringUtils.stringifyException(e));
+        } 
+        // Put chosen node into dead list, continue
+        addToDeadNodes(chosenNode);
+        if (dn != null) {
+          try {
+            dn.close();
+          } catch (IOException iex) {
           }
           dn = null;
         }
       }
+      throw (ioe == null) ? new IOException("Could not read data") : ioe;
     }
 
     /**
@@ -1031,7 +1261,15 @@
       assert remaining == 0 : "Wrong number of bytes read.";
       return realLen;
     }
-        
+     
+    public long skip(long n) throws IOException {
+      if ( n > 0 ) {
+        seek(getPos()+n);
+        return n;
+      }
+      return n < 0 ? -1 : 0;
+    }
+
     /**
      * Seek to a new arbitrary location
      */
@@ -1048,8 +1286,7 @@
         //
         int diff = (int)(targetPos - pos);
         if (diff <= TCP_WINDOW_SIZE) {
-          int adiff = blockStream.skipBytes(diff);
-          pos += adiff;
+          pos += blockReader.skip(diff);
           if (pos == targetPos) {
             done = true;
           }
@@ -1067,8 +1304,8 @@
      * If another node could not be found, then returns false.
      */
     public synchronized boolean seekToNewSource(long targetPos) throws IOException {
-      boolean markedDead = deadNodes.contains(currentNode);
-      deadNodes.add(currentNode);
+      boolean markedDead = deadNodes.containsKey(currentNode);
+      addToDeadNodes(currentNode);
       DatanodeInfo oldNode = currentNode;
       DatanodeInfo newNode = blockSeekTo(targetPos);
       if (!markedDead) {
@@ -1144,13 +1381,10 @@
   /****************************************************************
    * DFSOutputStream creates files from a stream of bytes.
    ****************************************************************/
-  class DFSOutputStream extends OutputStream {
+  class DFSOutputStream extends FSOutputSummer {
     private Socket s;
     boolean closed = false;
 
-    private byte outBuf[] = new byte[BUFFER_SIZE];
-    private int pos = 0;
-
     private UTF8 src;
     private boolean overwrite;
     private short replication;
@@ -1161,10 +1395,10 @@
     private OutputStream backupStream;
     private Block block;
     private long filePos = 0;
-    private int bytesWrittenToBlock = 0;
-    private String datanodeName;
+    private long bytesWrittenToBlock = 0;
     private long blockSize;
     private int buffersize;
+    private DataChecksum checksum;
 
     private Progressable progress;
     /**
@@ -1175,19 +1409,37 @@
                            Progressable progress,
                            int buffersize
                            ) throws IOException {
+      super(new CRC32(), conf.getInt("io.bytes.per.checksum", 512), 4);
       this.src = src;
       this.overwrite = overwrite;
       this.replication = replication;
-      this.backupFile = newBackupFile();
       this.blockSize = blockSize;
-      this.backupStream = new FileOutputStream(backupFile);
+      this.buffersize = buffersize;
       this.progress = progress;
       if (progress != null) {
         LOG.debug("Set non-null progress callback on DFSOutputStream "+src);
       }
-      this.buffersize = buffersize;
+      
+      int bytesPerChecksum = conf.getInt( "io.bytes.per.checksum", 512); 
+      if ( bytesPerChecksum < 1 || blockSize % bytesPerChecksum != 0) {
+        throw new IOException("io.bytes.per.checksum(" + bytesPerChecksum +
+                              ") and blockSize(" + blockSize + 
+                              ") do not match. " + "blockSize should be a " +
+                              "multiple of io.bytes.per.checksum");
+                              
+      }
+      
+      checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 
+                                              bytesPerChecksum);
     }
 
+    private void openBackupStream() throws IOException {
+      File tmpFile = newBackupFile();
+      backupStream = new BufferedOutputStream(new FileOutputStream(tmpFile),
+                                              buffersize);
+      backupFile = tmpFile;
+    }
+    
     /* Wrapper for closing backupStream. This sets backupStream to null so
      * that we do not attempt to write to backupStream that could be
      * invalid in subsequent writes. Otherwise we might end trying to write
@@ -1195,6 +1447,7 @@
      */
     private void closeBackupStream() throws IOException {
       if (backupStream != null) {
+        backupStream.flush();
         OutputStream stream = backupStream;
         backupStream = null;
         stream.close();
@@ -1252,7 +1505,6 @@
           s = new Socket();
           s.connect(target, READ_TIMEOUT);
           s.setSoTimeout(replication * READ_TIMEOUT);
-          datanodeName = nodes[0].getName();
         } catch (IOException ie) {
           // Connection failed.  Let's wait a little bit and retry
           try {
@@ -1276,16 +1528,16 @@
         // Xmit header info to datanode
         //
         DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream(), buffersize));
-        out.write(OP_WRITE_BLOCK);
-        out.writeBoolean(true);
-        block.write(out);
-        out.writeInt(nodes.length);
-        for (int i = 0; i < nodes.length; i++) {
+        out.writeShort( DATA_TRANFER_VERSION );
+        out.write( OP_WRITE_BLOCK );
+        out.writeLong( block.getBlockId() );
+        out.writeInt( nodes.length - 1 );
+        for (int i = 1; i < nodes.length; i++) {
           nodes[i].write(out);
         }
-        out.write(CHUNKED_ENCODING);
+        checksum.writeHeader( out );
         blockStream = out;
-        blockReplyStream = new DataInputStream(new BufferedInputStream(s.getInputStream()));
+        blockReplyStream = new DataInputStream(s.getInputStream());
       } while (retry);
       firstTime = false;
     }
@@ -1329,98 +1581,30 @@
       } 
     }
 
-    /**
-     * We're referring to the file pos here
-     */
-    public synchronized long getPos() throws IOException {
-      return filePos;
-    }
-			
-    /**
-     * Writes the specified byte to this output stream.
-     */
-    public synchronized void write(int b) throws IOException {
-      checkOpen();
-      if (closed) {
-        throw new IOException("Stream closed");
-      }
-
-      if ((bytesWrittenToBlock + pos == blockSize) ||
-          (pos >= BUFFER_SIZE)) {
-        flush();
-      }
-      outBuf[pos++] = (byte) b;
-      filePos++;
-    }
-
-    /**
-     * Writes the specified bytes to this output stream.
-     */
-    public synchronized void write(byte b[], int off, int len)
-      throws IOException {
+    // @see FSOutputSummer#writeChunk()
+    protected void writeChunk(byte[] b, int offset, int len, byte[] checksum) 
+                                                          throws IOException {
       checkOpen();
-      if (closed) {
-        throw new IOException("Stream closed");
-      }
-      while (len > 0) {
-        int remaining = Math.min(BUFFER_SIZE - pos,
-                                 (int)((blockSize - bytesWrittenToBlock) - pos));
-        int toWrite = Math.min(remaining, len);
-        System.arraycopy(b, off, outBuf, pos, toWrite);
-        pos += toWrite;
-        off += toWrite;
-        len -= toWrite;
-        filePos += toWrite;
-
-        if ((bytesWrittenToBlock + pos >= blockSize) ||
-            (pos == BUFFER_SIZE)) {
-          flush();
-        }
-      }
-    }
-
-    /**
-     * Flush the buffer, getting a stream to a new block if necessary.
-     */
-    public synchronized void flush() throws IOException {
-      checkOpen();
-      if (closed) {
-        throw new IOException("Stream closed");
+      int bytesPerChecksum = this.checksum.getBytesPerChecksum(); 
+      if (len > bytesPerChecksum || (len + bytesWrittenToBlock) > blockSize) {
+        // should never happen
+        throw new IOException("Mismatch in writeChunk() args");
       }
-
-      if (bytesWrittenToBlock + pos >= blockSize) {
-        flushData((int) blockSize - bytesWrittenToBlock);
+      
+      if ( backupFile == null ) {
+        openBackupStream();
       }
-      if (bytesWrittenToBlock == blockSize) {
+      
+      backupStream.write(b, offset, len);
+      backupStream.write(checksum);
+      
+      bytesWrittenToBlock += len;
+      filePos += len;
+      
+      if ( bytesWrittenToBlock >= blockSize ) {
         endBlock();
       }
-      flushData(pos);
-    }
-
-    /**
-     * Actually flush the accumulated bytes to the remote node,
-     * but no more bytes than the indicated number.
-     */
-    private synchronized void flushData(int maxPos) throws IOException {
-      int workingPos = Math.min(pos, maxPos);
-            
-      if (workingPos > 0) {
-        if (backupStream == null) {
-          throw new IOException("Trying to write to backupStream " +
-                                "but it already closed or not open");
-        }
-        //
-        // To the local block backup, write just the bytes
-        //
-        backupStream.write(outBuf, 0, workingPos);
 
-        //
-        // Track position
-        //
-        bytesWrittenToBlock += workingPos;
-        System.arraycopy(outBuf, workingPos, outBuf, 0, pos - workingPos);
-        pos -= workingPos;
-      }
     }
 
     /**
@@ -1439,21 +1623,63 @@
       boolean sentOk = false;
       int remainingAttempts = 
         conf.getInt("dfs.client.block.write.retries", 3);
+      int numSuccessfulWrites = 0;
+            
       while (!sentOk) {
         nextBlockOutputStream();
-        InputStream in = new FileInputStream(backupFile);
+
+        long bytesLeft = bytesWrittenToBlock;
+        int bytesPerChecksum = checksum.getBytesPerChecksum();
+        int checksumSize = checksum.getChecksumSize(); 
+        byte buf[] = new byte[ bytesPerChecksum + checksumSize ];
+
+        InputStream in = (bytesLeft > 0) ? 
+                         new FileInputStream(backupFile) : null;    
         try {
-          byte buf[] = new byte[BUFFER_SIZE];
-          int bytesRead = in.read(buf);
-          while (bytesRead > 0) {
-            blockStream.writeLong((long) bytesRead);
-            blockStream.write(buf, 0, bytesRead);
+
+          while ( bytesLeft >= 0 ) {
+            int len = (int) Math.min( bytesLeft, bytesPerChecksum );
+            if ( len > 0 ) {
+              FileUtil.readFully( in, buf, 0, len + checksumSize);
+            }
+
+            blockStream.writeInt( len );
+            blockStream.write( buf, 0, len + checksumSize );
+
+            if ( bytesLeft == 0 ) {
+              break;
+            }
+              
+            bytesLeft -= len;
+
             if (progress != null) { progress.progress(); }
-            bytesRead = in.read(buf);
           }
-          internalClose();
-          sentOk = true;
+          
+          blockStream.flush();
+          
+          numSuccessfulWrites++;
+
+          //We should wait for response from the receiver.
+          int reply = blockReplyStream.readByte();
+          if ( reply == OP_STATUS_SUCCESS ||
+              ( reply == OP_STATUS_ERROR_EXISTS &&
+                  numSuccessfulWrites > 1 ) ) {
+            s.close();
+            s = null;
+            sentOk = true;
+          } else {
+            throw new IOException( "Got error reply " + reply +
+                                   " while writing the block " 
+                                   + block );
+          }
+
         } catch (IOException ie) {
+          /*
+           * The error could be OP_STATUS_ERROR_EXISTS.
+           * We are not handling it properly here yet.
+           * We should try to read a byte from blockReplyStream
+           * without blocking. 
+           */
           handleSocketException(ie);
           remainingAttempts -= 1;
           if (remainingAttempts == 0) {
@@ -1464,47 +1690,17 @@
           } catch (InterruptedException e) {
           }
         } finally {
-          in.close();
+          if (in != null) {
+            in.close();
+          }
         }
       }
 
       bytesWrittenToBlock = 0;
       //
-      // Delete local backup, start new one
+      // Delete local backup.
       //
       deleteBackupFile();
-      File tmpFile = newBackupFile();
-      bytesWrittenToBlock = 0;
-      backupStream = new FileOutputStream(tmpFile);
-      backupFile = tmpFile;
-    }
-
-    /**
-     * Close down stream to remote datanode.
-     */
-    private synchronized void internalClose() throws IOException {
-      try {
-        blockStream.writeLong(0);
-        blockStream.flush();
-
-        long complete = blockReplyStream.readLong();
-        if (complete != WRITE_COMPLETE) {
-          LOG.info("Did not receive WRITE_COMPLETE flag: " + complete);
-          throw new IOException("Did not receive WRITE_COMPLETE_FLAG: " + complete);
-        }
-      } catch (IOException ie) {
-        throw (IOException)
-          new IOException("failure closing block of file " +
-                          src.toString() + " to node " +
-                          (datanodeName == null ? "?" : datanodeName)
-                          ).initCause(ie);
-      }
-                    
-      LocatedBlock lb = new LocatedBlock();
-      lb.readFields(blockReplyStream);
-
-      s.close();
-      s = null;
     }
 
     private void handleSocketException(IOException ie) throws IOException {
@@ -1517,6 +1713,7 @@
       } catch (IOException ie2) {
         LOG.warn("Error closing socket.", ie2);
       }
+      //XXX Why are we abandoning the block? There could be retries left.
       namenode.abandonBlock(block, src.toString());
     }
 
@@ -1529,9 +1726,10 @@
       if (closed) {
         throw new IOException("Stream closed");
       }
-          
+      
+      flushBuffer();
+      
       try {
-        flush();
         if (filePos == 0 || bytesWrittenToBlock != 0) {
           try {
             endBlock();
@@ -1540,15 +1738,11 @@
             throw e;
           }
         }
-            
-        closeBackupStream();
-        deleteBackupFile();
 
         if (s != null) {
           s.close();
           s = null;
         }
-        super.close();
 
         long localstart = System.currentTimeMillis();
         boolean fileComplete = false;
@@ -1570,6 +1764,23 @@
           pendingCreates.remove(src.toString());
         }
       }
+    }
+  }
+  
+  void reportChecksumFailure(String file, Block blk, DatanodeInfo dn) {
+    DatanodeInfo [] dnArr = { dn };
+    LocatedBlock [] lblocks = { new LocatedBlock(blk, dnArr) };
+    reportChecksumFailure(file, lblocks);
+  }
+  
+  // just reports checksum failure and ignores any exception during the report.
+  void reportChecksumFailure(String file, LocatedBlock lblocks[]) {
+    try {
+      reportBadBlocks(lblocks);
+    } catch (IOException ie) {
+      LOG.info("Found corruption while reading " + file 
+               + ".  Error repairing corrupt blocks.  Bad blocks remain. " 
+               + StringUtils.stringifyException(ie));
     }
   }
 }
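
For readers skimming the diff: the OP_READ_BLOCK framing implemented by
BlockReader.newBlockReader() and readChunk() above boils down to the sketch
below. The numeric constant values here are assumptions (the real ones live
in FSConstants); only the field order and widths are taken from the code.

import java.io.*;

/** Sketch of the OP_READ_BLOCK wire format used by BlockReader.
 *  Constant values are illustrative, not the real FSConstants values. */
class ReadBlockFramingSketch {
  static final short DATA_TRANFER_VERSION = 7;   // assumed value
  static final byte  OP_READ_BLOCK        = 81;  // assumed value
  static final short OP_STATUS_SUCCESS    = 0;   // assumed value

  /** Request header, as written by BlockReader.newBlockReader(). */
  static void writeRequest(DataOutputStream out, long blockId,
                           long startOffset, long len) throws IOException {
    out.writeShort(DATA_TRANFER_VERSION); // 2 bytes: protocol version
    out.write(OP_READ_BLOCK);             // 1 byte : opcode
    out.writeLong(blockId);               // 8 bytes: block id
    out.writeLong(startOffset);           // 8 bytes: offset into the block
    out.writeLong(len);                   // 8 bytes: number of bytes requested
    out.flush();
  }

  /** Response: a status short, the DataChecksum header, the offset of the
   *  first chunk, then chunks of [int len][data][checksum]; len == 0 is EOS. */
  static void readResponse(DataInputStream in) throws IOException {
    if (in.readShort() != OP_STATUS_SUCCESS)
      throw new IOException("Got error in response to OP_READ_BLOCK");
    byte type = in.readByte();              // DataChecksum header: 1 byte type
    int bytesPerChecksum = in.readInt();    //   plus a 4 byte bytesPerChecksum
    int checksumSize = (type == 1) ? 4 : 0; // CHECKSUM_CRC32 -> 4 byte CRCs
    long firstChunkOffset = in.readLong();  // chunk-aligned, <= startOffset
    while (true) {
      int chunkLen = in.readInt();          // 0 marks end of stream
      if (chunkLen > 0) {
        byte[] data = new byte[chunkLen];
        in.readFully(data);                 // chunk payload
      }
      byte[] sum = new byte[checksumSize];
      in.readFully(sum);                    // a checksum follows every chunk
      if (chunkLen == 0) break;             // EOS chunk
    }
  }
}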

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataChecksum.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataChecksum.java?view=auto&rev=556743
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataChecksum.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataChecksum.java Mon Jul 16 14:34:59 2007
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.dfs;
+
+import java.util.zip.Checksum;
+import java.util.zip.CRC32;
+
+import java.io.*;
+
+/**
+ * This class provides an interface and utilities for processing checksums for
+ * DFS data transfers.
+ */
+
+public class DataChecksum implements Checksum {
+  
+  // Misc constants
+  public static final int HEADER_LEN = 5; /// 1 byte type and 4 byte len
+  
+  // checksum types
+  public static final int CHECKSUM_NULL    = 0;
+  public static final int CHECKSUM_CRC32   = 1;
+  
+  private static final int CHECKSUM_NULL_SIZE  = 0;
+  private static final int CHECKSUM_CRC32_SIZE = 4;
+  
+  
+  public static DataChecksum newDataChecksum( int type, int bytesPerChecksum ) {
+    if ( bytesPerChecksum <= 0 ) {
+      return null;
+    }
+    
+    switch ( type ) {
+    case CHECKSUM_NULL :
+      return new DataChecksum( CHECKSUM_NULL, new ChecksumNull(), 
+                               CHECKSUM_NULL_SIZE, bytesPerChecksum );
+    case CHECKSUM_CRC32 :
+      return new DataChecksum( CHECKSUM_CRC32, new CRC32(), 
+                               CHECKSUM_CRC32_SIZE, bytesPerChecksum );
+    default:
+      return null;  
+    }
+  }
+  
+  /**
+   * Creates a DataChecksum from HEADER_LEN bytes starting at bytes[offset].
+   * @return DataChecksum of the type in the array or null in case of an error.
+   */
+  public static DataChecksum newDataChecksum( byte bytes[], int offset ) {
+    if ( offset < 0 || bytes.length < offset + HEADER_LEN ) {
+      return null;
+    }
+    
+    // like readInt():
+    int bytesPerChecksum = ( (bytes[offset+1] & 0xff) << 24 ) | 
+                           ( (bytes[offset+2] & 0xff) << 16 ) |
+                           ( (bytes[offset+3] & 0xff) << 8 )  |
+                           ( (bytes[offset+4] & 0xff) );
+    return newDataChecksum( bytes[offset], bytesPerChecksum );
+  }
+  
+  /**
+   * This constructs a DataChecksum by reading HEADER_LEN bytes from
+   * input stream <i>in</i>
+   */
+  public static DataChecksum newDataChecksum( DataInputStream in )
+                                 throws IOException {
+    int type = in.readByte();
+    int bpc = in.readInt();
+    DataChecksum summer = newDataChecksum( type, bpc );
+    if ( summer == null ) {
+      throw new IOException( "Could not create DataChecksum of type " +
+                             type + " with bytesPerChecksum " + bpc );
+    }
+    return summer;
+  }
+  
+  /**
+   * Writes the checksum header to the output stream <i>out</i>.
+   */
+  public void writeHeader( DataOutputStream out ) 
+                           throws IOException { 
+    out.writeByte( type );
+    out.writeInt( bytesPerChecksum );
+  }
+  
+  /**
+   * Writes the current checksum to the stream.
+   * If <i>reset</i> is true, then resets the checksum.
+   * @return number of bytes written. Will be equal to getChecksumSize();
+   */
+   public int writeValue( DataOutputStream out, boolean reset )
+                          throws IOException {
+     if ( size <= 0 ) {
+       return 0;
+     }
+
+     if ( type == CHECKSUM_CRC32 ) {
+       out.writeInt( (int) summer.getValue() );
+     } else {
+       throw new IOException( "Unknown Checksum " + type );
+     }
+     
+     if ( reset ) {
+       reset();
+     }
+     
+     return size;
+   }
+   
+   /**
+    * Writes the current checksum to a buffer.
+    * If <i>reset</i> is true, then resets the checksum.
+    * @return number of bytes written. Will be equal to getChecksumSize();
+    */
+    public int writeValue( byte[] buf, int offset, boolean reset )
+                           throws IOException {
+      if ( size <= 0 ) {
+        return 0;
+      }
+
+      if ( type == CHECKSUM_CRC32 ) {
+        int checksum = (int) summer.getValue();
+        buf[offset+0] = (byte) ((checksum >>> 24) & 0xff);
+        buf[offset+1] = (byte) ((checksum >>> 16) & 0xff);
+        buf[offset+2] = (byte) ((checksum >>> 8) & 0xff);
+        buf[offset+3] = (byte) (checksum & 0xff);
+      } else {
+        throw new IOException( "Unknown Checksum " + type );
+      }
+      
+      if ( reset ) {
+        reset();
+      }
+      
+      return size;
+    }
+   
+   /**
+    * Compares the checksum located at buf[offset] with the current checksum.
+    * @return true if the checksum matches and false otherwise.
+    */
+   public boolean compare( byte buf[], int offset ) {
+     if ( size > 0 && type == CHECKSUM_CRC32 ) {
+       int checksum = ( (buf[offset+0] & 0xff) << 24 ) | 
+                      ( (buf[offset+1] & 0xff) << 16 ) |
+                      ( (buf[offset+2] & 0xff) << 8 )  |
+                      ( (buf[offset+3] & 0xff) );
+       return checksum == (int) summer.getValue();
+     }
+     return size == 0;
+   }
+   
+  private final int type;
+  private final int size;
+  private final Checksum summer;
+  private final int bytesPerChecksum;
+  private int inSum = 0;
+  
+  private DataChecksum( int checksumType, Checksum checksum,
+                        int sumSize, int chunkSize ) {
+    type = checksumType;
+    summer = checksum;
+    size = sumSize;
+    bytesPerChecksum = chunkSize;
+  }
+  
+  // Accessors
+  public int getChecksumType() {
+    return type;
+  }
+  public int getChecksumSize() {
+    return size;
+  }
+  public int getBytesPerChecksum() {
+    return bytesPerChecksum;
+  }
+  public int getNumBytesInSum() {
+    return inSum;
+  }
+  //Checksum Interface. Just a wrapper around member summer.
+  public long getValue() {
+    return summer.getValue();
+  }
+  public void reset() {
+    summer.reset();
+    inSum = 0;
+  }
+  public void update( byte[] b, int off, int len ) {
+    if ( len > 0 ) {
+      summer.update( b, off, len );
+      inSum += len;
+    }
+    // Can be removed.
+    assert inSum <= bytesPerChecksum : "DataChecksum.update() : inSum " + 
+                inSum + " > " + " bytesPerChecksum " + bytesPerChecksum ; 
+  }
+  public void update( int b ) {
+    summer.update( b );
+    inSum += 1;
+  }
+  
+  /**
+   * This just provides a dummy implementation of the Checksum interface.
+   * It is used when there is no checksum available or required for 
+   * the data.
+   */
+  static class ChecksumNull implements Checksum {
+    
+    public ChecksumNull() {}
+    
+    //Dummy interface
+    public long getValue() { return 0; }
+    public void reset() {}
+    public void update(byte[] b, int off, int len) {}
+    public void update(int b) {}
+  };
+}
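
A short usage sketch of the class above, exercising the header and per-chunk
value round trip that DFSClient's read and write paths rely on. Everything
called here is defined in the file above; only the demo class itself is new.

import java.io.*;
import org.apache.hadoop.dfs.DataChecksum;

class DataChecksumDemo {
  public static void main(String[] args) throws IOException {
    // Writer side: CRC32, one checksum per 512 bytes of data.
    DataChecksum sum = DataChecksum.newDataChecksum(
        DataChecksum.CHECKSUM_CRC32, 512);

    ByteArrayOutputStream hdr = new ByteArrayOutputStream();
    sum.writeHeader(new DataOutputStream(hdr)); // 1 byte type + 4 byte bpc

    byte[] chunk = new byte[512];               // one data chunk (all zeros)
    sum.update(chunk, 0, chunk.length);
    byte[] crc = new byte[sum.getChecksumSize()];
    sum.writeValue(crc, 0, true);               // serialize checksum and reset

    // Reader side: rebuild the checksum from the serialized header,
    // recompute over the same data, and verify against the stored value.
    DataChecksum verify = DataChecksum.newDataChecksum(hdr.toByteArray(), 0);
    verify.update(chunk, 0, chunk.length);
    System.out.println("match = " + verify.compare(crc, 0)); // prints true
  }
}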

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?view=diff&rev=556743&r1=556742&r2=556743
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Mon Jul 16 14:34:59 2007
@@ -30,6 +30,9 @@
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.apache.hadoop.mapred.StatusHttpServer;
 import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.dfs.BlockCommand;
+import org.apache.hadoop.dfs.DatanodeProtocol;
+import org.apache.hadoop.fs.FileUtil;
 
 import java.io.*;
 import java.net.*;
@@ -73,12 +76,13 @@
  **********************************************************/
 public class DataNode implements FSConstants, Runnable {
   public static final Log LOG = LogFactory.getLog("org.apache.hadoop.dfs.DataNode");
-  //
-  // REMIND - mjc - I might bring "maxgigs" back so user can place 
-  // artificial  limit on space
-  //private static final long GIGABYTE = 1024 * 1024 * 1024;
-  //private static long numGigs = Configuration.get().getLong("dfs.datanode.maxgigs", 100);
-  //
+
+  /**
+   * A buffer size small enough that reads/writes while processing headers 
+   * don't result in multiple I/O calls, but reading a larger amount of data 
+   * (such as one checksum chunk) does not result in an extra copy. 
+   */
+  public static final int SMALL_HDR_BUFFER_SIZE = 64;
 
   /**
    * Util method to build socket addr from either:
@@ -112,7 +116,7 @@
   DatanodeRegistration dnRegistration = null;
   private String networkLoc;
   volatile boolean shouldRun = true;
-  Vector<Block> receivedBlockList = new Vector<Block>();
+  LinkedList<Block> receivedBlockList = new LinkedList<Block>();
   int xmitsInProgress = 0;
   Daemon dataXceiveServer = null;
   long blockReportInterval;
@@ -126,6 +130,7 @@
   private static DataNode datanodeObject = null;
   private static Thread dataNodeThread = null;
   String machineName;
+  int defaultBytesPerChecksum = 512;
 
   private static class DataNodeMetrics implements Updater {
     private final MetricsRecord metricsRecord;
@@ -222,6 +227,10 @@
                                      conf.get("dfs.datanode.dns.nameserver","default"));
     InetSocketAddress nameNodeAddr = createSocketAddr(
                                                       conf.get("fs.default.name", "local"));
+    
+    this.defaultBytesPerChecksum = 
+       Math.max(conf.getInt("io.bytes.per.checksum", 512), 1); 
+    
     int tmpPort = conf.getInt("dfs.datanode.port", 50010);
     storage = new DataStorage();
     // construct registration
@@ -709,25 +718,30 @@
      */
     public void run() {
       try {
-        DataInputStream in = new DataInputStream(new BufferedInputStream(s.getInputStream()));
-        try {
-          byte op = (byte) in.read();
-          if (op == OP_WRITE_BLOCK) {
-            writeBlock(in);
-          } else if (op == OP_READ_BLOCK || op == OP_READSKIP_BLOCK ||
-                     op == OP_READ_RANGE_BLOCK) {
-            readBlock(in, op);
-          } else {
-            while (op >= 0) {
-              System.out.println("Faulty op: " + op);
-              op = (byte) in.read();
-            }
-            throw new IOException("Unknown opcode for incoming data stream");
-          }
-        } finally {
-          in.close();
+        DataInputStream in = new DataInputStream(
+           new BufferedInputStream(s.getInputStream(), SMALL_HDR_BUFFER_SIZE));
+        short version = in.readShort();
+        if ( version != DATA_TRANFER_VERSION ) {
+          throw new IOException( "Version Mismatch" );
+        }
+
+        byte op = in.readByte();
+
+        switch ( op ) {
+        case OP_READ_BLOCK:
+          readBlock( in );
+          break;
+        case OP_WRITE_BLOCK:
+          writeBlock( in );
+          break;
+        case OP_READ_METADATA:
+          readMetadata( in );
+          break;
+          
+        default:
+          System.out.println("Faulty op: " + op);
+          throw new IOException("Unknown opcode " + op + " in data stream");
         }
-      } catch (Throwable t) {
+      } catch (Throwable t) {
         LOG.error("DataXCeiver", t);
       } finally {
         try {
@@ -742,104 +756,35 @@
     /**
      * Read a block from the disk
      * @param in The stream to read from
-     * @param op OP_READ_BLOCK or OP_READ_SKIPBLOCK
      * @throws IOException
      */
-    private void readBlock(DataInputStream in, byte op) throws IOException {
+    private void readBlock(DataInputStream in) throws IOException {
       //
       // Read in the header
       //
-      Block b = new Block();
-      b.readFields(in);
-
-      long toSkip = 0;
-      long endOffset = -1;
-      if (op == OP_READSKIP_BLOCK) {
-        toSkip = in.readLong();
-      } else if (op == OP_READ_RANGE_BLOCK) {
-        toSkip = in.readLong();
-        endOffset = in.readLong();
-      }
+      long blockId = in.readLong();          
+      Block block = new Block( blockId, 0 );
 
-      //
-      // Open reply stream
-      //
-      DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
+      long startOffset = in.readLong();
+      long length = in.readLong();
+      
       try {
-        //
-        // Write filelen of -1 if error
-        //
-        if (!data.isValidBlock(b)) {
-          out.writeLong(-1);
-        } else {
-          //
-          // Get blockdata from disk
-          //
-          long len = data.getLength(b);
-          if (endOffset < 0) { endOffset = len; }
-          DataInputStream in2 = new DataInputStream(data.getBlockData(b));
-          out.writeLong(len);
-
-          long amtSkipped = 0;
-          if ((op == OP_READSKIP_BLOCK) || (op == OP_READ_RANGE_BLOCK)) {
-            if (toSkip > len) {
-              toSkip = len;
-            }
-            try {
-              amtSkipped = in2.skip(toSkip);
-            } catch (IOException iex) {
-              shutdown();
-              throw iex;
-            }
-            out.writeLong(amtSkipped);
-          }
-          if (op == OP_READ_RANGE_BLOCK) {
-            if (endOffset > len) {
-              endOffset = len;
-            }
-            out.writeLong(endOffset);
-          }
-
-          byte buf[] = new byte[BUFFER_SIZE];
-          try {
-            int toRead = (int) (endOffset - amtSkipped + 1);
-            int bytesRead = 0;
-            try {
-              bytesRead = in2.read(buf, 0, Math.min(BUFFER_SIZE, toRead));
-              myMetrics.readBytes(bytesRead);
-            } catch (IOException iex) {
-              shutdown();
-              throw iex;
-            }
-            while (toRead > 0 && bytesRead >= 0) {
-              out.write(buf, 0, bytesRead);
-              toRead -= bytesRead;
-              if (toRead > 0) {
-                try {
-                  bytesRead = in2.read(buf, 0, Math.min(BUFFER_SIZE, toRead));
-                  myMetrics.readBytes(bytesRead);
-                } catch (IOException iex) {
-                  shutdown();
-                  throw iex;
-                }
-              }
-            }
-          } catch (SocketException se) {
-            // This might be because the reader
-            // closed the stream early
-          } finally {
-            try {
-              in2.close();
-            } catch (IOException iex) {
-              shutdown();
-              throw iex;
-            }
-          }
-        }
+        //XXX Buffered output stream?
+        long read = sendBlock(s, block, startOffset, length, null );
+        myMetrics.readBytes((int)read);
         myMetrics.readBlocks(1);
-        LOG.info("Served block " + b + " to " + s.getInetAddress());
-      } finally {
-        out.close();
+        LOG.info("Served block " + block + " to " + s.getInetAddress());
+      } catch ( SocketException ignored ) {
+        // It's OK for the remote side to close the connection at any time.
+        myMetrics.readBlocks(1);
+      } catch ( IOException ioe ) {
+        /* What exactly should we do here?
+         * Earlier versions shut down the datanode on a disk error.
+         */
+        LOG.warn( "Got exception while serving " + block + " to " +
+                  s.getInetAddress() + ": " + 
+                  StringUtils.stringifyException(ioe) );
+        throw ioe;
       }
     }
 
@@ -852,14 +797,23 @@
       //
       // Read in the header
       //
-      DataOutputStream reply = 
-        new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
+      DataOutputStream reply = new DataOutputStream(s.getOutputStream());
+      DataOutputStream out = null;
+      DataOutputStream checksumOut = null;
+      Socket mirrorSock = null;
+      DataOutputStream mirrorOut = null;
+      DataInputStream mirrorIn = null;
+      
       try {
-        boolean shouldReportBlock = in.readBoolean();
-        Block b = new Block();
-        b.readFields(in);
+        /* We need an estimate of the block size to check whether the 
+         * disk partition has enough space. For now we just increment
+         * FSDataset.reserved by the configured dfs.block.size.
+         * An alternative is to include the block size in the header
+         * sent by DFSClient.
+         */
+        Block block = new Block( in.readLong(), 0 );
         int numTargets = in.readInt();
-        if (numTargets <= 0) {
+        if ( numTargets < 0 ) {
           throw new IOException("Mislabelled incoming datastream.");
         }
         DatanodeInfo targets[] = new DatanodeInfo[numTargets];
@@ -868,225 +822,419 @@
           tmp.readFields(in);
           targets[i] = tmp;
         }
-        byte encodingType = (byte) in.read();
-        long len = in.readLong();
-            
-        //
-        // Make sure curTarget is equal to this machine
-        //
-        DatanodeInfo curTarget = targets[0];
-            
-        //
-        // Track all the places we've successfully written the block
-        //
-        Vector<DatanodeInfo> mirrors = new Vector<DatanodeInfo>();
             
+        DataChecksum checksum = DataChecksum.newDataChecksum( in );
+
         //
         // Open local disk out
         //
-        OutputStream o;
-        try {
-          o = data.writeToBlock(b);
-        } catch( IOException e ) {
-          checkDiskError( e );
-          throw e;
-        }
-        DataOutputStream out = new DataOutputStream(new BufferedOutputStream(o));
+        FSDataset.BlockWriteStreams streams = data.writeToBlock( block );
+        out = new DataOutputStream(streams.dataOut);
+        checksumOut = new DataOutputStream(streams.checksumOut);
+        
         InetSocketAddress mirrorTarget = null;
         String mirrorNode = null;
-        try {
-          //
-          // Open network conn to backup machine, if 
-          // appropriate
-          //
-          DataInputStream in2 = null;
-          DataOutputStream out2 = null;
-          if (targets.length > 1) {
-            // Connect to backup machine
-            mirrorNode = targets[1].getName();
-            mirrorTarget = createSocketAddr(mirrorNode);
-            try {
-              Socket s2 = new Socket();
-              s2.connect(mirrorTarget, READ_TIMEOUT);
-              s2.setSoTimeout(READ_TIMEOUT);
-              out2 = new DataOutputStream(new BufferedOutputStream(s2.getOutputStream()));
-              in2 = new DataInputStream(new BufferedInputStream(s2.getInputStream()));
-                  
-              // Write connection header
-              out2.write(OP_WRITE_BLOCK);
-              out2.writeBoolean(shouldReportBlock);
-              b.write(out2);
-              out2.writeInt(targets.length - 1);
-              for (int i = 1; i < targets.length; i++) {
-                targets[i].write(out2);
-              }
-              out2.write(encodingType);
-              out2.writeLong(len);
-              myMetrics.replicatedBlocks(1);
-            } catch (IOException ie) {
-              if (out2 != null) {
-                LOG.info("Exception connecting to mirror " + mirrorNode 
-                         + "\n" + StringUtils.stringifyException(ie));
-                try {
-                  out2.close();
-                  in2.close();
-                } catch (IOException out2close) {
-                } finally {
-                  out2 = null;
-                  in2 = null;
-                }
-              }
+        //
+        // Open network conn to backup machine, if 
+        // appropriate
+        //
+        if (targets.length > 0) {
+          // Connect to backup machine
+          mirrorNode = targets[0].getName();
+          mirrorTarget = createSocketAddr(mirrorNode);
+          try {
+            mirrorSock = new Socket();
+            mirrorSock.connect(mirrorTarget, READ_TIMEOUT);
+            mirrorSock.setSoTimeout(READ_TIMEOUT);
+            mirrorOut = new DataOutputStream( 
+                        new BufferedOutputStream(mirrorSock.getOutputStream(),
+                                                 SMALL_HDR_BUFFER_SIZE));
+            mirrorIn = new DataInputStream( mirrorSock.getInputStream() );
+            //Copied from DFSClient.java!
+            mirrorOut.writeShort( DATA_TRANFER_VERSION );
+            mirrorOut.write( OP_WRITE_BLOCK );
+            mirrorOut.writeLong( block.getBlockId() );
+            mirrorOut.writeInt( targets.length - 1 );
+            for ( int i = 1; i < targets.length; i++ ) {
+              targets[i].write( mirrorOut );
             }
-          }
-              
-          //
-          // Process incoming data, copy to disk and
-          // maybe to network. First copy to the network before
-          // writing to local disk so that all datanodes might
-          // write to local disk in parallel.
-          //
-          boolean anotherChunk = len != 0;
-          byte buf[] = new byte[BUFFER_SIZE];
-              
-          while (anotherChunk) {
-            while (len > 0) {
-              int bytesRead = in.read(buf, 0, (int)Math.min(buf.length, len));
-              if (bytesRead < 0) {
-                throw new EOFException("EOF reading from "+s.toString());
-              }
-              if (bytesRead > 0) {
-                if (out2 != null) {
-                  try {
-                    out2.write(buf, 0, bytesRead);
-                  } catch (IOException out2e) {
-                    LOG.info("Exception writing to mirror " + mirrorNode 
-                             + "\n" + StringUtils.stringifyException(out2e));
-                    //
-                    // If stream-copy fails, continue 
-                    // writing to disk.  We shouldn't 
-                    // interrupt client write.
-                    //
-                    try {
-                      out2.close();
-                      in2.close();
-                    } catch (IOException out2close) {
-                    } finally {
-                      out2 = null;
-                      in2 = null;
-                    }
-                  }
-                }
-                try {
-                  out.write(buf, 0, bytesRead);
-                  myMetrics.wroteBytes(bytesRead);
-                } catch (IOException iex) {
-                  checkDiskError(iex);
-                  throw iex;
-                }
-                len -= bytesRead;
-              }
+            checksum.writeHeader( mirrorOut );
+            myMetrics.replicatedBlocks(1);
+          } catch (IOException ie) {
+            if (mirrorOut != null) {
+              LOG.info("Exception connecting to mirror " + mirrorNode 
+                       + "\n" + StringUtils.stringifyException(ie));
+              mirrorOut = null;
             }
-                
-            if (encodingType == RUNLENGTH_ENCODING) {
-              anotherChunk = false;
-            } else if (encodingType == CHUNKED_ENCODING) {
-              len = in.readLong();
-              if (out2 != null) {
-                try {
-                  out2.writeLong(len);
-                } catch (IOException ie) {
-                  LOG.info("Exception writing to mirror " + mirrorNode 
-                           + "\n" + StringUtils.stringifyException(ie));
-                  try {
-                    out2.close();
-                    in2.close();
-                  } catch (IOException ie2) {
-                    // NOTHING
-                  } finally {
-                    out2 = null;
-                    in2 = null;
-                  }
-                }
-              }
-              if (len == 0) {
-                anotherChunk = false;
-              }
+          }
+        }
+        
+        // XXX The following code is similar on both sides...
+        
+        int bytesPerChecksum = checksum.getBytesPerChecksum();
+        int checksumSize = checksum.getChecksumSize();
+        byte buf[] = new byte[ bytesPerChecksum + checksumSize ];
+        long blockLen = 0;
+        long lastOffset = 0;
+        long lastLen = 0;
+        int status = -1;
+        boolean headerWritten = false;
+        
+        while ( true ) {
+          // Read one data chunk in each loop.
+          
+          long offset = lastOffset + lastLen;
+          int len = in.readInt();
+          if ( len < 0 || len > bytesPerChecksum ) {
+            LOG.warn( "Got wrong length during writeBlock(" +
+                      block + ") from " + s.getRemoteSocketAddress() +
+                      " at offset " + offset + ": " + len + 
+                      " expected <= " + bytesPerChecksum );
+            status = OP_STATUS_ERROR;
+            break;
+          }
+
+          in.readFully( buf, 0, len + checksumSize );
+          
+          if ( len > 0 && checksumSize > 0 ) {
+            /*
+             * Verification is not included in the initial design.
+             * For now, it at least catches some bugs. Later, we can 
+             * include this after showing that it does not affect 
+             * performance much.
+             */
+            checksum.update( buf, 0, len  );
+            
+            if ( ! checksum.compare( buf, len ) ) {
+              throw new IOException( "Unexpected checksum mismatch " +
+                                     "while writing " + block + 
+                                     " from " +
+                                     s.getRemoteSocketAddress() );
             }
+            
+            checksum.reset();
           }
-              
-          if (out2 != null) {
+
+          // First write to remote node before writing locally.
+          if (mirrorOut != null) {
             try {
-              out2.flush();
-              long complete = in2.readLong();
-              if (complete != WRITE_COMPLETE) {
-                LOG.info("Conflicting value for WRITE_COMPLETE: " + complete);
-              }
-              LocatedBlock newLB = new LocatedBlock();
-              newLB.readFields(in2);
-              in2.close();
-              out2.close();
-              DatanodeInfo mirrorsSoFar[] = newLB.getLocations();
-              for (int k = 0; k < mirrorsSoFar.length; k++) {
-                mirrors.add(mirrorsSoFar[k]);
-              }
-            } catch (IOException ie) {
-              LOG.info("Exception writing to mirror " + mirrorNode 
-                       + "\n" + StringUtils.stringifyException(ie));
-              try {
-                out2.close();
-                in2.close();
-              } catch (IOException ie2) {
-                // NOTHING
-              } finally {
-                out2 = null;
-                in2 = null;
-              }
+              mirrorOut.writeInt( len );
+              mirrorOut.write( buf, 0, len + checksumSize );
+            } catch (IOException ioe) {
+              LOG.info( "Exception writing to mirror " + mirrorNode + 
+                        "\n" + StringUtils.stringifyException(ioe) );
+              //
+              // If stream-copy fails, continue 
+              // writing to disk.  We shouldn't 
+              // interrupt client write.
+              //
+              mirrorOut = null;
             }
           }
-          if (out2 == null) {
-            LOG.info("Received block " + b + " from " + 
-                     s.getInetAddress());
-          } else {
-            LOG.info("Received block " + b + " from " + 
-                     s.getInetAddress() + 
-                     " and mirrored to " + mirrorTarget);
-          }
-        } finally {
+
           try {
-            out.close();
+            if ( !headerWritten ) { 
+              // First DATA_CHUNK. 
+              // Write the header even if checksumSize is 0.
+              checksumOut.writeShort( FSDataset.METADATA_VERSION );
+              checksum.writeHeader( checksumOut );
+              headerWritten = true;
+            }
+            
+            if ( len > 0 ) {
+              out.write( buf, 0, len );
+              // Write checksum
+              checksumOut.write( buf, len, checksumSize );
+              myMetrics.wroteBytes( len );
+            }
+            
           } catch (IOException iex) {
             checkDiskError(iex);
             throw iex;
           }
-        }
-        data.finalizeBlock(b);
-        myMetrics.wroteBlocks(1);
+          
+          if ( len == 0 ) {
+
+            // We already have one successful write here. Should we
+            // wait for a response from the next target? We skip that for now.
+
+            block.setNumBytes( blockLen );
             
-        // 
-        // Tell the namenode that we've received this block 
-        // in full, if we've been asked to.  This is done
-        // during NameNode-directed block transfers, but not
-        // client writes.
-        //
-        if (shouldReportBlock) {
-          synchronized (receivedBlockList) {
-            receivedBlockList.add(b);
+            //Does this fsync()?
+            data.finalizeBlock( block );
+            myMetrics.wroteBlocks(1);
+            
+            status = OP_STATUS_SUCCESS;
+            
+            break;
+          }
+          
+          if ( lastLen > 0 && lastLen != bytesPerChecksum ) {
+            LOG.warn( "Got wrong length during writeBlock(" +
+                      block + ") from " + s.getRemoteSocketAddress() +
+                      ": got " + lastLen + " instead of " +
+                      bytesPerChecksum );
+            status = OP_STATUS_ERROR;
+            break;
+          }
+          
+          lastOffset = offset;
+          lastLen = len;
+          blockLen += len;
+        }
+        // done with reading the data.
+        
+        if ( status == OP_STATUS_SUCCESS ) {
+          /* Informing the namenode could take a long time!
+             Should we wait until the namenode is informed before
+             responding with success to the client? For now we don't.
+          */
+          synchronized ( receivedBlockList ) {
+            receivedBlockList.add( block );
             receivedBlockList.notifyAll();
           }
+          
+          String msg = "Received block " + block + " from " + 
+                       s.getInetAddress();
+          
+          if ( mirrorOut != null ) {
+            //Wait for the remote reply
+            mirrorOut.flush();
+            byte result = OP_STATUS_ERROR; 
+            try {
+              result = mirrorIn.readByte();
+            } catch ( IOException ignored ) {}
+
+            msg += " and " +  (( result != OP_STATUS_SUCCESS ) ? 
+                                "failed to mirror to " : " mirrored to ") +
+                   mirrorTarget;
+            
+            mirrorOut = null;
+          }
+          
+          LOG.info(msg);
         }
             
+        if ( status >= 0 ) {
+          try {
+            reply.writeByte( status );
+            reply.flush();
+          } catch ( IOException ignored ) {}
+        }
+        
+      } finally {
+        try {
+          if ( out != null )
+            out.close();
+          if ( checksumOut != null )
+            checksumOut.close();
+          if ( mirrorSock != null )
+            mirrorSock.close();
+        } catch (IOException iex) {
+          shutdown();
+          throw iex;
+        }
+      }
+    }
+    
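For orientation, the sender side of the stream that writeBlock() above parses would look roughly like the sketch below. This is illustrative only, not code from this commit: the helper name is made up, the downstream target list is left empty, the 4-byte big-endian layout of each CRC32 value is an assumption (chosen to satisfy checksum.compare()), and the file's existing imports plus java.util.zip.CRC32 are assumed.

    /* Hypothetical sender of the writeBlock() wire format: version,
     * opcode, block id, target count, checksum header, then repeated
     * [int len][len data bytes][checksumSize checksum bytes] chunks,
     * terminated by a zero-length chunk, followed by a one-byte status
     * reply from the receiver.
     */
    static void sendBlockSketch(Socket sock, long blockId,
                                int bytesPerChecksum,
                                InputStream blockData) throws IOException {
      DataOutputStream out = new DataOutputStream(
          new BufferedOutputStream(sock.getOutputStream()));
      DataChecksum checksum = DataChecksum.newDataChecksum(
          DataChecksum.CHECKSUM_CRC32, bytesPerChecksum);

      out.writeShort(DATA_TRANFER_VERSION);
      out.write(OP_WRITE_BLOCK);
      out.writeLong(blockId);
      out.writeInt(0);                      // no downstream targets here
      checksum.writeHeader(out);            // checksum type + bytesPerChecksum

      CRC32 crc = new CRC32();
      byte[] buf = new byte[bytesPerChecksum];
      int len;
      // For simplicity, assume read() fills the buffer except at EOF;
      // a short chunk anywhere else would be rejected by the receiver.
      while ((len = blockData.read(buf, 0, bytesPerChecksum)) > 0) {
        out.writeInt(len);                  // chunk length
        out.write(buf, 0, len);             // chunk data
        crc.reset();
        crc.update(buf, 0, len);
        out.writeInt((int) crc.getValue()); // assumed 4-byte big-endian CRC
      }
      out.writeInt(0);                      // len == 0 marks the last chunk...
      out.writeInt(0);                      // ...but readFully() above still
                                            // expects checksumSize bytes
      out.flush();

      DataInputStream reply = new DataInputStream(sock.getInputStream());
      if (reply.readByte() != OP_STATUS_SUCCESS) {
        throw new IOException("writeBlock to " + sock + " failed");
      }
    }
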
+    /**
+     * Reads the block's metadata file and sends its contents to the
+     * requester in one 'DATA_CHUNK'.
+     * @param in input stream carrying the request
+     */
+    void readMetadata(DataInputStream in) throws IOException {
+      
+      Block block = new Block( in.readLong(), 0 );
+      InputStream checksumIn = null;
+      DataOutputStream out = null;
+      
+      try {
+        File blockFile = data.getBlockFile( block );
+        File checksumFile = FSDataset.getMetaFile( blockFile );
+        checksumIn = new FileInputStream(checksumFile);
+
+        long fileSize = checksumFile.length();
+        if (fileSize >= 1L<<31 || fileSize <= 0) {
+          throw new IOException("Unexpected size for checksumFile " +
+                                checksumFile);
+        }
+
+        byte [] buf = new byte[(int)fileSize];
+        FileUtil.readFully(checksumIn, buf, 0, buf.length);
+        
+        out = new DataOutputStream(s.getOutputStream());
+        
+        out.writeByte(OP_STATUS_SUCCESS);
+        out.writeInt(buf.length);
+        out.write(buf);
+        
+        //last DATA_CHUNK
+        out.writeInt(0);
+      } finally {
+        FileUtil.closeStream(checksumIn);
+      }
+    }
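
Read back on the other end, the readMetadata() reply is a one-byte status, an int length, the raw bytes of the block's .meta file, and a terminating zero int (the empty last DATA_CHUNK). A hypothetical client-side reader, with a made-up name:

    static byte[] receiveMetadataSketch(DataInputStream in) throws IOException {
      byte status = in.readByte();
      if (status != OP_STATUS_SUCCESS) {
        throw new IOException("readMetadata failed, status " + status);
      }
      int len = in.readInt();     // size of the checksum (.meta) file
      byte[] meta = new byte[len];
      in.readFully(meta);
      in.readInt();               // trailing 0: the last, empty DATA_CHUNK
      return meta;
    }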
+  }
+
+  /** sendBlock() reads a block and its metadata and streams the
+   * data either to a client or to another datanode.
+   * If the targets argument is null, we are assumed to be replying
+   * to a client request (OP_BLOCK_READ). Otherwise, we are replicating
+   * to another datanode.
+   * 
+   * Returns the total number of bytes read, including CRC bytes.
+   */
+  long sendBlock(Socket sock, Block block,
+                 long startOffset, long length, DatanodeInfo targets[] )
+                 throws IOException {
+    // Maybe we should just use io.file.buffer.size.
+    DataOutputStream out = new DataOutputStream(
+                           new BufferedOutputStream(sock.getOutputStream(), 
+                                                    SMALL_HDR_BUFFER_SIZE));
+    DataInputStream in = null;
+    DataInputStream checksumIn = null;
+    long totalRead = 0;    
+
+
+    /* XXX This will affect inter-datanode transfers during 
+     * a CRC upgrade. There should not be any replication
+     * during a CRC upgrade since we are in safe mode, right?
+     */    
+    boolean corruptChecksumOk = targets == null; 
+
+    try {
+      File blockFile = data.getBlockFile( block );
+      in = new DataInputStream( new FileInputStream( blockFile ) );
+
+      File checksumFile = FSDataset.getMetaFile( blockFile );
+      DataChecksum checksum = null;
+
+      if ( !corruptChecksumOk || checksumFile.exists() ) {
+        checksumIn = new DataInputStream( new FileInputStream(checksumFile) );
+          
+        // Read and handle the common header here. For now it is just the version.
+        short version = checksumIn.readShort();
+        if ( version != FSDataset.METADATA_VERSION ) {
+          LOG.warn( "Wrong version (" + version + 
+                    ") for metadata file for " + block + " ignoring ..." );
+        }
+        checksum = DataChecksum.newDataChecksum( checksumIn ) ;
+      } else {
+        LOG.warn( "Could not find metadata file for " + block );
+        // This only decides the buffer size. Use BUFFER_SIZE?
+        checksum = DataChecksum.newDataChecksum( DataChecksum.CHECKSUM_NULL,
+                                                 16*1024 );
+      }
+
+      int bytesPerChecksum = checksum.getBytesPerChecksum();
+      int checksumSize = checksum.getChecksumSize();
+
+      long endOffset = data.getLength( block );
+      if ( startOffset < 0 || startOffset > endOffset ||
+          (length + startOffset) > endOffset ) {
+        String msg = "Offset " + startOffset + " and length " + length + 
+                     " don't match block " + block + " ( blockLen " + 
+                     endOffset + " )"; 
+        LOG.warn( "sendBlock() : " + msg );
+        if ( targets != null ) {
+          throw new IOException(msg);
+        } else {
+          out.writeShort( OP_STATUS_ERROR_INVALID );
+          return totalRead;
+        }
+      }
+
+      byte buf[] = new byte[ bytesPerChecksum + checksumSize ];
+      long offset = (startOffset - (startOffset % bytesPerChecksum));
+      if ( length >= 0 ) {
+        // Make sure endOffset points to the end of a checksummed chunk. 
+        long tmpLen = startOffset + length + (startOffset - offset);
+        if ( tmpLen % bytesPerChecksum != 0 ) { 
+          tmpLen += ( bytesPerChecksum - tmpLen % bytesPerChecksum );
+        }
+        if ( tmpLen < endOffset ) {
+          endOffset = tmpLen;
+        }
+      }
+
+      // seek to the right offsets
+      if ( offset > 0 ) {
+        long checksumSkip = ( offset / bytesPerChecksum ) * checksumSize ;
+        /* XXX skip() could be very inefficient. It should be seek(),
+         * or at least skipFully().
+         */
+        if ( in.skip( offset ) != offset || 
+            ( checksumSkip > 0 && 
+                checksumIn.skip( checksumSkip ) != checksumSkip ) ) {
+          throw new IOException( "Could not seek to right position while " +
+                                 "reading for " + block );
+        }
+      }
+      
+      if ( targets != null ) {
         //
-        // Tell client job is done, and reply with
-        // the new LocatedBlock.
+        // Header info
         //
-        reply.writeLong(WRITE_COMPLETE);
-        mirrors.add(curTarget);
-        LocatedBlock newLB = new LocatedBlock(b, mirrors.toArray(new DatanodeInfo[mirrors.size()]));
-        newLB.write(reply);
-      } finally {
-        reply.close();
+        out.writeShort( DATA_TRANFER_VERSION );
+        out.writeByte( OP_WRITE_BLOCK );
+        out.writeLong( block.getBlockId() );
+        out.writeInt(targets.length-1);
+        for (int i = 1; i < targets.length; i++) {
+          targets[i].write( out );
+        }
+      } else {
+        out.writeShort( OP_STATUS_SUCCESS );          
       }
+
+      checksum.writeHeader( out );
+      
+      if ( targets == null ) {
+        out.writeLong( offset );
+      }
+      
+      while ( endOffset >= offset ) {
+        // Write one data chunk per loop.
+        int len = (int) Math.min( endOffset - offset, bytesPerChecksum );
+        if ( len > 0 ) {
+          in.readFully( buf, 0, len );
+          totalRead += len;
+          
+          if ( checksumSize > 0 && checksumIn != null ) {
+            try {
+              checksumIn.readFully( buf, len, checksumSize );
+              totalRead += checksumSize;
+            } catch ( IOException e ) {
+              LOG.warn( " Could not read checksum for data at offset " +
+                        offset + " for block " + block + " got : " + 
+                        StringUtils.stringifyException(e) );
+              FileUtil.closeStream( checksumIn );
+              checksumIn = null;
+              if ( corruptChecksumOk ) {
+                // Just fill the array with zeros.
+                Arrays.fill( buf, len, len + checksumSize, (byte)0 );
+              } else {
+                throw e;
+              }
+            }
+          }
+        }
+
+        out.writeInt( len );
+        out.write( buf, 0, len + checksumSize );
+        
+        if ( offset == endOffset ) {
+          out.flush();
+          // We are not waiting for response from target.
+          break;
+        }
+        offset += len;
+      }
+    } finally {
+      FileUtil.closeStream( checksumIn );
+      FileUtil.closeStream( in );
+      FileUtil.closeStream( out );
     }
+    
+    return totalRead;
   }
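
The chunk-alignment arithmetic above is easiest to follow with concrete numbers. A worked example with illustrative values (512-byte chunks, 4-byte CRC32 sums, a request for bytes [1000, 3000) of an 8192-byte block); note that the (startOffset - offset) term appears to round the end up by as much as one extra chunk beyond a plain round-up of startOffset + length, which is harmless since the client skips to startOffset within the first chunk anyway:

    int  bytesPerChecksum = 512, checksumSize = 4;
    long startOffset = 1000, length = 2000, blockLength = 8192;

    long offset = startOffset - (startOffset % bytesPerChecksum);    // 512
    long tmpLen = startOffset + length + (startOffset - offset);     // 3488
    if (tmpLen % bytesPerChecksum != 0) {
      tmpLen += bytesPerChecksum - tmpLen % bytesPerChecksum;        // 3584
    }
    long endOffset = Math.min(blockLength, tmpLen);                  // 3584
    long checksumSkip = (offset / bytesPerChecksum) * checksumSize;  // 4

    // The stream then carries bytes [512, 3584): six full 512-byte chunks
    // plus the terminating zero-length chunk, and 4 checksum bytes are
    // skipped in the .meta stream after its header.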
 
   /**
@@ -1094,20 +1242,16 @@
    * sends a piece of data to another DataNode.
    */
   class DataTransfer implements Runnable {
-    InetSocketAddress curTarget;
     DatanodeInfo targets[];
     Block b;
-    byte buf[];
 
     /**
      * Connect to the first item in the target list.  Pass along the 
      * entire target list, the block, and the data.
      */
     public DataTransfer(DatanodeInfo targets[], Block b) throws IOException {
-      this.curTarget = createSocketAddr(targets[0].getName());
       this.targets = targets;
       this.b = b;
-      this.buf = new byte[BUFFER_SIZE];
     }
 
     /**
@@ -1115,46 +1259,23 @@
      */
     public void run() {
       xmitsInProgress++;
+      Socket sock = null;
+      
       try {
-        Socket s = new Socket();
-        s.connect(curTarget, READ_TIMEOUT);
-        s.setSoTimeout(READ_TIMEOUT);
-        DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
-        try {
-          long filelen = data.getLength(b);
-          DataInputStream in = new DataInputStream(new BufferedInputStream(data.getBlockData(b)));
-          try {
-            //
-            // Header info
-            //
-            out.write(OP_WRITE_BLOCK);
-            out.writeBoolean(true);
-            b.write(out);
-            out.writeInt(targets.length);
-            for (int i = 0; i < targets.length; i++) {
-              targets[i].write(out);
-            }
-            out.write(RUNLENGTH_ENCODING);
-            out.writeLong(filelen);
-
-            //
-            // Write the data
-            //
-            while (filelen > 0) {
-              int bytesRead = in.read(buf, 0, (int) Math.min(filelen, buf.length));
-              out.write(buf, 0, bytesRead);
-              filelen -= bytesRead;
-            }
-          } finally {
-            in.close();
-          }
-        } finally {
-          out.close();
-        }
-        LOG.info("Transmitted block " + b + " to " + curTarget);
-      } catch (IOException ie) {
-        LOG.warn("Failed to transfer "+b+" to "+curTarget, ie);
+        InetSocketAddress curTarget = 
+          createSocketAddr(targets[0].getName());
+        sock = new Socket();  
+        sock.connect(curTarget, READ_TIMEOUT);
+        sock.setSoTimeout(READ_TIMEOUT);
+        sendBlock( sock, b, 0, -1, targets );
+        LOG.info( "Transmitted block " + b + " to " + curTarget );
+
+      } catch ( IOException ie ) {
+        LOG.warn( "Failed to transfer " + b + " to " + 
+                  targets[0].getName() + " got " + 
+                  StringUtils.stringifyException( ie ) );
       } finally {
+        FileUtil.closeSocket(sock);
         xmitsInProgress--;
       }
     }
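
With the buffer and pre-resolved address gone from the constructor, scheduling a transfer reduces to something like the following sketch of the calling side, which this hunk does not show (the use of org.apache.hadoop.util.Daemon is an assumption):

    // Hypothetical call site elsewhere in DataNode:
    new Daemon(new DataTransfer(targets, block)).start();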

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java?view=diff&rev=556743&r1=556742&r2=556743
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java Mon Jul 16 14:34:59 2007
@@ -31,9 +31,9 @@
  **********************************************************************/
 interface DatanodeProtocol extends VersionedProtocol {
   /*
-   * 7: processUpgradeCommand() added;
+   * 8: blockCrcUpgradeGetBlockLocations() added;
    */
-  public static final long versionID = 7L;
+  public static final long versionID = 8L;
   
   // error code
   final static int NOTIFY = 0;
@@ -116,4 +116,17 @@
    * @return a reply in the form of an upgrade command
    */
   UpgradeCommand processUpgradeCommand(UpgradeCommand comm) throws IOException;
+  
+  /**
+   * Get locations for a given block. This is used only during 
+   * Block Level CRC upgrades (HADOOP-1134), when a datanode node
+   * misses the cluster wide distributed upgrade. It uses the same
+   * BlockCrcInfo class, that is also used during distributed upgrade
+   * 
+   * @param block 
+   * @return BlockCrcInfo that contains the block locations.
+   * @throws IOException
+   */
+  public BlockCrcInfo blockCrcUpgradeGetBlockLocations(Block block)
+                                                      throws IOException;  
 }
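A datanode that missed the cluster-wide distributed upgrade would consult the namenode roughly as below. This is a sketch only: the namenode field is assumed to be the datanode's DatanodeProtocol proxy, and the fields of BlockCrcInfo are not shown in this diff.

    // Hypothetical caller on the datanode side during a missed CRC upgrade:
    BlockCrcInfo info = namenode.blockCrcUpgradeGetBlockLocations(block);
    // info carries the block's locations for the per-block CRC upgrade.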


