hadoop-common-commits mailing list archives

From jo...@apache.org
Subject svn commit: r685979 [1/3] - in /hadoop/core/trunk: ./ src/hdfs/org/apache/hadoop/hdfs/server/datanode/ src/test/org/apache/hadoop/hdfs/ src/test/org/apache/hadoop/hdfs/server/datanode/
Date Thu, 14 Aug 2008 17:58:31 GMT
Author: johan
Date: Thu Aug 14 10:58:30 2008
New Revision: 685979

URL: http://svn.apache.org/viewvc?rev=685979&view=rev
Log:
HADOOP-3935. Split out inner classes from DataNode.java. (johan)

Added:
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockTransferThrottler.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java   (contents, props changed)
      - copied, changed from r685529, hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestBlockReplacement.java
Removed:
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestBlockReplacement.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
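
In short, the helper classes that used to be inner classes of DataNode
(BlockReceiver, BlockSender, the throttler, and the DataXceiver code) are now
top-level, package-private classes that take the DataNode as an explicit
constructor argument. A minimal sketch of what the change means for callers,
based on the DataBlockScanner diff at the end of this message:

    // before HADOOP-3935: instantiated through the enclosing DataNode instance
    DataNode.Throttler throttler = new DataNode.Throttler(200, MAX_SCAN_RATE);
    BlockSender sender = datanode.new BlockSender(block, 0, -1, false, false, true);

    // after HADOOP-3935: top-level classes, with the DataNode passed in explicitly
    BlockTransferThrottler throttler = new BlockTransferThrottler(200, MAX_SCAN_RATE);
    BlockSender sender = new BlockSender(block, 0, -1, false, false, true, datanode);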

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=685979&r1=685978&r2=685979&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Aug 14 10:58:30 2008
@@ -192,6 +192,8 @@
     HADOOP-3844. Include message of local exception in RPC client failures.
     (Steve Loughran via omalley)
 
+    HADOOP-3935. Split out inner classes from DataNode.java. (johan)
+
   OPTIMIZATIONS
 
     HADOOP-3556. Removed lock contention in MD5Hash by changing the 

Added: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=685979&view=auto
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (added)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Thu Aug 14 10:58:30 2008
@@ -0,0 +1,969 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.fs.FSInputChecker;
+import org.apache.hadoop.fs.FSOutputSummer;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.StringUtils;
+
+/** A class that receives a block and writes it to its own disk, and in the
+ * meantime may also copy it to another site. If a throttler is provided,
+ * streaming throttling is also supported.
+ **/
+class BlockReceiver implements java.io.Closeable, FSConstants {
+  public static final Log LOG = DataNode.LOG;
+  
+  private Block block; // the block to receive
+  protected boolean finalized;
+  private DataInputStream in = null; // from where data are read
+  private DataChecksum checksum; // checksum used to validate incoming chunks
+  private OutputStream out = null; // to block file at local disk
+  private DataOutputStream checksumOut = null; // to crc file at local disk
+  private int bytesPerChecksum;
+  private int checksumSize;
+  private ByteBuffer buf; // contains one full packet.
+  private int bufRead; //amount of valid data in the buf
+  private int maxPacketReadLen;
+  protected long offsetInBlock;
+  protected final String inAddr;
+  private String mirrorAddr;
+  private DataOutputStream mirrorOut;
+  private Daemon responder = null;
+  private BlockTransferThrottler throttler;
+  private FSDataset.BlockWriteStreams streams;
+  private boolean isRecovery = false;
+  private String clientName;
+  DatanodeInfo srcDataNode = null;
+  private Checksum partialCrc = null;
+  private DataNode datanode = null;
+
+  BlockReceiver(Block block, DataInputStream in, String inAddr,
+                boolean isRecovery, String clientName, 
+                DatanodeInfo srcDataNode, DataNode datanode) throws IOException {
+    try{
+      this.block = block;
+      this.in = in;
+      this.inAddr = inAddr;
+      this.isRecovery = isRecovery;
+      this.clientName = clientName;
+      this.offsetInBlock = 0;
+      this.checksum = DataChecksum.newDataChecksum(in);
+      this.bytesPerChecksum = checksum.getBytesPerChecksum();
+      this.checksumSize = checksum.getChecksumSize();
+      this.srcDataNode = srcDataNode;
+      this.datanode = datanode;
+      //
+      // Open local disk out
+      //
+      streams = datanode.data.writeToBlock(block, isRecovery);
+      this.finalized = datanode.data.isValidBlock(block);
+      if (streams != null) {
+        this.out = streams.dataOut;
+        this.checksumOut = new DataOutputStream(new BufferedOutputStream(
+                                                  streams.checksumOut, 
+                                                  SMALL_BUFFER_SIZE));
+        // If this block is for appends, then remove it from periodic
+        // validation.
+        if (datanode.blockScanner != null && isRecovery) {
+          datanode.blockScanner.deleteBlock(block);
+        }
+      }
+    } catch(IOException ioe) {
+      IOUtils.closeStream(this);
+      throw ioe;
+    }
+  }
+
+  /**
+   * close files.
+   */
+  public void close() throws IOException {
+
+    IOException ioe = null;
+    // close checksum file
+    try {
+      if (checksumOut != null) {
+        checksumOut.flush();
+        checksumOut.close();
+        checksumOut = null;
+      }
+    } catch(IOException e) {
+      ioe = e;
+    }
+    // close block file
+    try {
+      if (out != null) {
+        out.flush();
+        out.close();
+        out = null;
+      }
+    } catch (IOException e) {
+      ioe = e;
+    }
+    // disk check
+    if(ioe != null) {
+      datanode.checkDiskError(ioe);
+      throw ioe;
+    }
+  }
+
+  /**
+   * Flush block data and metadata files to disk.
+   * @throws IOException
+   */
+  void flush() throws IOException {
+    if (checksumOut != null) {
+      checksumOut.flush();
+    }
+    if (out != null) {
+      out.flush();
+    }
+  }
+
+  /**
+   * While writing to mirrorOut, failure to write to mirror should not
+   * affect this datanode unless a client is writing the block.
+   */
+  private void handleMirrorOutError(IOException ioe) throws IOException {
+    LOG.info(datanode.dnRegistration + ":Exception writing block " +
+             block + " to mirror " + mirrorAddr + "\n" +
+             StringUtils.stringifyException(ioe));
+    mirrorOut = null;
+    //
+    // If stream-copy fails, continue
+    // writing to disk for replication requests. For client
+    // writes, return error so that the client can do error
+    // recovery.
+    //
+    if (clientName.length() > 0) {
+      throw ioe;
+    }
+  }
+  
+  /**
+   * Verify multiple CRC chunks. 
+   */
+  private void verifyChunks( byte[] dataBuf, int dataOff, int len, 
+                             byte[] checksumBuf, int checksumOff ) 
+                             throws IOException {
+    while (len > 0) {
+      int chunkLen = Math.min(len, bytesPerChecksum);
+      
+      checksum.update(dataBuf, dataOff, chunkLen);
+
+      if (!checksum.compare(checksumBuf, checksumOff)) {
+        if (srcDataNode != null) {
+          try {
+            LOG.info("report corrupt block " + block + " from datanode " +
+                      srcDataNode + " to namenode");
+            LocatedBlock lb = new LocatedBlock(block, 
+                                            new DatanodeInfo[] {srcDataNode});
+            datanode.namenode.reportBadBlocks(new LocatedBlock[] {lb});
+          } catch (IOException e) {
+            LOG.warn("Failed to report bad block " + block + 
+                      " from datanode " + srcDataNode + " to namenode");
+          }
+        }
+        throw new IOException("Unexpected checksum mismatch " + 
+                              "while writing " + block + " from " + inAddr);
+      }
+
+      checksum.reset();
+      dataOff += chunkLen;
+      checksumOff += checksumSize;
+      len -= chunkLen;
+    }
+  }
+
+  /**
+   * Makes sure buf.position() is zero without modifying buf.remaining().
+   * It moves the data if position needs to be changed.
+   */
+  private void shiftBufData() {
+    if (bufRead != buf.limit()) {
+      throw new IllegalStateException("bufRead should be same as " +
+                                      "buf.limit()");
+    }
+    
+    //shift the remaining data on buf to the front
+    if (buf.position() > 0) {
+      int dataLeft = buf.remaining();
+      if (dataLeft > 0) {
+        byte[] b = buf.array();
+        System.arraycopy(b, buf.position(), b, 0, dataLeft);
+      }
+      buf.position(0);
+      bufRead = dataLeft;
+      buf.limit(bufRead);
+    }
+  }
+  
+  /**
+   * Reads up to toRead bytes into buf at buf.limit() and increments the limit.
+   * Throws an IOException if the read does not succeed.
+   */
+  private int readToBuf(int toRead) throws IOException {
+    if (toRead < 0) {
+      toRead = (maxPacketReadLen > 0 ? maxPacketReadLen : buf.capacity())
+               - buf.limit();
+    }
+    
+    int nRead = in.read(buf.array(), buf.limit(), toRead);
+    
+    if (nRead < 0) {
+      throw new EOFException("while trying to read " + toRead + " bytes");
+    }
+    bufRead = buf.limit() + nRead;
+    buf.limit(bufRead);
+    return nRead;
+  }
+  
+  
+  /**
+   * Reads (at least) one packet and returns the packet length.
+   * buf.position() points to the start of the packet and 
+   * buf.limit() points to the end of the packet. There could 
+   * be more data from the next packet in buf.<br><br>
+   * 
+   * It tries to read a full packet with a single read call.
+   * Consecutive packets are usually of the same length.
+   */
+  private int readNextPacket() throws IOException {
+    /* This dances around buf a little bit, mainly to read a
+     * full packet with a single read and to accept an arbitrary size
+     * for the next packet at the same time.
+     */
+    if (buf == null) {
+      /* initialize buffer to the best guess size:
+       * 'chunksPerPacket' calculation here should match the same 
+       * calculation in DFSClient to make the guess accurate.
+       */
+      int chunkSize = bytesPerChecksum + checksumSize;
+      int chunksPerPacket = (datanode.writePacketSize - DataNode.PKT_HEADER_LEN - 
+                             SIZE_OF_INTEGER + chunkSize - 1)/chunkSize;
+      buf = ByteBuffer.allocate(DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER +
+                                Math.max(chunksPerPacket, 1) * chunkSize);
+      buf.limit(0);
+    }
+    
+    // See if there is data left in the buffer :
+    if (bufRead > buf.limit()) {
+      buf.limit(bufRead);
+    }
+    
+    while (buf.remaining() < SIZE_OF_INTEGER) {
+      if (buf.position() > 0) {
+        shiftBufData();
+      }
+      readToBuf(-1);
+    }
+    
+    /* At this point we usually have the full packet, or at least enough
+     * for an int.
+     */
+    buf.mark();
+    int payloadLen = buf.getInt();
+    buf.reset();
+    
+    if (payloadLen == 0) {
+      //end of stream!
+      buf.limit(buf.position() + SIZE_OF_INTEGER);
+      return 0;
+    }
+    
+    // check corrupt values for pktLen, 100MB upper limit should be ok?
+    if (payloadLen < 0 || payloadLen > (100*1024*1024)) {
+      throw new IOException("Incorrect value for packet payload : " +
+                            payloadLen);
+    }
+    
+    int pktSize = payloadLen + DataNode.PKT_HEADER_LEN;
+    
+    if (buf.remaining() < pktSize) {
+      //we need to read more data
+      int toRead = pktSize - buf.remaining();
+      
+      // first make sure buf has enough space.        
+      int spaceLeft = buf.capacity() - buf.limit();
+      if (toRead > spaceLeft && buf.position() > 0) {
+        shiftBufData();
+        spaceLeft = buf.capacity() - buf.limit();
+      }
+      if (toRead > spaceLeft) {
+        byte oldBuf[] = buf.array();
+        int toCopy = buf.limit();
+        buf = ByteBuffer.allocate(toCopy + toRead);
+        System.arraycopy(oldBuf, 0, buf.array(), 0, toCopy);
+        buf.limit(toCopy);
+      }
+      
+      //now read:
+      while (toRead > 0) {
+        toRead -= readToBuf(toRead);
+      }
+    }
+    
+    if (buf.remaining() > pktSize) {
+      buf.limit(buf.position() + pktSize);
+    }
+    
+    if (pktSize > maxPacketReadLen) {
+      maxPacketReadLen = pktSize;
+    }
+    
+    return payloadLen;
+  }
+  
+  /** 
+   * Receives and processes a packet. It can contain many chunks.
+   * Returns the size of the packet.
+   */
+  private int receivePacket() throws IOException {
+    
+    int payloadLen = readNextPacket();
+    
+    if (payloadLen <= 0) {
+      return payloadLen;
+    }
+    
+    buf.mark();
+    //read the header
+    buf.getInt(); // packet length
+    offsetInBlock = buf.getLong(); // get offset of packet in block
+    long seqno = buf.getLong();    // get seqno
+    boolean lastPacketInBlock = (buf.get() != 0);
+    
+    int endOfHeader = buf.position();
+    buf.reset();
+    
+    if (LOG.isDebugEnabled()){
+      LOG.debug("Receiving one packet for block " + block +
+                " of length " + payloadLen +
+                " seqno " + seqno +
+                " offsetInBlock " + offsetInBlock +
+                " lastPacketInBlock " + lastPacketInBlock);
+    }
+    
+    setBlockPosition(offsetInBlock);
+    
+    //First write the packet to the mirror:
+    if (mirrorOut != null) {
+      try {
+        mirrorOut.write(buf.array(), buf.position(), buf.remaining());
+        mirrorOut.flush();
+      } catch (IOException e) {
+        handleMirrorOutError(e);
+      }
+    }
+
+    buf.position(endOfHeader);        
+    int len = buf.getInt();
+    
+    if (len < 0) {
+      throw new IOException("Got wrong length during writeBlock(" + block + 
+                            ") from " + inAddr + " at offset " + 
+                            offsetInBlock + ": " + len); 
+    } 
+
+    if (len == 0) {
+      LOG.debug("Receiving empty packet for block " + block);
+    } else {
+      offsetInBlock += len;
+
+      int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)*
+                                                            checksumSize;
+
+      if ( buf.remaining() != (checksumLen + len)) {
+        throw new IOException("Data remaining in packet does not match " +
+                              "sum of checksumLen and dataLen");
+      }
+      int checksumOff = buf.position();
+      int dataOff = checksumOff + checksumLen;
+      byte pktBuf[] = buf.array();
+
+      buf.position(buf.limit()); // move to the end of the data.
+
+      /* Skip verifying the checksum iff this is not the last datanode in the
+       * pipeline and clientName is non-empty. I.e. the checksum is verified
+       * on all the datanodes when the data is being written by a
+       * datanode rather than a client. When a client is writing the data,
+       * the protocol includes acks and only the last datanode needs to verify
+       * the checksum.
+       */
+      if (mirrorOut == null || clientName.length() == 0) {
+        verifyChunks(pktBuf, dataOff, len, pktBuf, checksumOff);
+      }
+
+      try {
+        if (!finalized) {
+          //finally write to the disk :
+          out.write(pktBuf, dataOff, len);
+
+          // If this is a partial chunk, then verify that this is the only
+          // chunk in the packet. Calculate new crc for this chunk.
+          if (partialCrc != null) {
+            if (len > bytesPerChecksum) {
+              throw new IOException("Got wrong length during writeBlock(" + 
+                                    block + ") from " + inAddr + " " +
+                                    "A packet can have only one partial chunk."+
+                                    " len = " + len + 
+                                    " bytesPerChecksum " + bytesPerChecksum);
+            }
+            partialCrc.update(pktBuf, dataOff, len);
+            byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize);
+            checksumOut.write(buf);
+            LOG.debug("Writing out partial crc for data len " + len);
+            partialCrc = null;
+          } else {
+            checksumOut.write(pktBuf, checksumOff, checksumLen);
+          }
+          datanode.myMetrics.bytesWritten.inc(len);
+        }
+      } catch (IOException iex) {
+        datanode.checkDiskError(iex);
+        throw iex;
+      }
+    }
+
+    /// flush entire packet before sending ack
+    flush();
+
+    // put in queue for pending acks
+    if (responder != null) {
+      ((PacketResponder)responder.getRunnable()).enqueue(seqno,
+                                      lastPacketInBlock); 
+    }
+    
+    if (throttler != null) { // throttle I/O
+      throttler.throttle(payloadLen);
+    }
+    
+    return payloadLen;
+  }
+
+  void writeChecksumHeader(DataOutputStream mirrorOut) throws IOException {
+    checksum.writeHeader(mirrorOut);
+  }
+ 
+
+  void receiveBlock(
+      DataOutputStream mirrOut, // output to next datanode
+      DataInputStream mirrIn,   // input from next datanode
+      DataOutputStream replyOut,  // output to previous datanode
+      String mirrAddr, BlockTransferThrottler throttlerArg,
+      int numTargets) throws IOException {
+
+      mirrorOut = mirrOut;
+      mirrorAddr = mirrAddr;
+      throttler = throttlerArg;
+
+    try {
+      // write data chunk header
+      if (!finalized) {
+        BlockMetadataHeader.writeHeader(checksumOut, checksum);
+      }
+      if (clientName.length() > 0) {
+        responder = new Daemon(datanode.threadGroup, 
+                               new PacketResponder(this, block, mirrIn, 
+                                                   replyOut, numTargets,
+                                                   clientName));
+        responder.start(); // start thread to process responses
+      }
+
+      /* 
+       * Receive until packet length is zero.
+       */
+      while (receivePacket() > 0) {}
+
+      // flush the mirror out
+      if (mirrorOut != null) {
+        try {
+          mirrorOut.writeInt(0); // mark the end of the block
+          mirrorOut.flush();
+        } catch (IOException e) {
+          handleMirrorOutError(e);
+        }
+      }
+
+      // wait for all outstanding packet responses, and then
+      // tell the responder to shut down gracefully.
+      if (responder != null) {
+        ((PacketResponder)responder.getRunnable()).close();
+      }
+
+      // if this write is for a replication request (and not
+      // from a client), then finalize block. For client-writes, 
+      // the block is finalized in the PacketResponder.
+      if (clientName.length() == 0) {
+        // close the block/crc files
+        close();
+
+        // Finalize the block. Does this fsync()?
+        block.setNumBytes(offsetInBlock);
+        datanode.data.finalizeBlock(block);
+        datanode.myMetrics.blocksWritten.inc();
+      }
+
+    } catch (IOException ioe) {
+      LOG.info("Exception in receiveBlock for block " + block + 
+               " " + ioe);
+      IOUtils.closeStream(this);
+      if (responder != null) {
+        responder.interrupt();
+      }
+      throw ioe;
+    } finally {
+      if (responder != null) {
+        try {
+          responder.join();
+        } catch (InterruptedException e) {
+          throw new IOException("Interrupted receiveBlock");
+        }
+        responder = null;
+      }
+    }
+  }
+
+  /**
+   * Sets the file pointer in the local block file to the specified value.
+   */
+  private void setBlockPosition(long offsetInBlock) throws IOException {
+    if (finalized) {
+      if (!isRecovery) {
+        throw new IOException("Write to offset " + offsetInBlock +
+                              " of block " + block +
+                              " that is already finalized.");
+      }
+      if (offsetInBlock > datanode.data.getLength(block)) {
+        throw new IOException("Write to offset " + offsetInBlock +
+                              " of block " + block +
+                              " that is already finalized and is of size " +
+                              datanode.data.getLength(block));
+      }
+      return;
+    }
+
+    if (datanode.data.getChannelPosition(block, streams) == offsetInBlock) {
+      return;                   // nothing to do 
+    }
+    long offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
+                            offsetInBlock / bytesPerChecksum * checksumSize;
+    if (out != null) {
+     out.flush();
+    }
+    if (checksumOut != null) {
+      checksumOut.flush();
+    }
+
+    // If this is a partial chunk, then read in pre-existing checksum
+    if (offsetInBlock % bytesPerChecksum != 0) {
+      LOG.info("setBlockPosition trying to set position to " +
+               offsetInBlock +
+               " for block " + block +
+               " which is not a multiple of bytesPerChecksum " +
+               bytesPerChecksum);
+      computePartialChunkCrc(offsetInBlock, offsetInChecksum, bytesPerChecksum);
+    }
+
+    LOG.info("Changing block file offset of block " + block + " from " + 
+        datanode.data.getChannelPosition(block, streams) +
+             " to " + offsetInBlock +
+             " meta file offset to " + offsetInChecksum);
+
+    // set the position of the block file
+    datanode.data.setChannelPosition(block, streams, offsetInBlock, offsetInChecksum);
+  }
+
+  /**
+   * reads in the partial crc chunk and computes checksum
+   * of pre-existing data in partial chunk.
+   */
+  private void computePartialChunkCrc(long blkoff, long ckoff, 
+                                      int bytesPerChecksum) throws IOException {
+
+    // find offset of the beginning of partial chunk.
+    //
+    int sizePartialChunk = (int) (blkoff % bytesPerChecksum);
+    int checksumSize = checksum.getChecksumSize();
+    blkoff = blkoff - sizePartialChunk;
+    LOG.info("computePartialChunkCrc sizePartialChunk " + 
+              sizePartialChunk +
+              " block " + block +
+              " offset in block " + blkoff +
+              " offset in metafile " + ckoff);
+
+    // create an input stream from the block file
+    // and read in partial crc chunk into temporary buffer
+    //
+    byte[] buf = new byte[sizePartialChunk];
+    byte[] crcbuf = new byte[checksumSize];
+    FSDataset.BlockInputStreams instr = null;
+    try { 
+      instr = datanode.data.getTmpInputStreams(block, blkoff, ckoff);
+      IOUtils.readFully(instr.dataIn, buf, 0, sizePartialChunk);
+
+      // open meta file and read in the crc value computed earlier
+      IOUtils.readFully(instr.checksumIn, crcbuf, 0, crcbuf.length);
+    } finally {
+      IOUtils.closeStream(instr);
+    }
+
+    // compute crc of partial chunk from data read in the block file.
+    partialCrc = new CRC32();
+    partialCrc.update(buf, 0, sizePartialChunk);
+    LOG.info("Read in partial CRC chunk from disk for block " + block);
+
+    // paranoia! verify that the pre-computed crc matches what we
+    // recalculated just now
+    if (partialCrc.getValue() != FSInputChecker.checksum2long(crcbuf)) {
+      String msg = "Partial CRC " + partialCrc.getValue() +
+                   " does not match value computed the " +
+                   " last time file was closed " +
+                   FSInputChecker.checksum2long(crcbuf);
+      throw new IOException(msg);
+    }
+    //LOG.debug("Partial CRC matches 0x" + 
+    //            Long.toHexString(partialCrc.getValue()));
+  }
+  
+  
+  /**
+   * Processes responses from downstream datanodes in the pipeline
+   * and sends back replies to the originator.
+   */
+  class PacketResponder implements Runnable, FSConstants {   
+
+    //packet waiting for ack
+    private LinkedList<Packet> ackQueue = new LinkedList<Packet>(); 
+    private volatile boolean running = true;
+    private Block block;
+    DataInputStream mirrorIn;   // input from downstream datanode
+    DataOutputStream replyOut;  // output to upstream datanode
+    private int numTargets;     // number of downstream datanodes including myself
+    private String clientName;  // The name of the client (if any)
+    private BlockReceiver receiver; // The owner of this responder.
+
+    public String toString() {
+      return "PacketResponder " + numTargets + " for Block " + this.block;
+    }
+
+    PacketResponder(BlockReceiver receiver, Block b, DataInputStream in, 
+                    DataOutputStream out, int numTargets, String clientName) {
+      this.receiver = receiver;
+      this.block = b;
+      mirrorIn = in;
+      replyOut = out;
+      this.numTargets = numTargets;
+      this.clientName = clientName;
+    }
+
+    /**
+     * Enqueues the seqno that is still to be acked by the downstream datanode.
+     * @param seqno
+     * @param lastPacketInBlock
+     */
+    synchronized void enqueue(long seqno, boolean lastPacketInBlock) {
+      if (running) {
+        LOG.debug("PacketResponder " + numTargets + " adding seqno " + seqno +
+                  " to ack queue.");
+        ackQueue.addLast(new Packet(seqno, lastPacketInBlock));
+        notifyAll();
+      }
+    }
+
+    /**
+     * Wait for all pending packets to be acked, then shut down the thread.
+     */
+    synchronized void close() {
+      while (running && ackQueue.size() != 0 && datanode.shouldRun) {
+        try {
+          wait();
+        } catch (InterruptedException e) {
+          running = false;
+        }
+      }
+      LOG.debug("PacketResponder " + numTargets +
+               " for block " + block + " Closing down.");
+      running = false;
+      notifyAll();
+    }
+
+    private synchronized void lastDataNodeRun() {
+      long lastHeartbeat = System.currentTimeMillis();
+      boolean lastPacket = false;
+
+      while (running && datanode.shouldRun && !lastPacket) {
+        long now = System.currentTimeMillis();
+        try {
+
+            // wait for a packet to be sent to downstream datanode
+            while (running && datanode.shouldRun && ackQueue.size() == 0) {
+              long idle = now - lastHeartbeat;
+              long timeout = (datanode.socketTimeout/2) - idle;
+              if (timeout <= 0) {
+                timeout = 1000;
+              }
+              try {
+                wait(timeout);
+              } catch (InterruptedException e) {
+                if (running) {
+                  LOG.info("PacketResponder " + numTargets +
+                           " for block " + block + " Interrupted.");
+                  running = false;
+                }
+                break;
+              }
+          
+              // send a heartbeat if it is time.
+              now = System.currentTimeMillis();
+              if (now - lastHeartbeat > datanode.socketTimeout/2) {
+                replyOut.writeLong(-1); // send heartbeat
+                replyOut.flush();
+                lastHeartbeat = now;
+              }
+            }
+
+            if (!running || !datanode.shouldRun) {
+              break;
+            }
+            Packet pkt = ackQueue.removeFirst();
+            long expected = pkt.seqno;
+            notifyAll();
+            LOG.debug("PacketResponder " + numTargets +
+                      " for block " + block + 
+                      " acking for packet " + expected);
+
+            // If this is the last packet in block, then close block
+            // file and finalize the block before responding success
+            if (pkt.lastPacketInBlock) {
+              if (!receiver.finalized) {
+                receiver.close();
+                block.setNumBytes(receiver.offsetInBlock);
+                datanode.data.finalizeBlock(block);
+                datanode.myMetrics.blocksWritten.inc();
+                datanode.notifyNamenodeReceivedBlock(block, 
+                    DataNode.EMPTY_DEL_HINT);
+                LOG.info("Received block " + block + 
+                         " of size " + block.getNumBytes() + 
+                         " from " + receiver.inAddr);
+              }
+              lastPacket = true;
+            }
+
+            replyOut.writeLong(expected);
+            replyOut.writeShort(OP_STATUS_SUCCESS);
+            replyOut.flush();
+        } catch (Exception e) {
+          if (running) {
+            LOG.info("PacketResponder " + block + " " + numTargets + 
+                     " Exception " + StringUtils.stringifyException(e));
+            running = false;
+          }
+        }
+      }
+      LOG.info("PacketResponder " + numTargets + 
+               " for block " + block + " terminating");
+    }
+
+    /**
+     * Thread to process incoming acks.
+     * @see java.lang.Runnable#run()
+     */
+    public void run() {
+
+      // If this is the last datanode in pipeline, then handle differently
+      if (numTargets == 0) {
+        lastDataNodeRun();
+        return;
+      }
+
+      boolean lastPacketInBlock = false;
+      while (running && datanode.shouldRun && !lastPacketInBlock) {
+
+        try {
+            short op = OP_STATUS_SUCCESS;
+            boolean didRead = false;
+            long expected = -2;
+            try { 
+              // read seqno from downstream datanode
+              long seqno = mirrorIn.readLong();
+              didRead = true;
+              if (seqno == -1) {
+                replyOut.writeLong(-1); // send keepalive
+                replyOut.flush();
+                LOG.debug("PacketResponder " + numTargets + " got -1");
+                continue;
+              } else if (seqno == -2) {
+                LOG.debug("PacketResponder " + numTargets + " got -2");
+              } else {
+                LOG.debug("PacketResponder " + numTargets + " got seqno = " + 
+                    seqno);
+                Packet pkt = null;
+                synchronized (this) {
+                  while (running && datanode.shouldRun && ackQueue.size() == 0) {
+                    if (LOG.isDebugEnabled()) {
+                      LOG.debug("PacketResponder " + numTargets + 
+                                " seqno = " + seqno +
+                                " for block " + block +
+                                " waiting for local datanode to finish write.");
+                    }
+                    wait();
+                  }
+                  pkt = ackQueue.removeFirst();
+                  expected = pkt.seqno;
+                  notifyAll();
+                  LOG.debug("PacketResponder " + numTargets + " seqno = " + seqno);
+                  if (seqno != expected) {
+                    throw new IOException("PacketResponder " + numTargets +
+                                          " for block " + block +
+                                          " expected seqno:" + expected +
+                                          " received:" + seqno);
+                  }
+                  lastPacketInBlock = pkt.lastPacketInBlock;
+                }
+              }
+            } catch (Throwable e) {
+              if (running) {
+                LOG.info("PacketResponder " + block + " " + numTargets + 
+                         " Exception " + StringUtils.stringifyException(e));
+                running = false;
+              }
+            }
+
+            if (Thread.interrupted()) {
+              /* The receiver thread cancelled this thread. 
+               * We could also check any other status updates from the 
+               * receiver thread (e.g. if it is ok to write to replyOut). 
+               * It is prudent to not send any more status back to the client
+               * because this datanode has a problem. The upstream datanode
+               * will detect a timeout on heartbeats and will declare that
+               * this datanode is bad, and rightly so.
+               */
+              LOG.info("PacketResponder " + block +  " " + numTargets +
+                       " : Thread is interrupted.");
+              running = false;
+              continue;
+            }
+            
+            if (!didRead) {
+              op = OP_STATUS_ERROR;
+            }
+            
+            // If this is the last packet in block, then close block
+            // file and finalize the block before responding success
+            if (lastPacketInBlock && !receiver.finalized) {
+              receiver.close();
+              block.setNumBytes(receiver.offsetInBlock);
+              datanode.data.finalizeBlock(block);
+              datanode.myMetrics.blocksWritten.inc();
+              datanode.notifyNamenodeReceivedBlock(block, 
+                  DataNode.EMPTY_DEL_HINT);
+              LOG.info("Received block " + block + 
+                       " of size " + block.getNumBytes() + 
+                       " from " + receiver.inAddr);
+            }
+
+            // send my status back to upstream datanode
+            replyOut.writeLong(expected); // send seqno upstream
+            replyOut.writeShort(OP_STATUS_SUCCESS);
+
+            LOG.debug("PacketResponder " + numTargets + 
+                      " for block " + block +
+                      " responded my status " +
+                      " for seqno " + expected);
+
+            // forward responses from downstream datanodes.
+            for (int i = 0; i < numTargets && datanode.shouldRun; i++) {
+              try {
+                if (op == OP_STATUS_SUCCESS) {
+                  op = mirrorIn.readShort();
+                  if (op != OP_STATUS_SUCCESS) {
+                    LOG.debug("PacketResponder for block " + block +
+                              ": error code received from downstream " +
+                              " datanode[" + i + "] " + op);
+                  }
+                }
+              } catch (Throwable e) {
+                op = OP_STATUS_ERROR;
+              }
+              replyOut.writeShort(op);
+            }
+            replyOut.flush();
+            LOG.debug("PacketResponder " + block + " " + numTargets + 
+                      " responded other status " + " for seqno " + expected);
+
+            // If we were unable to read the seqno from downstream, then stop.
+            if (expected == -2) {
+              running = false;
+            }
+            // If we forwarded an error response from a downstream datanode
+            // and we are acting on behalf of a client, then we quit. The 
+            // client will drive the recovery mechanism.
+            if (op == OP_STATUS_ERROR && clientName.length() > 0) {
+              running = false;
+            }
+        } catch (IOException e) {
+          if (running) {
+            LOG.info("PacketResponder " + block + " " + numTargets + 
+                     " Exception " + StringUtils.stringifyException(e));
+            running = false;
+          }
+        } catch (RuntimeException e) {
+          if (running) {
+            LOG.info("PacketResponder " + block + " " + numTargets + 
+                     " Exception " + StringUtils.stringifyException(e));
+            running = false;
+          }
+        }
+      }
+      LOG.info("PacketResponder " + numTargets + 
+               " for block " + block + " terminating");
+    }
+  }
+  
+  /**
+   * This information is cached by the Datanode in the ackQueue.
+   */
+  static private class Packet {
+    long seqno;
+    boolean lastPacketInBlock;
+
+    Packet(long seqno, boolean lastPacketInBlock) {
+      this.seqno = seqno;
+      this.lastPacketInBlock = lastPacketInBlock;
+    }
+  }
+}
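
A sketch (not part of the committed file) of the packet framing that
receivePacket()/readNextPacket() above expect; the field sizes follow the
ByteBuffer calls in receivePacket(), and the local names here are illustrative
only:

    // parse one packet out of 'buf', assumed to be positioned at the packet start
    int packetLen   = buf.getInt();   // 4 + checksumLen + dataLen, as written by the sender
    long offset     = buf.getLong();  // offset of this packet's data in the block
    long seqno      = buf.getLong();  // packet sequence number
    boolean isLast  = buf.get() != 0; // last packet in the block?
    int dataLen     = buf.getInt();   // number of data bytes in this packet
    int numChunks   = (dataLen + bytesPerChecksum - 1) / bytesPerChecksum;
    int checksumLen = numChunks * checksumSize;
    // buf now points at checksumLen bytes of CRCs followed by dataLen bytes of
    // data, which is exactly what verifyChunks() and out.write() consume above.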

Added: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=685979&view=auto
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (added)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Thu Aug 14 10:58:30 2008
@@ -0,0 +1,396 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.SocketException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.SocketOutputStream;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Reads a block from the disk and sends it to a recipient.
+ */
+class BlockSender implements java.io.Closeable, FSConstants {
+  public static final Log LOG = DataNode.LOG;
+  
+  private Block block; // the block to read from
+  private InputStream blockIn; // data stream
+  private long blockInPosition = -1; // updated while using transferTo().
+  private DataInputStream checksumIn; // checksum datastream
+  private DataChecksum checksum; // checksum stream
+  private long offset; // starting position to read
+  private long endOffset; // ending position
+  private long blockLength;
+  private int bytesPerChecksum; // chunk size
+  private int checksumSize; // checksum size
+  private boolean corruptChecksumOk; // if a corrupt or missing checksum is acceptable
+  private boolean chunkOffsetOK; // if need to send chunk offset
+  private long seqno; // sequence number of packet
+
+  private boolean transferToAllowed = true;
+  private boolean blockReadFully; //set when the whole block is read
+  private boolean verifyChecksum; //if true, checksum is verified while reading
+  private BlockTransferThrottler throttler;
+  
+  /**
+   * Minimum buffer used while sending data to clients. Used only if
+   * transferTo() is enabled. 64KB is not that large. It could be larger, but
+   * not sure if there will be much more improvement.
+   */
+  private static final int MIN_BUFFER_WITH_TRANSFERTO = 64*1024;
+
+  
+  BlockSender(Block block, long startOffset, long length,
+              boolean corruptChecksumOk, boolean chunkOffsetOK,
+              boolean verifyChecksum, DataNode datanode) throws IOException {
+
+    try {
+      this.block = block;
+      this.chunkOffsetOK = chunkOffsetOK;
+      this.corruptChecksumOk = corruptChecksumOk;
+      this.verifyChecksum = verifyChecksum;
+      this.blockLength = datanode.data.getLength(block);
+      this.transferToAllowed = datanode.transferToAllowed;
+
+      if ( !corruptChecksumOk || datanode.data.metaFileExists(block) ) {
+        checksumIn = new DataInputStream(
+                new BufferedInputStream(datanode.data.getMetaDataInputStream(block),
+                                        BUFFER_SIZE));
+
+        // read and handle the common header here. For now just a version
+       BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
+       short version = header.getVersion();
+
+        if (version != FSDataset.METADATA_VERSION) {
+          LOG.warn("Wrong version (" + version + ") for metadata file for "
+              + block + " ignoring ...");
+        }
+        checksum = header.getChecksum();
+      } else {
+        LOG.warn("Could not find metadata file for " + block);
+        // This only decides the buffer size. Use BUFFER_SIZE?
+        checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_NULL,
+            16 * 1024);
+      }
+
+      /* If bytesPerChecksum is very large, then the metadata file
+       * is most likely corrupted. For now just truncate bytesPerChecksum to
+       * blockLength.
+       */        
+      bytesPerChecksum = checksum.getBytesPerChecksum();
+      if (bytesPerChecksum > 10*1024*1024 && bytesPerChecksum > blockLength){
+        checksum = DataChecksum.newDataChecksum(checksum.getChecksumType(),
+                                   Math.max((int)blockLength, 10*1024*1024));
+        bytesPerChecksum = checksum.getBytesPerChecksum();        
+      }
+      checksumSize = checksum.getChecksumSize();
+
+      if (length < 0) {
+        length = blockLength;
+      }
+
+      endOffset = blockLength;
+      if (startOffset < 0 || startOffset > endOffset
+          || (length + startOffset) > endOffset) {
+        String msg = " Offset " + startOffset + " and length " + length
+        + " don't match block " + block + " ( blockLen " + endOffset + " )";
+        LOG.warn(datanode.dnRegistration + ":sendBlock() : " + msg);
+        throw new IOException(msg);
+      }
+
+      
+      offset = (startOffset - (startOffset % bytesPerChecksum));
+      if (length >= 0) {
+        // Make sure endOffset points to end of a checksumed chunk.
+        long tmpLen = startOffset + length + (startOffset - offset);
+        if (tmpLen % bytesPerChecksum != 0) {
+          tmpLen += (bytesPerChecksum - tmpLen % bytesPerChecksum);
+        }
+        if (tmpLen < endOffset) {
+          endOffset = tmpLen;
+        }
+      }
+
+      // seek to the right offsets
+      if (offset > 0) {
+        long checksumSkip = (offset / bytesPerChecksum) * checksumSize;
+        // note blockIn is seeked when created below
+        if (checksumSkip > 0) {
+          // Should we use seek() for checksum file as well?
+          IOUtils.skipFully(checksumIn, checksumSkip);
+        }
+      }
+      seqno = 0;
+
+      blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
+    } catch (IOException ioe) {
+      IOUtils.closeStream(this);
+      IOUtils.closeStream(blockIn);
+      throw ioe;
+    }
+  }
+
+  /**
+   * close opened files.
+   */
+  public void close() throws IOException {
+    IOException ioe = null;
+    // close checksum file
+    if(checksumIn!=null) {
+      try {
+        checksumIn.close();
+      } catch (IOException e) {
+        ioe = e;
+      }
+      checksumIn = null;
+    }
+    // close data file
+    if(blockIn!=null) {
+      try {
+        blockIn.close();
+      } catch (IOException e) {
+        ioe = e;
+      }
+      blockIn = null;
+    }
+    // throw IOException if there is any
+    if(ioe!= null) {
+      throw ioe;
+    }
+  }
+
+  /**
+   * Sends up to maxChunks chunks of data.
+   * 
+   * When blockInPosition is >= 0, assumes 'out' is a 
+   * {@link SocketOutputStream} and tries 
+   * {@link SocketOutputStream#transferToFully(FileChannel, long, int)} to
+   * send data (and updates blockInPosition).
+   */
+  private int sendChunks(ByteBuffer pkt, int maxChunks, OutputStream out) 
+                         throws IOException {
+    // Sends multiple chunks in one packet with a single write().
+
+    int len = Math.min((int) (endOffset - offset),
+                       bytesPerChecksum*maxChunks);
+    if (len == 0) {
+      return 0;
+    }
+
+    int numChunks = (len + bytesPerChecksum - 1)/bytesPerChecksum;
+    int packetLen = len + numChunks*checksumSize + 4;
+    pkt.clear();
+    
+    // write packet header
+    pkt.putInt(packetLen);
+    pkt.putLong(offset);
+    pkt.putLong(seqno);
+    pkt.put((byte)((offset + len >= endOffset) ? 1 : 0));
+               //why no ByteBuf.putBoolean()?
+    pkt.putInt(len);
+    
+    int checksumOff = pkt.position();
+    int checksumLen = numChunks * checksumSize;
+    byte[] buf = pkt.array();
+    
+    if (checksumSize > 0 && checksumIn != null) {
+      try {
+        checksumIn.readFully(buf, checksumOff, checksumLen);
+      } catch (IOException e) {
+        LOG.warn(" Could not read or failed to veirfy checksum for data" +
+                 " at offset " + offset + " for block " + block + " got : "
+                 + StringUtils.stringifyException(e));
+        IOUtils.closeStream(checksumIn);
+        checksumIn = null;
+        if (corruptChecksumOk) {
+          // Just fill the checksum portion of the buffer with zeros.
+          Arrays.fill(buf, checksumOff, checksumOff + checksumLen, (byte) 0);
+        } else {
+          throw e;
+        }
+      }
+    }
+    
+    int dataOff = checksumOff + checksumLen;
+    
+    if (blockInPosition < 0) {
+      //normal transfer
+      IOUtils.readFully(blockIn, buf, dataOff, len);
+
+      if (verifyChecksum) {
+        int dOff = dataOff;
+        int cOff = checksumOff;
+        int dLeft = len;
+
+        for (int i=0; i<numChunks; i++) {
+          checksum.reset();
+          int dLen = Math.min(dLeft, bytesPerChecksum);
+          checksum.update(buf, dOff, dLen);
+          if (!checksum.compare(buf, cOff)) {
+            throw new ChecksumException("Checksum failed at " + 
+                                        (offset + len - dLeft), len);
+          }
+          dLeft -= dLen;
+          dOff += dLen;
+          cOff += checksumSize;
+        }
+      }
+      //writing is done below (mainly to handle IOException)
+    }
+    
+    try {
+      if (blockInPosition >= 0) {
+        //use transferTo(). Checks on out and blockIn are already done. 
+
+        SocketOutputStream sockOut = (SocketOutputStream)out;
+        //first write the packet
+        sockOut.write(buf, 0, dataOff);
+        // no need to flush. since we know out is not a buffered stream. 
+
+        sockOut.transferToFully(((FileInputStream)blockIn).getChannel(), 
+                                blockInPosition, len);
+
+        blockInPosition += len;
+      } else {
+        // normal transfer
+        out.write(buf, 0, dataOff + len);
+      }
+      
+    } catch (IOException e) {
+      /* exception while writing to the client (well, with transferTo(),
+       * it could also be while reading from the local file). Many times 
+       * this error can be ignored. We will let the callers distinguish this 
+       * from other exceptions if this is not a subclass of IOException. 
+       */
+      if (e.getClass().equals(IOException.class)) {
+        // "se" could be a new class in stead of SocketException.
+        IOException se = new SocketException("Original Exception : " + e);
+        se.initCause(e);
+        /* Change the stacktrace so that the original trace is not truncated
+         * when printed.*/ 
+        se.setStackTrace(e.getStackTrace());
+        throw se;
+      }
+      throw e;
+    }
+
+    if (throttler != null) { // rebalancing so throttle
+      throttler.throttle(packetLen);
+    }
+
+    return len;
+  }
+
+  /**
+   * sendBlock() is used to read a block and its metadata and stream the data to
+   * either a client or to another datanode. 
+   * 
+   * @param out  stream to which the block is written to
+   * @param baseStream optional. if non-null, <code>out</code> is assumed to 
+   *        be a wrapper over this stream. This enables optimizations for
+   *        sending the data, e.g. 
+   *        {@link SocketOutputStream#transferToFully(FileChannel, 
+   *        long, int)}.
+   * @param throttler for sending data.
+   * @return total bytes read, including crc.
+   */
+  long sendBlock(DataOutputStream out, OutputStream baseStream, 
+                 BlockTransferThrottler throttler) throws IOException {
+    if( out == null ) {
+      throw new IOException( "out stream is null" );
+    }
+    this.throttler = throttler;
+
+    long initialOffset = offset;
+    long totalRead = 0;
+    OutputStream streamForSendChunks = out;
+    
+    try {
+      checksum.writeHeader(out);
+      if ( chunkOffsetOK ) {
+        out.writeLong( offset );
+      }
+      out.flush();
+      
+      int maxChunksPerPacket;
+      int pktSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER;
+      
+      if (transferToAllowed && !verifyChecksum && 
+          baseStream instanceof SocketOutputStream && 
+          blockIn instanceof FileInputStream) {
+        
+        FileChannel fileChannel = ((FileInputStream)blockIn).getChannel();
+        
+        // blockInPosition also indicates sendChunks() uses transferTo.
+        blockInPosition = fileChannel.position();
+        streamForSendChunks = baseStream;
+        
+        // assure a minimum buffer size.
+        maxChunksPerPacket = (Math.max(BUFFER_SIZE, 
+                                       MIN_BUFFER_WITH_TRANSFERTO)
+                              + bytesPerChecksum - 1)/bytesPerChecksum;
+        
+        // allocate smaller buffer while using transferTo(). 
+        pktSize += checksumSize * maxChunksPerPacket;
+      } else {
+        maxChunksPerPacket = Math.max(1,
+                 (BUFFER_SIZE + bytesPerChecksum - 1)/bytesPerChecksum);
+        pktSize += (bytesPerChecksum + checksumSize) * maxChunksPerPacket;
+      }
+
+      ByteBuffer pktBuf = ByteBuffer.allocate(pktSize);
+
+      while (endOffset > offset) {
+        long len = sendChunks(pktBuf, maxChunksPerPacket, 
+                              streamForSendChunks);
+        offset += len;
+        totalRead += len + ((len + bytesPerChecksum - 1)/bytesPerChecksum*
+                            checksumSize);
+        seqno++;
+      }
+      out.writeInt(0); // mark the end of block        
+      out.flush();
+    } finally {
+      close();
+    }
+
+    blockReadFully = (initialOffset == 0 && offset >= blockLength);
+
+    return totalRead;
+  }
+  
+  boolean isBlockReadFully() {
+    return blockReadFully;
+  }
+}
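
A usage sketch (not part of the committed file), modelled on the
DataBlockScanner change further down: reading a block back locally and
discarding the output, purely to exercise the checksum verification path.
'block', 'datanode' and 'throttler' are assumed to be in scope, as they are in
the scanner:

    BlockSender blockSender = new BlockSender(block, 0, -1, false, false, true, datanode);
    DataOutputStream out = new DataOutputStream(new IOUtils.NullOutputStream());
    try {
      blockSender.sendBlock(out, null, throttler); // verifies checksums while reading
    } finally {
      IOUtils.closeStream(blockSender);
      IOUtils.closeStream(out);
    }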

Added: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockTransferThrottler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockTransferThrottler.java?rev=685979&view=auto
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockTransferThrottler.java (added)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockTransferThrottler.java Thu Aug 14 10:58:30 2008
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+/** 
+ * A class to throttle block transfers.
+ * This class is thread-safe and can be shared by multiple threads.
+ * The parameter bandwidthPerSec specifies the total bandwidth shared by
+ * all threads.
+ */
+class BlockTransferThrottler {
+  private long period;          // period over which bw is imposed
+  private long periodExtension; // Max period over which bw accumulates.
+  private long bytesPerPeriod; // total number of bytes that can be sent in each period
+  private long curPeriodStart; // current period starting time
+  private long curReserve;     // remaining bytes that can be sent in the period
+  private long bytesAlreadyUsed;
+
+  /** Constructor 
+   * @param bandwidthPerSec bandwidth allowed in bytes per second. 
+   */
+  BlockTransferThrottler(long bandwidthPerSec) {
+    this(500, bandwidthPerSec);  // by default throttling period is 500ms 
+  }
+
+  /**
+   * Constructor
+   * @param period in milliseconds. Bandwidth is enforced over this
+   *        period.
+   * @param bandwidthPerSec bandwidth allowed in bytes per second. 
+   */
+  BlockTransferThrottler(long period, long bandwidthPerSec) {
+    this.curPeriodStart = System.currentTimeMillis();
+    this.period = period;
+    this.curReserve = this.bytesPerPeriod = bandwidthPerSec*period/1000;
+    this.periodExtension = period*3;
+  }
+
+  /**
+   * @return current throttle bandwidth in bytes per second.
+   */
+  synchronized long getBandwidth() {
+    return bytesPerPeriod*1000/period;
+  }
+  
+  /**
+   * Sets throttle bandwidth. This takes effect at the latest by the end of
+   * the current period.
+   * 
+   * @param bytesPerSecond 
+   */
+  synchronized void setBandwidth(long bytesPerSecond) {
+    if ( bytesPerSecond <= 0 ) {
+      throw new IllegalArgumentException("" + bytesPerSecond);
+    }
+    bytesPerPeriod = bytesPerSecond*period/1000;
+  }
+  
+  /** Given the numOfBytes sent/received since the last time throttle was called,
+   * make the current thread sleep if the I/O rate is too fast
+   * compared to the given bandwidth.
+   *
+   * @param numOfBytes
+   *     number of bytes sent/received since the last time throttle was called
+   */
+  synchronized void throttle(long numOfBytes) {
+    if ( numOfBytes <= 0 ) {
+      return;
+    }
+
+    curReserve -= numOfBytes;
+    bytesAlreadyUsed += numOfBytes;
+
+    while (curReserve <= 0) {
+      long now = System.currentTimeMillis();
+      long curPeriodEnd = curPeriodStart + period;
+
+      if ( now < curPeriodEnd ) {
+        // Wait for next period so that curReserve can be increased.
+        try {
+          wait( curPeriodEnd - now );
+        } catch (InterruptedException ignored) {}
+      } else if ( now <  (curPeriodStart + periodExtension)) {
+        curPeriodStart = curPeriodEnd;
+        curReserve += bytesPerPeriod;
+      } else {
+        // discard the prev period. Throttler might not have
+        // been used for a long time.
+        curPeriodStart = now;
+        curReserve = bytesPerPeriod - bytesAlreadyUsed;
+      }
+    }
+
+    bytesAlreadyUsed -= numOfBytes;
+  }
+}
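
A usage sketch (not part of the committed file): a sender sharing a 1 MB/s
budget. With the default 500 ms period this allows roughly 512 KB per period,
and throttle() blocks once the reserve for the current period is used up.
'out' and 'packets' are hypothetical stand-ins for a real data path:

    BlockTransferThrottler throttler = new BlockTransferThrottler(1024 * 1024);
    for (byte[] packet : packets) {
      out.write(packet);
      throttler.throttle(packet.length); // may sleep to keep the rate under 1 MB/s
    }
    throttler.setBandwidth(512 * 1024);  // bandwidth can be retuned while in use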

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java?rev=685979&r1=685978&r2=685979&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java Thu Aug 14 10:58:30 2008
@@ -49,7 +49,6 @@
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.server.datanode.DataNode.BlockSender;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.StringUtils;
 
@@ -95,7 +94,7 @@
   
   Random random = new Random();
   
-  DataNode.Throttler throttler = null;
+  BlockTransferThrottler throttler = null;
   
   private static enum ScanType {
     REMOTE_READ,           // Verified when a block read by a client etc
@@ -239,7 +238,7 @@
     }
     
     synchronized (this) {
-      throttler = new DataNode.Throttler(200, MAX_SCAN_RATE);
+      throttler = new BlockTransferThrottler(200, MAX_SCAN_RATE);
     }
   }
 
@@ -424,8 +423,8 @@
       try {
         adjustThrottler();
         
-        blockSender = datanode.new BlockSender(block, 0, -1, false, 
-                                               false, true);
+        blockSender = new BlockSender(block, 0, -1, false, 
+                                               false, true, datanode);
 
         DataOutputStream out = 
                 new DataOutputStream(new IOUtils.NullOutputStream());


