hadoop-hdfs-commits mailing list archives

From tomwh...@apache.org
Subject svn commit: r906690 [3/4] - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/server/common/ src/java/org/apache/hadoop/hdfs/server/namenode/ src/test/aop/org/apache/hadoop/hdfs/ src/test/hdfs/org/apache/hadoop/...
Date Thu, 04 Feb 2010 22:20:03 GMT
Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=906690&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSInputStream.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSInputStream.java Thu Feb  4 22:20:02 2010
@@ -0,0 +1,832 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.security.BlockAccessToken;
+import org.apache.hadoop.hdfs.security.InvalidAccessTokenException;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.StringUtils;
+
+/****************************************************************
+ * DFSInputStream provides bytes from a named file.  It handles 
+ * negotiation with the namenode and various datanodes as necessary.
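+ *
+ * Instances are created on behalf of DFSClient; the constructor below is
+ * package-private and takes the owning DFSClient along with the file path,
+ * buffer size, and checksum-verification flag.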
+ ****************************************************************/
+class DFSInputStream extends FSInputStream {
+  private final DFSClient dfsClient;
+  private Socket s = null;
+  private boolean closed = false;
+
+  private final String src;
+  private long prefetchSize;
+  private BlockReader blockReader = null;
+  private boolean verifyChecksum;
+  private LocatedBlocks locatedBlocks = null;
+  private long lastBlockBeingWrittenLength = 0;
+  private DatanodeInfo currentNode = null;
+  private Block currentBlock = null;
+  private long pos = 0;
+  private long blockEnd = -1;
+
+  /**
+   * This variable tracks the number of failures since the start of the
+   * most recent user-facing operation. That is to say, it should be reset
+   * whenever the user makes a call on this stream, and if at any point
+   * during the retry logic, the failure count exceeds a threshold,
+   * the errors will be thrown back to the operation.
+   *
+   * Specifically this counts the number of times the client has gone
+   * back to the namenode to get a new list of block locations, and is
+   * capped at maxBlockAcquireFailures
+   */
+  private int failures = 0;
+  private int timeWindow = 3000; // wait time window (in msec) if BlockMissingException is caught
+
+  /* XXX Use of ConcurrentHashMap is a temporary fix. Need to fix
+   * parallel accesses to DFSInputStream (through pthreads) properly */
+  private ConcurrentHashMap<DatanodeInfo, DatanodeInfo> deadNodes = 
+             new ConcurrentHashMap<DatanodeInfo, DatanodeInfo>();
+  private int buffersize = 1;
+  
+  private byte[] oneByteBuf = new byte[1]; // used for 'int read()'
+  
+  void addToDeadNodes(DatanodeInfo dnInfo) {
+    deadNodes.put(dnInfo, dnInfo);
+  }
+  
+  DFSInputStream(DFSClient dfsClient, String src, int buffersize, boolean verifyChecksum
+                 ) throws IOException {
+    this.dfsClient = dfsClient;
+    this.verifyChecksum = verifyChecksum;
+    this.buffersize = buffersize;
+    this.src = src;
+    prefetchSize = this.dfsClient.conf.getLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY,
+        10 * dfsClient.defaultBlockSize);
+    timeWindow = this.dfsClient.conf.getInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, timeWindow);
+    openInfo();
+  }
+
+  /**
+   * Grab the open-file info from namenode
+   */
+  synchronized void openInfo() throws IOException {
+    LocatedBlocks newInfo = DFSClient.callGetBlockLocations(dfsClient.namenode, src, 0, prefetchSize);
+    if (DFSClient.LOG.isDebugEnabled()) {
+      DFSClient.LOG.debug("newInfo = " + newInfo);
+    }
+    if (newInfo == null) {
+      throw new IOException("Cannot open filename " + src);
+    }
+
+    if (locatedBlocks != null) {
+      Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
+      Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
+      while (oldIter.hasNext() && newIter.hasNext()) {
+        if (! oldIter.next().getBlock().equals(newIter.next().getBlock())) {
+          throw new IOException("Blocklist for " + src + " has changed!");
+        }
+      }
+    }
+    locatedBlocks = newInfo;
+    lastBlockBeingWrittenLength = 0;
+    if (!locatedBlocks.isLastBlockComplete()) {
+      final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
+      if (last != null) {
+        final long len = readBlockLength(last);
+        last.getBlock().setNumBytes(len);
+        lastBlockBeingWrittenLength = len; 
+      }
+    }
+
+    currentNode = null;
+  }
+
+  /** Read the block length from one of the datanodes. */
+  private long readBlockLength(LocatedBlock locatedblock) throws IOException {
+    if (locatedblock == null || locatedblock.getLocations().length == 0) {
+      return 0;
+    }
+    for(DatanodeInfo datanode : locatedblock.getLocations()) {
+      try {
+        final ClientDatanodeProtocol cdp = DFSClient.createClientDatanodeProtocolProxy(
+            datanode, dfsClient.conf);
+        final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock());
+        if (n >= 0) {
+          return n;
+        }
+      }
+      catch(IOException ioe) {
+        if (DFSClient.LOG.isDebugEnabled()) {
+          DFSClient.LOG.debug("Faild to getReplicaVisibleLength from datanode "
+              + datanode + " for block " + locatedblock.getBlock(), ioe);
+        }
+      }
+    }
+    throw new IOException("Cannot obtain block length for " + locatedblock);
+  }
+  
+  public synchronized long getFileLength() {
+    return locatedBlocks == null? 0:
+        locatedBlocks.getFileLength() + lastBlockBeingWrittenLength;
+  }
+
+  /**
+   * Returns the datanode from which the stream is currently reading.
+   */
+  public DatanodeInfo getCurrentDatanode() {
+    return currentNode;
+  }
+
+  /**
+   * Returns the block containing the target position. 
+   */
+  public Block getCurrentBlock() {
+    return currentBlock;
+  }
+
+  /**
+   * Return collection of blocks that has already been located.
+   */
+  synchronized List<LocatedBlock> getAllBlocks() throws IOException {
+    return getBlockRange(0, getFileLength());
+  }
+
+  /**
+   * Get block at the specified position.
+   * Fetch it from the namenode if not cached.
+   * 
+   * @param offset
+   * @param updatePosition whether to update current position
+   * @return located block
+   * @throws IOException
+   */
+  private synchronized LocatedBlock getBlockAt(long offset,
+      boolean updatePosition) throws IOException {
+    assert (locatedBlocks != null) : "locatedBlocks is null";
+
+    final LocatedBlock blk;
+
+    //check offset
+    if (offset < 0 || offset >= getFileLength()) {
+      throw new IOException("offset < 0 || offset > getFileLength(), offset="
+          + offset
+          + ", updatePosition=" + updatePosition
+          + ", locatedBlocks=" + locatedBlocks);
+    }
+    else if (offset >= locatedBlocks.getFileLength()) {
+      // offset to the portion of the last block,
+      // which is not known to the name-node yet;
+      // getting the last block 
+      blk = locatedBlocks.getLastLocatedBlock();
+    }
+    else {
+      // search cached blocks first
+      int targetBlockIdx = locatedBlocks.findBlock(offset);
+      if (targetBlockIdx < 0) { // block is not cached
+        targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
+        // fetch more blocks
+        LocatedBlocks newBlocks;
+        newBlocks = DFSClient.callGetBlockLocations(dfsClient.namenode, src, offset, prefetchSize);
+        assert (newBlocks != null) : "Could not find target position " + offset;
+        locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
+      }
+      blk = locatedBlocks.get(targetBlockIdx);
+    }
+
+    // update current position
+    if (updatePosition) {
+      pos = offset;
+      blockEnd = blk.getStartOffset() + blk.getBlockSize() - 1;
+      currentBlock = blk.getBlock();
+    }
+    return blk;
+  }
+
+  /** Fetch a block from namenode and cache it */
+  private synchronized void fetchBlockAt(long offset) throws IOException {
+    int targetBlockIdx = locatedBlocks.findBlock(offset);
+    if (targetBlockIdx < 0) { // block is not cached
+      targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
+    }
+    // fetch blocks
+    LocatedBlocks newBlocks;
+    newBlocks = DFSClient.callGetBlockLocations(dfsClient.namenode, src, offset, prefetchSize);
+    if (newBlocks == null) {
+      throw new IOException("Could not find target position " + offset);
+    }
+    locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
+  }
+
+  /**
+   * Get blocks in the specified range.
+   * Fetch them from the namenode if not cached.
+   * 
+   * @param offset
+   * @param length
+   * @return consequent segment of located blocks
+   * @throws IOException
+   */
+  private synchronized List<LocatedBlock> getBlockRange(long offset, 
+                                                        long length) 
+                                                      throws IOException {
+    final List<LocatedBlock> blocks;
+    if (locatedBlocks.isLastBlockComplete()) {
+      blocks = getFinalizedBlockRange(offset, length);
+    }
+    else {
+      if (length + offset > locatedBlocks.getFileLength()) {
+        length = locatedBlocks.getFileLength() - offset;
+      }
+      blocks = getFinalizedBlockRange(offset, length);
+      blocks.add(locatedBlocks.getLastLocatedBlock());
+    }
+    return blocks;
+  }
+
+  /**
+   * Get blocks in the specified range.
+   * Includes only the complete blocks.
+   * Fetch them from the namenode if not cached.
+   */
+  private synchronized List<LocatedBlock> getFinalizedBlockRange(
+      long offset, long length) throws IOException {
+    assert (locatedBlocks != null) : "locatedBlocks is null";
+    List<LocatedBlock> blockRange = new ArrayList<LocatedBlock>();
+    // search cached blocks first
+    int blockIdx = locatedBlocks.findBlock(offset);
+    if (blockIdx < 0) { // block is not cached
+      blockIdx = LocatedBlocks.getInsertIndex(blockIdx);
+    }
+    long remaining = length;
+    long curOff = offset;
+    while(remaining > 0) {
+      LocatedBlock blk = null;
+      if(blockIdx < locatedBlocks.locatedBlockCount())
+        blk = locatedBlocks.get(blockIdx);
+      if (blk == null || curOff < blk.getStartOffset()) {
+        LocatedBlocks newBlocks;
+        newBlocks = DFSClient.callGetBlockLocations(dfsClient.namenode, src, curOff, remaining);
+        locatedBlocks.insertRange(blockIdx, newBlocks.getLocatedBlocks());
+        continue;
+      }
+      assert curOff >= blk.getStartOffset() : "Block not found";
+      blockRange.add(blk);
+      long bytesRead = blk.getStartOffset() + blk.getBlockSize() - curOff;
+      remaining -= bytesRead;
+      curOff += bytesRead;
+      blockIdx++;
+    }
+    return blockRange;
+  }
+
+  /**
+   * Open a DataInputStream to a DataNode so that it can be read from.
+   * We get block ID and the IDs of the destinations at startup, from the namenode.
+   */
+  private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
+    if (target >= getFileLength()) {
+      throw new IOException("Attempted to read past end of file");
+    }
+
+    if ( blockReader != null ) {
+      blockReader.close(); 
+      blockReader = null;
+    }
+    
+    if (s != null) {
+      s.close();
+      s = null;
+    }
+
+    //
+    // Connect to best DataNode for desired Block, with potential offset
+    //
+    DatanodeInfo chosenNode = null;
+    int refetchToken = 1; // only need to get a new access token once
+    
+    while (true) {
+      //
+      // Compute desired block
+      //
+      LocatedBlock targetBlock = getBlockAt(target, true);
+      assert (target==pos) : "Wrong position " + pos + " expect " + target;
+      long offsetIntoBlock = target - targetBlock.getStartOffset();
+
+      DNAddrPair retval = chooseDataNode(targetBlock);
+      chosenNode = retval.info;
+      InetSocketAddress targetAddr = retval.addr;
+
+      try {
+        s = dfsClient.socketFactory.createSocket();
+        NetUtils.connect(s, targetAddr, dfsClient.socketTimeout);
+        s.setSoTimeout(dfsClient.socketTimeout);
+        Block blk = targetBlock.getBlock();
+        BlockAccessToken accessToken = targetBlock.getAccessToken();
+        
+        blockReader = BlockReader.newBlockReader(s, src, blk.getBlockId(), 
+            accessToken, 
+            blk.getGenerationStamp(),
+            offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
+            buffersize, verifyChecksum, dfsClient.clientName);
+        return chosenNode;
+      } catch (IOException ex) {
+        if (ex instanceof InvalidAccessTokenException && refetchToken > 0) {
+          DFSClient.LOG.info("Will fetch a new access token and retry, " 
+              + "access token was invalid when connecting to " + targetAddr
+              + " : " + ex);
+          /*
+           * Get a new access token and retry. Retry is needed in 2 cases. 1)
+           * When both NN and DN re-started while DFSClient holding a cached
+           * access token. 2) In the case that NN fails to update its
+           * access key at pre-set interval (by a wide margin) and
+           * subsequently restarts. In this case, DN re-registers itself with
+           * NN and receives a new access key, but DN will delete the old
+           * access key from its memory since it's considered expired based on
+           * the estimated expiration date.
+           */
+          refetchToken--;
+          fetchBlockAt(target);
+        } else {
+          DFSClient.LOG.info("Failed to connect to " + targetAddr
+              + ", add to deadNodes and continue", ex);
+          // Put chosen node into dead list, continue
+          addToDeadNodes(chosenNode);
+        }
+        if (s != null) {
+          try {
+            s.close();
+          } catch (IOException iex) {
+          }                        
+        }
+        s = null;
+      }
+    }
+  }
+
+  /**
+   * Close it down!
+   */
+  @Override
+  public synchronized void close() throws IOException {
+    if (closed) {
+      return;
+    }
+    dfsClient.checkOpen();
+    
+    if ( blockReader != null ) {
+      blockReader.close();
+      blockReader = null;
+    }
+    
+    if (s != null) {
+      s.close();
+      s = null;
+    }
+    super.close();
+    closed = true;
+  }
+
+  @Override
+  public synchronized int read() throws IOException {
+    int ret = read( oneByteBuf, 0, 1 );
+    return ( ret <= 0 ) ? -1 : (oneByteBuf[0] & 0xff);
+  }
+
+  /* This is used by the regular read() and handles ChecksumExceptions.
+   * The name readBuffer() is chosen to imply similarity to readBuffer() in
+   * ChecksumFileSystem
+   */ 
+  private synchronized int readBuffer(byte buf[], int off, int len) 
+                                                  throws IOException {
+    IOException ioe;
+    
+    /* we retry current node only once. So this is set to true only here.
+     * Intention is to handle one common case of an error that is not a
+     * failure on datanode or client : when DataNode closes the connection
+     * since the client is idle. If there are other cases of "non-errors"
+     * then a datanode might be retried by setting this to true again.
+     */
+    boolean retryCurrentNode = true;
+
+    while (true) {
+      // retry as many times as seekToNewSource allows.
+      try {
+        return blockReader.read(buf, off, len);
+      } catch ( ChecksumException ce ) {
+        DFSClient.LOG.warn("Found Checksum error for " + currentBlock + " from " +
+                 currentNode.getName() + " at " + ce.getPos());          
+        dfsClient.reportChecksumFailure(src, currentBlock, currentNode);
+        ioe = ce;
+        retryCurrentNode = false;
+      } catch ( IOException e ) {
+        if (!retryCurrentNode) {
+          DFSClient.LOG.warn("Exception while reading from " + currentBlock +
+                   " of " + src + " from " + currentNode + ": " +
+                   StringUtils.stringifyException(e));
+        }
+        ioe = e;
+      }
+      boolean sourceFound = false;
+      if (retryCurrentNode) {
+        /* possibly retry the same node so that transient errors don't
+         * result in application level failures (e.g. Datanode could have
+         * closed the connection because the client is idle for too long).
+         */ 
+        sourceFound = seekToBlockSource(pos);
+      } else {
+        addToDeadNodes(currentNode);
+        sourceFound = seekToNewSource(pos);
+      }
+      if (!sourceFound) {
+        throw ioe;
+      }
+      retryCurrentNode = false;
+    }
+  }
+
+  /**
+   * Read up to len bytes into the given buffer, starting at offset off.
+   */
+  @Override
+  public synchronized int read(byte buf[], int off, int len) throws IOException {
+    dfsClient.checkOpen();
+    if (closed) {
+      throw new IOException("Stream closed");
+    }
+    failures = 0;
+    if (pos < getFileLength()) {
+      int retries = 2;
+      while (retries > 0) {
+        try {
+          if (pos > blockEnd) {
+            currentNode = blockSeekTo(pos);
+          }
+          int realLen = Math.min(len, (int) (blockEnd - pos + 1));
+          int result = readBuffer(buf, off, realLen);
+          
+          if (result >= 0) {
+            pos += result;
+          } else {
+            // got an EOS from the reader though we expect more data on it.
+            throw new IOException("Unexpected EOS from the reader");
+          }
+          if (dfsClient.stats != null && result != -1) {
+            dfsClient.stats.incrementBytesRead(result);
+          }
+          return result;
+        } catch (ChecksumException ce) {
+          throw ce;            
+        } catch (IOException e) {
+          if (retries == 1) {
+            DFSClient.LOG.warn("DFS Read: " + StringUtils.stringifyException(e));
+          }
+          blockEnd = -1;
+          if (currentNode != null) { addToDeadNodes(currentNode); }
+          if (--retries == 0) {
+            throw e;
+          }
+        }
+      }
+    }
+    return -1;
+  }
+
+      
+  private DNAddrPair chooseDataNode(LocatedBlock block)
+    throws IOException {
+    while (true) {
+      DatanodeInfo[] nodes = block.getLocations();
+      try {
+        DatanodeInfo chosenNode = bestNode(nodes, deadNodes);
+        InetSocketAddress targetAddr = 
+                          NetUtils.createSocketAddr(chosenNode.getName());
+        return new DNAddrPair(chosenNode, targetAddr);
+      } catch (IOException ie) {
+        String blockInfo = block.getBlock() + " file=" + src;
+        if (failures >= dfsClient.getMaxBlockAcquireFailures()) {
+          throw new BlockMissingException(src, "Could not obtain block: " + blockInfo,
+                                          block.getStartOffset());
+        }
+        
+        if (nodes == null || nodes.length == 0) {
+          DFSClient.LOG.info("No node available for block: " + blockInfo);
+        }
+        DFSClient.LOG.info("Could not obtain block " + block.getBlock()
+            + " from any node: " + ie
+            + ". Will get new block locations from namenode and retry...");
+        try {
+          // Introduce a random factor into the wait time before another retry.
+          // The wait time depends on the number of failures and a random factor.
+          // The first time a BlockMissingException is caught, the wait time
+          // is a random number between 0..3000 ms. If the first retry
+          // still fails, we wait a 3000 ms grace period before the 2nd retry.
+          // At the second retry the waiting window is also expanded to 6000 ms
+          // to reduce the request rate on the server. Similarly, the 3rd retry
+          // waits a 6000 ms grace period before retrying and the waiting window
+          // is expanded to 9000 ms.
+          double waitTime = timeWindow * failures +       // grace period for the last round of attempt
+            timeWindow * (failures + 1) * dfsClient.r.nextDouble(); // expanding time window for each failure
+          DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + " IOException, will wait for " + waitTime + " msec.");
+          Thread.sleep((long)waitTime);
+        } catch (InterruptedException iex) {
+        }
+        deadNodes.clear(); //2nd option is to remove only nodes[blockId]
+        openInfo();
+        block = getBlockAt(block.getStartOffset(), false);
+        failures++;
+        continue;
+      }
+    }
+  } 
+      
+  private void fetchBlockByteRange(LocatedBlock block, long start,
+                                   long end, byte[] buf, int offset) throws IOException {
+    //
+    // Connect to best DataNode for desired Block, with potential offset
+    //
+    Socket dn = null;
+    int refetchToken = 1; // only need to get a new access token once
+    
+    while (true) {
+      // cached block locations may have been updated by chooseDataNode()
+      // or fetchBlockAt(). Always get the latest list of locations at the 
+      // start of the loop.
+      block = getBlockAt(block.getStartOffset(), false);
+      DNAddrPair retval = chooseDataNode(block);
+      DatanodeInfo chosenNode = retval.info;
+      InetSocketAddress targetAddr = retval.addr;
+      BlockReader reader = null;
+          
+      try {
+        dn = dfsClient.socketFactory.createSocket();
+        NetUtils.connect(dn, targetAddr, dfsClient.socketTimeout);
+        dn.setSoTimeout(dfsClient.socketTimeout);
+        BlockAccessToken accessToken = block.getAccessToken();
+            
+        int len = (int) (end - start + 1);
+            
+        reader = BlockReader.newBlockReader(dn, src, 
+                                            block.getBlock().getBlockId(),
+                                            accessToken,
+                                            block.getBlock().getGenerationStamp(),
+                                            start, len, buffersize, 
+                                            verifyChecksum, dfsClient.clientName);
+        int nread = reader.readAll(buf, offset, len);
+        if (nread != len) {
+          throw new IOException("truncated return from reader.read(): " +
+                                "excpected " + len + ", got " + nread);
+        }
+        return;
+      } catch (ChecksumException e) {
+        DFSClient.LOG.warn("fetchBlockByteRange(). Got a checksum exception for " +
+                 src + " at " + block.getBlock() + ":" + 
+                 e.getPos() + " from " + chosenNode.getName());
+        dfsClient.reportChecksumFailure(src, block.getBlock(), chosenNode);
+      } catch (IOException e) {
+        if (e instanceof InvalidAccessTokenException && refetchToken > 0) {
+          DFSClient.LOG.info("Will get a new access token and retry, "
+              + "access token was invalid when connecting to " + targetAddr
+              + " : " + e);
+          refetchToken--;
+          fetchBlockAt(block.getStartOffset());
+          continue;
+        } else {
+          DFSClient.LOG.warn("Failed to connect to " + targetAddr + " for file " + src
+              + " for block " + block.getBlock() + ":"
+              + StringUtils.stringifyException(e));
+        }
+      } finally {
+        IOUtils.closeStream(reader);
+        IOUtils.closeSocket(dn);
+      }
+      // Put chosen node into dead list, continue
+      addToDeadNodes(chosenNode);
+    }
+  }
+
+  /**
+   * Read bytes starting from the specified position.
+   * 
+   * @param position start read from this position
+   * @param buffer read buffer
+   * @param offset offset into buffer
+   * @param length number of bytes to read
+   * 
+   * @return actual number of bytes read
+   */
+  @Override
+  public int read(long position, byte[] buffer, int offset, int length)
+    throws IOException {
+    // sanity checks
+    dfsClient.checkOpen();
+    if (closed) {
+      throw new IOException("Stream closed");
+    }
+    failures = 0;
+    long filelen = getFileLength();
+    if ((position < 0) || (position >= filelen)) {
+      return -1;
+    }
+    int realLen = length;
+    if ((position + length) > filelen) {
+      realLen = (int)(filelen - position);
+    }
+    
+    // determine the block and byte range within the block
+    // corresponding to position and realLen
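+    // (For example, with a 64 MB block size, a 1 MB pread that starts 0.5 MB
+    // before a block boundary spans two located blocks and is served by two
+    // fetchBlockByteRange() calls in the loop below.)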
+    List<LocatedBlock> blockRange = getBlockRange(position, realLen);
+    int remaining = realLen;
+    for (LocatedBlock blk : blockRange) {
+      long targetStart = position - blk.getStartOffset();
+      long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
+      fetchBlockByteRange(blk, targetStart, 
+                          targetStart + bytesToRead - 1, buffer, offset);
+      remaining -= bytesToRead;
+      position += bytesToRead;
+      offset += bytesToRead;
+    }
+    assert remaining == 0 : "Wrong number of bytes read.";
+    if (dfsClient.stats != null) {
+      dfsClient.stats.incrementBytesRead(realLen);
+    }
+    return realLen;
+  }
+   
+  @Override
+  public long skip(long n) throws IOException {
+    if ( n > 0 ) {
+      long curPos = getPos();
+      long fileLen = getFileLength();
+      if( n+curPos > fileLen ) {
+        n = fileLen - curPos;
+      }
+      seek(curPos+n);
+      return n;
+    }
+    return n < 0 ? -1 : 0;
+  }
+
+  /**
+   * Seek to a new arbitrary location
+   */
+  @Override
+  public synchronized void seek(long targetPos) throws IOException {
+    if (targetPos > getFileLength()) {
+      throw new IOException("Cannot seek after EOF");
+    }
+    boolean done = false;
+    if (pos <= targetPos && targetPos <= blockEnd) {
+      //
+      // If this seek is to a positive position in the current
+      // block, and this piece of data might already be lying in
+      // the TCP buffer, then just eat up the intervening data.
+      //
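+      // For example, if pos is 100 and targetPos is 600 (and 500 does not
+      // exceed TCP_WINDOW_SIZE), the 500 intervening bytes are read and
+      // discarded via blockReader.skip() below instead of re-connecting
+      // to a datanode.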
+      int diff = (int)(targetPos - pos);
+      if (diff <= DFSClient.TCP_WINDOW_SIZE) {
+        try {
+          pos += blockReader.skip(diff);
+          if (pos == targetPos) {
+            done = true;
+          }
+        } catch (IOException e) {//make following read to retry
+          DFSClient.LOG.debug("Exception while seek to " + targetPos + " from "
+                    + currentBlock +" of " + src + " from " + currentNode + 
+                    ": " + StringUtils.stringifyException(e));
+        }
+      }
+    }
+    if (!done) {
+      pos = targetPos;
+      blockEnd = -1;
+    }
+  }
+
+  /**
+   * Same as {@link #seekToNewSource(long)} except that it does not exclude
+   * the current datanode and might connect to the same node.
+   */
+  private synchronized boolean seekToBlockSource(long targetPos)
+                                                 throws IOException {
+    currentNode = blockSeekTo(targetPos);
+    return true;
+  }
+  
+  /**
+   * Seek to given position on a node other than the current node.  If
+   * a node other than the current node is found, then returns true. 
+   * If another node could not be found, then returns false.
+   */
+  @Override
+  public synchronized boolean seekToNewSource(long targetPos) throws IOException {
+    boolean markedDead = deadNodes.containsKey(currentNode);
+    addToDeadNodes(currentNode);
+    DatanodeInfo oldNode = currentNode;
+    DatanodeInfo newNode = blockSeekTo(targetPos);
+    if (!markedDead) {
+      /* Remove it from deadNodes. blockSeekTo could have cleared 
+       * deadNodes and added currentNode again. That's OK. */
+      deadNodes.remove(oldNode);
+    }
+    if (!oldNode.getStorageID().equals(newNode.getStorageID())) {
+      currentNode = newNode;
+      return true;
+    } else {
+      return false;
+    }
+  }
+      
+  /**
+   */
+  @Override
+  public synchronized long getPos() throws IOException {
+    return pos;
+  }
+
+  /** Return the size of the remaining available bytes
+   * if the size is less than or equal to {@link Integer#MAX_VALUE},
+   * otherwise, return {@link Integer#MAX_VALUE}.
+   */
+  @Override
+  public synchronized int available() throws IOException {
+    if (closed) {
+      throw new IOException("Stream closed");
+    }
+
+    final long remaining = getFileLength() - pos;
+    return remaining <= Integer.MAX_VALUE? (int)remaining: Integer.MAX_VALUE;
+  }
+
+  /**
+   * We definitely don't support marks
+   */
+  @Override
+  public boolean markSupported() {
+    return false;
+  }
+  @Override
+  public void mark(int readLimit) {
+  }
+  @Override
+  public void reset() throws IOException {
+    throw new IOException("Mark/reset not supported");
+  }
+
+  /**
+   * Pick the best node from which to stream the data.
+   * Entries in <i>nodes</i> are already in the priority order
+   */
+  static DatanodeInfo bestNode(DatanodeInfo nodes[], 
+                               AbstractMap<DatanodeInfo, DatanodeInfo> deadNodes)
+                               throws IOException {
+    if (nodes != null) { 
+      for (int i = 0; i < nodes.length; i++) {
+        if (!deadNodes.containsKey(nodes[i])) {
+          return nodes[i];
+        }
+      }
+    }
+    throw new IOException("No live nodes contain current block");
+  }
+
+  /** Utility class to encapsulate datanode info and its IP address. */
+  static class DNAddrPair {
+    DatanodeInfo info;
+    InetSocketAddress addr;
+    DNAddrPair(DatanodeInfo info, InetSocketAddress addr) {
+      this.info = info;
+      this.addr = addr;
+    }
+  }
+
+}
\ No newline at end of file

Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=906690&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java Thu Feb  4 22:20:02 2010
@@ -0,0 +1,1442 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
+
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSOutputSummer;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Syncable;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
+import org.apache.hadoop.hdfs.security.BlockAccessToken;
+import org.apache.hadoop.hdfs.security.InvalidAccessTokenException;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
+import org.apache.hadoop.io.EnumSetWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.PureJavaCrc32;
+import org.apache.hadoop.util.StringUtils;
+
+/****************************************************************
+ * DFSOutputStream creates files from a stream of bytes.
+ *
+ * The client application writes data that is cached internally by
+ * this stream. Data is broken up into packets, each packet is
+ * typically 64K in size. A packet comprises chunks. Each chunk
+ * is typically 512 bytes and has an associated checksum with it.
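+ *
+ * (Illustrative arithmetic, not the exact constants used here: with
+ * 512-byte chunks, a 4-byte checksum per chunk and a 64K packet,
+ * roughly 64 * 1024 / (512 + 4), i.e. about 127 chunks, fit in one
+ * packet.)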
+ *
+ * When the client application fills up the currentPacket, it is
+ * enqueued into the dataQueue.  The DataStreamer thread picks up
+ * packets from the dataQueue, sends each one to the first datanode in
+ * the pipeline and moves it from the dataQueue to the ackQueue.
+ * The ResponseProcessor receives acks from the datanodes. When a
+ * successful ack for a packet is received from all datanodes, the
+ * ResponseProcessor removes the corresponding packet from the
+ * ackQueue.
+ *
+ * In case of error, all outstanding packets are moved from the
+ * ackQueue back to the front of the dataQueue. A new pipeline is set up
+ * by eliminating the bad datanode from the original pipeline. The
+ * DataStreamer then resumes sending packets from the dataQueue.
+****************************************************************/
+class DFSOutputStream extends FSOutputSummer implements Syncable {
+  /**
+   * 
+   */
+  private final DFSClient dfsClient;
+  private Configuration conf;
+  private static final int MAX_PACKETS = 80; // each packet 64K, total 5MB
+  private Socket s;
+  // closed is accessed by different threads under different locks.
+  private volatile boolean closed = false;
+
+  private String src;
+  private final long blockSize;
+  private final DataChecksum checksum;
+  // both dataQueue and ackQueue are protected by dataQueue lock
+  private final LinkedList<Packet> dataQueue = new LinkedList<Packet>();
+  private final LinkedList<Packet> ackQueue = new LinkedList<Packet>();
+  private Packet currentPacket = null;
+  private DataStreamer streamer;
+  private long currentSeqno = 0;
+  private long bytesCurBlock = 0; // bytes written in current block
+  private int packetSize = 0; // write packet size, including the header.
+  private int chunksPerPacket = 0;
+  private volatile IOException lastException = null;
+  private long artificialSlowdown = 0;
+  private long lastFlushOffset = -1; // offset when flush was invoked
+  //persist blocks on namenode
+  private final AtomicBoolean persistBlocks = new AtomicBoolean(false);
+  private volatile boolean appendChunk = false;   // appending to existing partial block
+  private long initialFileSize = 0; // at time of file open
+  private Progressable progress;
+  
+  private class Packet {
+    ByteBuffer buffer;           // only one of buf and buffer is non-null
+    byte[]  buf;  
+    long    seqno;               // sequencenumber of buffer in block
+    long    offsetInBlock;       // offset in block
+    boolean lastPacketInBlock;   // is this the last packet in block?
+    int     numChunks;           // number of chunks currently in packet
+    int     maxChunks;           // max chunks in packet
+    int     dataStart;
+    int     dataPos;
+    int     checksumStart;
+    int     checksumPos;      
+    private static final long HEART_BEAT_SEQNO = -1L;
+
+    /**
+     *  create a heartbeat packet
+     */
+    Packet() {
+      this.lastPacketInBlock = false;
+      this.numChunks = 0;
+      this.offsetInBlock = 0;
+      this.seqno = HEART_BEAT_SEQNO;
+      
+      buffer = null;
+      int packetSize = DataNode.PKT_HEADER_LEN + DFSClient.SIZE_OF_INTEGER;
+      buf = new byte[packetSize];
+      
+      checksumStart = dataStart = packetSize;
+      checksumPos = checksumStart;
+      dataPos = dataStart;
+      maxChunks = 0;
+    }
+    
+    // create a new packet
+    Packet(int pktSize, int chunksPerPkt, long offsetInBlock) {
+      this.lastPacketInBlock = false;
+      this.numChunks = 0;
+      this.offsetInBlock = offsetInBlock;
+      this.seqno = currentSeqno;
+      currentSeqno++;
+      
+      buffer = null;
+      buf = new byte[pktSize];
+      
+      checksumStart = DataNode.PKT_HEADER_LEN + DFSClient.SIZE_OF_INTEGER;
+      checksumPos = checksumStart;
+      dataStart = checksumStart + chunksPerPkt * checksum.getChecksumSize();
+      dataPos = dataStart;
+      maxChunks = chunksPerPkt;
+    }
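+
+    /* Layout of 'buf' as set up by the constructor above:
+     *   [ packet header + length field | checksums ... | data ... ]
+     *   0 ............... checksumStart ........... dataStart ......
+     * checksumPos and dataPos advance through their regions as
+     * writeChecksum() and writeData() below are called.
+     */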
+
+    void writeData(byte[] inarray, int off, int len) {
+      if ( dataPos + len > buf.length) {
+        throw new BufferOverflowException();
+      }
+      System.arraycopy(inarray, off, buf, dataPos, len);
+      dataPos += len;
+    }
+
+    void  writeChecksum(byte[] inarray, int off, int len) {
+      if (checksumPos + len > dataStart) {
+        throw new BufferOverflowException();
+      }
+      System.arraycopy(inarray, off, buf, checksumPos, len);
+      checksumPos += len;
+    }
+    
+    /**
+     * Returns ByteBuffer that contains one full packet, including header.
+     */
+    ByteBuffer getBuffer() {
+      /* Once this is called, no more data can be added to the packet.
+       * setting 'buf' to null ensures that.
+       * This is called only when the packet is ready to be sent.
+       */
+      if (buffer != null) {
+        return buffer;
+      }
+      
+      //prepare the header and close any gap between checksum and data.
+      
+      int dataLen = dataPos - dataStart;
+      int checksumLen = checksumPos - checksumStart;
+      
+      if (checksumPos != dataStart) {
+        /* move the checksum to cover the gap.
+         * This can happen for the last packet.
+         */
+        System.arraycopy(buf, checksumStart, buf, 
+                         dataStart - checksumLen , checksumLen); 
+      }
+      
+      int pktLen = DFSClient.SIZE_OF_INTEGER + dataLen + checksumLen;
+      
+      //normally dataStart == checksumPos, i.e., offset is zero.
+      buffer = ByteBuffer.wrap(buf, dataStart - checksumPos,
+                               DataNode.PKT_HEADER_LEN + pktLen);
+      buf = null;
+      buffer.mark();
+      
+      /* write the header and data length.
+       * The format is described in comment before DataNode.BlockSender
+       */
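+      // As written out below, the packet starts with:
+      //   4 bytes : pktLen (length of everything that follows: the 4-byte
+      //             dataLen field, the checksums and the data)
+      //   8 bytes : offsetInBlock
+      //   8 bytes : seqno
+      //   1 byte  : lastPacketInBlock flag
+      //   4 bytes : dataLen (data only, excluding checksums)
+      // followed by checksumLen bytes of checksums and dataLen bytes of data.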
+      buffer.putInt(pktLen);  // pktSize
+      buffer.putLong(offsetInBlock); 
+      buffer.putLong(seqno);
+      buffer.put((byte) ((lastPacketInBlock) ? 1 : 0));
+      //end of pkt header
+      buffer.putInt(dataLen); // actual data length, excluding checksum.
+      
+      buffer.reset();
+      return buffer;
+    }
+    
+    // get the packet's last byte's offset in the block
+    long getLastByteOffsetBlock() {
+      return offsetInBlock + dataPos - dataStart;
+    }
+    
+    /**
+     * Check if this packet is a heart beat packet
+     * @return true if the sequence number is HEART_BEAT_SEQNO
+     */
+    private boolean isHeartbeatPacket() {
+      return seqno == HEART_BEAT_SEQNO;
+    }
+    
+    public String toString() {
+      return "packet seqno:" + this.seqno +
+      " offsetInBlock:" + this.offsetInBlock + 
+      " lastPacketInBlock:" + this.lastPacketInBlock +
+      " lastByteOffsetInBlock: " + this.getLastByteOffsetBlock();
+    }
+  }
+
+  //
+  // The DataStreamer class is responsible for sending data packets to the
+  // datanodes in the pipeline. It retrieves a new blockid and block locations
+  // from the namenode, and starts streaming packets to the pipeline of
+  // Datanodes. Every packet has a sequence number associated with
+  // it. When all the packets for a block are sent out and acks for each
+  // of them are received, the DataStreamer closes the current block.
+  //
+  class DataStreamer extends Daemon {
+    private volatile boolean streamerClosed = false;
+    private Block block; // its length is number of bytes acked
+    private BlockAccessToken accessToken;
+    private DataOutputStream blockStream;
+    private DataInputStream blockReplyStream;
+    private ResponseProcessor response = null;
+    private volatile DatanodeInfo[] nodes = null; // list of targets for current block
+    private ArrayList<DatanodeInfo> excludedNodes = new ArrayList<DatanodeInfo>();
+    volatile boolean hasError = false;
+    volatile int errorIndex = -1;
+    private BlockConstructionStage stage;  // block construction stage
+    private long bytesSent = 0; // number of bytes that've been sent
+
+    /**
+     * Default construction for file create
+     */
+    private DataStreamer() {
+      stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
+    }
+    
+    /**
+     * Construct a data streamer for append
+     * @param lastBlock last block of the file to be appended
+     * @param stat status of the file to be appended
+     * @param bytesPerChecksum number of bytes per checksum
+     * @throws IOException if error occurs
+     */
+    private DataStreamer(LocatedBlock lastBlock, FileStatus stat,
+        int bytesPerChecksum) throws IOException {
+      stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
+      block = lastBlock.getBlock();
+      bytesSent = block.getNumBytes();
+      accessToken = lastBlock.getAccessToken();
+      long usedInLastBlock = stat.getLen() % blockSize;
+      int freeInLastBlock = (int)(blockSize - usedInLastBlock);
+
+      // calculate the amount of free space in the pre-existing 
+      // last crc chunk
+      int usedInCksum = (int)(stat.getLen() % bytesPerChecksum);
+      int freeInCksum = bytesPerChecksum - usedInCksum;
+
+      // if there is space in the last block, then we have to 
+      // append to that block
+      if (freeInLastBlock == blockSize) {
+        throw new IOException("The last block for file " + 
+            src + " is full.");
+      }
+
+      if (usedInCksum > 0 && freeInCksum > 0) {
+        // if there is space in the last partial chunk, then 
+        // setup in such a way that the next packet will have only 
+        // one chunk that fills up the partial chunk.
+        //
+        computePacketChunkSize(0, freeInCksum);
+        resetChecksumChunk(freeInCksum);
+        appendChunk = true;
+      } else {
+        // if the remaining space in the block is smaller than 
+        // the expected size of a packet, then create a
+        // smaller packet.
+        //
+        computePacketChunkSize(Math.min(dfsClient.writePacketSize, freeInLastBlock), 
+            bytesPerChecksum);
+      }
+
+      // setup pipeline to append to the last block XXX retries??
+      nodes = lastBlock.getLocations();
+      errorIndex = -1;   // no errors yet.
+      if (nodes.length < 1) {
+        throw new IOException("Unable to retrieve blocks locations " +
+            " for last block " + block +
+            "of file " + src);
+
+      }
+    }
+    
+    /**
+     * Initialize for data streaming
+     */
+    private void initDataStreaming() {
+      this.setName("DataStreamer for file " + src +
+          " block " + block);
+      response = new ResponseProcessor(nodes);
+      response.start();
+      stage = BlockConstructionStage.DATA_STREAMING;
+    }
+    
+    private void endBlock() {
+      DFSClient.LOG.debug("Closing old block " + block);
+      this.setName("DataStreamer for file " + src);
+      closeResponder();
+      closeStream();
+      nodes = null;
+      stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
+    }
+    
+    /*
+     * The streamer thread is the only thread that opens streams to datanodes
+     * and closes them. Any error recovery is also done by this thread.
+     */
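+    /*
+     * Stage transitions driven by this loop, as implemented below:
+     * PIPELINE_SETUP_CREATE (or PIPELINE_SETUP_APPEND) -> DATA_STREAMING
+     * once a pipeline is established; DATA_STREAMING -> PIPELINE_CLOSE when
+     * the last packet of a block is about to be sent; endBlock() then resets
+     * the stage to PIPELINE_SETUP_CREATE for the next block.
+     */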
+    public void run() {
+      long lastPacket = System.currentTimeMillis();
+      while (!streamerClosed && dfsClient.clientRunning) {
+
+        // if the Responder encountered an error, shutdown Responder
+        if (hasError && response != null) {
+          try {
+            response.close();
+            response.join();
+            response = null;
+          } catch (InterruptedException  e) {
+          }
+        }
+
+        Packet one = null;
+
+        try {
+          // process datanode IO errors if any
+          boolean doSleep = false;
+          if (hasError && errorIndex>=0) {
+            doSleep = processDatanodeError();
+          }
+
+          synchronized (dataQueue) {
+            // wait for a packet to be sent.
+            long now = System.currentTimeMillis();
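+            // Block here until there is a packet to send, an error occurs,
+            // or (while in DATA_STREAMING) half the socket timeout has
+            // elapsed since the last packet was sent; in that last case the
+            // loop exits with an empty dataQueue and a heartbeat packet is
+            // created below to keep the pipeline alive.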
+            while ((!streamerClosed && !hasError && dfsClient.clientRunning 
+                && dataQueue.size() == 0 && 
+                (stage != BlockConstructionStage.DATA_STREAMING || 
+                 stage == BlockConstructionStage.DATA_STREAMING && 
+                 now - lastPacket < dfsClient.socketTimeout/2)) || doSleep ) {
+              long timeout = dfsClient.socketTimeout/2 - (now-lastPacket);
+              timeout = timeout <= 0 ? 1000 : timeout;
+              timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
+                 timeout : 1000;
+              try {
+                dataQueue.wait(timeout);
+              } catch (InterruptedException  e) {
+              }
+              doSleep = false;
+              now = System.currentTimeMillis();
+            }
+            if (streamerClosed || hasError || !dfsClient.clientRunning) {
+              continue;
+            }
+            // get packet to be sent.
+            if (dataQueue.isEmpty()) {
+              one = new Packet();  // heartbeat packet
+            } else {
+              one = dataQueue.getFirst(); // regular data packet
+            }
+          }
+
+          // get new block from namenode.
+          if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
+            DFSClient.LOG.debug("Allocating new block");
+            nodes = nextBlockOutputStream(src);
+            initDataStreaming();
+          } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
+            DFSClient.LOG.debug("Append to block " + block);
+            setupPipelineForAppendOrRecovery();
+            initDataStreaming();
+          }
+
+          long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
+          if (lastByteOffsetInBlock > blockSize) {
+            throw new IOException("BlockSize " + blockSize +
+                " is smaller than data size. " +
+                " Offset of packet in block " + 
+                lastByteOffsetInBlock +
+                " Aborting file " + src);
+          }
+
+          if (one.lastPacketInBlock) {
+            // wait until all data packets have been successfully acked
+            synchronized (dataQueue) {
+              while (!streamerClosed && !hasError && 
+                  ackQueue.size() != 0 && dfsClient.clientRunning) {
+                try {
+                  // wait for acks to arrive from datanodes
+                  dataQueue.wait(1000);
+                } catch (InterruptedException  e) {
+                }
+              }
+            }
+            if (streamerClosed || hasError || !dfsClient.clientRunning) {
+              continue;
+            }
+            stage = BlockConstructionStage.PIPELINE_CLOSE;
+          }
+          
+          // send the packet
+          ByteBuffer buf = one.getBuffer();
+
+          synchronized (dataQueue) {
+            // move packet from dataQueue to ackQueue
+            if (!one.isHeartbeatPacket()) {
+              dataQueue.removeFirst();
+              ackQueue.addLast(one);
+              dataQueue.notifyAll();
+            }
+          }
+
+          if (DFSClient.LOG.isDebugEnabled()) {
+            DFSClient.LOG.debug("DataStreamer block " + block +
+                " sending packet " + one);
+          }
+
+          // write out data to remote datanode
+          blockStream.write(buf.array(), buf.position(), buf.remaining());
+          blockStream.flush();
+          lastPacket = System.currentTimeMillis();
+          
+          if (one.isHeartbeatPacket()) {  //heartbeat packet
+          }
+          
+          // update bytesSent
+          long tmpBytesSent = one.getLastByteOffsetBlock();
+          if (bytesSent < tmpBytesSent) {
+            bytesSent = tmpBytesSent;
+          }
+
+          if (streamerClosed || hasError || !dfsClient.clientRunning) {
+            continue;
+          }
+
+          // Is this block full?
+          if (one.lastPacketInBlock) {
+            // wait until the close packet has been acked
+            synchronized (dataQueue) {
+              while (!streamerClosed && !hasError && 
+                  ackQueue.size() != 0 && dfsClient.clientRunning) {
+                dataQueue.wait(1000);// wait for acks to arrive from datanodes
+              }
+            }
+            if (streamerClosed || hasError || !dfsClient.clientRunning) {
+              continue;
+            }
+
+            endBlock();
+          }
+          if (progress != null) { progress.progress(); }
+
+          // This is used by unit test to trigger race conditions.
+          if (artificialSlowdown != 0 && dfsClient.clientRunning) {
+            Thread.sleep(artificialSlowdown); 
+          }
+        } catch (Throwable e) {
+          DFSClient.LOG.warn("DataStreamer Exception: " + 
+              StringUtils.stringifyException(e));
+          if (e instanceof IOException) {
+            setLastException((IOException)e);
+          }
+          hasError = true;
+          if (errorIndex == -1) { // not a datanode error
+            streamerClosed = true;
+          }
+        }
+      }
+      closeInternal();
+    }
+
+    private void closeInternal() {
+      closeResponder();       // close and join
+      closeStream();
+      streamerClosed = true;
+      closed = true;
+      synchronized (dataQueue) {
+        dataQueue.notifyAll();
+      }
+    }
+
+    /*
+     * close both streamer and DFSOutputStream, should be called only 
+     * by an external thread and only after all data to be sent has 
+     * been flushed to datanode.
+     * 
+     * Interrupt this data streamer if force is true
+     * 
+     * @param force if this data stream is forced to be closed 
+     */
+    void close(boolean force) {
+      streamerClosed = true;
+      synchronized (dataQueue) {
+        dataQueue.notifyAll();
+      }
+      if (force) {
+        this.interrupt();
+      }
+    }
+
+    private void closeResponder() {
+      if (response != null) {
+        try {
+          response.close();
+          response.join();
+        } catch (InterruptedException  e) {
+        } finally {
+          response = null;
+        }
+      }
+    }
+
+    private void closeStream() {
+      if (blockStream != null) {
+        try {
+          blockStream.close();
+        } catch (IOException e) {
+        } finally {
+          blockStream = null;
+        }
+      }
+      if (blockReplyStream != null) {
+        try {
+          blockReplyStream.close();
+        } catch (IOException e) {
+        } finally {
+          blockReplyStream = null;
+        }
+      }
+    }
+
+    //
+    // Processes responses from the datanodes.  A packet is removed 
+    // from the ackQueue when its response arrives.
+    //
+    private class ResponseProcessor extends Daemon {
+
+      private volatile boolean responderClosed = false;
+      private DatanodeInfo[] targets = null;
+      private boolean isLastPacketInBlock = false;
+
+      ResponseProcessor (DatanodeInfo[] targets) {
+        this.targets = targets;
+      }
+
+      public void run() {
+
+        setName("ResponseProcessor for block " + block);
+        PipelineAck ack = new PipelineAck();
+
+        while (!responderClosed && dfsClient.clientRunning && !isLastPacketInBlock) {
+          // process responses from datanodes.
+          try {
+            // read an ack from the pipeline
+            ack.readFields(blockReplyStream);
+            if (DFSClient.LOG.isDebugEnabled()) {
+              DFSClient.LOG.debug("DFSClient " + ack);
+            }
+            
+            long seqno = ack.getSeqno();
+            // processes response status from datanodes.
+            for (int i = ack.getNumOfReplies()-1; i >=0  && dfsClient.clientRunning; i--) {
+              final DataTransferProtocol.Status reply = ack.getReply(i);
+              if (reply != SUCCESS) {
+                errorIndex = i; // first bad datanode
+                throw new IOException("Bad response " + reply +
+                    " for block " + block +
+                    " from datanode " + 
+                    targets[i].getName());
+              }
+            }
+            
+            assert seqno != PipelineAck.UNKOWN_SEQNO : 
+              "Ack for unknown seqno should be a failed ack: " + ack;
+            if (seqno == Packet.HEART_BEAT_SEQNO) {  // a heartbeat ack
+              continue;
+            }
+
+            // a success ack for a data packet
+            Packet one = null;
+            synchronized (dataQueue) {
+              one = ackQueue.getFirst();
+            }
+            if (one.seqno != seqno) {
+              throw new IOException("Responseprocessor: Expecting seqno " +
+                                    " for block " + block +
+                                    one.seqno + " but received " + seqno);
+            }
+            isLastPacketInBlock = one.lastPacketInBlock;
+            // update bytesAcked
+            block.setNumBytes(one.getLastByteOffsetBlock());
+
+            synchronized (dataQueue) {
+              ackQueue.removeFirst();
+              dataQueue.notifyAll();
+            }
+          } catch (Exception e) {
+            if (!responderClosed) {
+              if (e instanceof IOException) {
+                setLastException((IOException)e);
+              }
+              hasError = true;
+              errorIndex = errorIndex==-1 ? 0 : errorIndex;
+              synchronized (dataQueue) {
+                dataQueue.notifyAll();
+              }
+              DFSClient.LOG.warn("DFSOutputStream ResponseProcessor exception " + 
+                  " for block " + block +
+                  StringUtils.stringifyException(e));
+              responderClosed = true;
+            }
+          }
+        }
+      }
+
+      void close() {
+        responderClosed = true;
+        this.interrupt();
+      }
+    }
+
+    // If this stream has encountered any errors so far, shut down
+    // threads and mark stream as closed. Returns true if we should
+    // sleep for a while after returning from this call.
+    //
+    private boolean processDatanodeError() throws IOException {
+      if (response != null) {
+        DFSClient.LOG.info("Error Recovery for block " + block +
+        " waiting for responder to exit. ");
+        return true;
+      }
+      closeStream();
+
+      // move packets from ack queue to front of the data queue
+      synchronized (dataQueue) {
+        dataQueue.addAll(0, ackQueue);
+        ackQueue.clear();
+      }
+
+      boolean doSleep = setupPipelineForAppendOrRecovery();
+      
+      if (!streamerClosed && dfsClient.clientRunning) {
+        if (stage == BlockConstructionStage.PIPELINE_CLOSE) {
+          synchronized (dataQueue) {
+            dataQueue.remove();  // remove the end of block packet
+            dataQueue.notifyAll();
+          }
+          endBlock();
+        } else {
+          initDataStreaming();
+        }
+      }
+      
+      return doSleep;
+    }
+
+
+    /**
+     * Open a DataOutputStream to a DataNode pipeline so that
+     * it can be written to.
+     * This happens when a file is appended to or when data streaming fails.
+     * It keeps trying until a pipeline is set up.
+     */
+    private boolean setupPipelineForAppendOrRecovery() throws IOException {
+      // check number of datanodes
+      if (nodes == null || nodes.length == 0) {
+        String msg = "Could not get block locations. " + "Source file \""
+            + src + "\" - Aborting...";
+        DFSClient.LOG.warn(msg);
+        setLastException(new IOException(msg));
+        streamerClosed = true;
+        return false;
+      }
+      
+      boolean success = false;
+      long newGS = 0L;
+      while (!success && !streamerClosed && dfsClient.clientRunning) {
+        boolean isRecovery = hasError;
+        // remove bad datanode from list of datanodes.
+        // If errorIndex was not set (i.e. appends), then do not remove 
+        // any datanodes
+        // 
+        if (errorIndex >= 0) {
+          StringBuilder pipelineMsg = new StringBuilder();
+          for (int j = 0; j < nodes.length; j++) {
+            pipelineMsg.append(nodes[j].getName());
+            if (j < nodes.length - 1) {
+              pipelineMsg.append(", ");
+            }
+          }
+          if (nodes.length <= 1) {
+            lastException = new IOException("All datanodes " + pipelineMsg
+                + " are bad. Aborting...");
+            streamerClosed = true;
+            return false;
+          }
+          DFSClient.LOG.warn("Error Recovery for block " + block +
+              " in pipeline " + pipelineMsg + 
+              ": bad datanode " + nodes[errorIndex].getName());
+          DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
+          System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
+          System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,
+              newnodes.length-errorIndex);
+          nodes = newnodes;
+          hasError = false;
+          lastException = null;
+          errorIndex = -1;
+        }
+
+        // get a new generation stamp and an access token
+        LocatedBlock lb = dfsClient.namenode.updateBlockForPipeline(block, dfsClient.clientName);
+        newGS = lb.getBlock().getGenerationStamp();
+        accessToken = lb.getAccessToken();
+        
+        // set up the pipeline again with the remaining nodes
+        success = createBlockOutputStream(nodes, newGS, isRecovery);
+      }
+
+      if (success) {
+        // update pipeline at the namenode
+        Block newBlock = new Block(
+            block.getBlockId(), block.getNumBytes(), newGS);
+        dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock, nodes);
+        // update client side generation stamp
+        block = newBlock;
+      }
+      return false; // do not sleep, continue processing
+    }
+
+    /**
+     * Open a DataOutputStream to a DataNode so that it can be written to.
+     * This happens when a file is created and each time a new block is allocated.
+     * Must get block ID and the IDs of the destinations from the namenode.
+     * Returns the list of target datanodes.
+     */
+    private DatanodeInfo[] nextBlockOutputStream(String client) throws IOException {
+      LocatedBlock lb = null;
+      boolean retry = false;
+      DatanodeInfo[] nodes = null;
+      int count = conf.getInt("dfs.client.block.write.retries", 3);
+      boolean success = false;
+      do {
+        hasError = false;
+        lastException = null;
+        errorIndex = -1;
+        retry = false;
+        success = false;
+
+        long startTime = System.currentTimeMillis();
+        DatanodeInfo[] w = excludedNodes.toArray(
+            new DatanodeInfo[excludedNodes.size()]);
+        lb = locateFollowingBlock(startTime, w.length > 0 ? w : null);
+        block = lb.getBlock();
+        block.setNumBytes(0);
+        accessToken = lb.getAccessToken();
+        nodes = lb.getLocations();
+
+        //
+        // Connect to first DataNode in the list.
+        //
+        success = createBlockOutputStream(nodes, 0L, false);
+
+        if (!success) {
+          DFSClient.LOG.info("Abandoning block " + block);
+          dfsClient.namenode.abandonBlock(block, src, dfsClient.clientName);
+          block = null;
+
+          DFSClient.LOG.debug("Excluding datanode " + nodes[errorIndex]);
+          excludedNodes.add(nodes[errorIndex]);
+
+          // Connection failed.  Let's wait a little bit and retry
+          retry = true;
+          try {
+            if (System.currentTimeMillis() - startTime > 5000) {
+              DFSClient.LOG.info("Waiting to find target node: " + nodes[0].getName());
+            }
+            // TODO: fix this timeout. Extract it to a constant, maybe make it available from conf.
+            Thread.sleep(6000);
+          } catch (InterruptedException iex) {
+          }
+        }
+      } while (retry && --count >= 0);
+
+      if (!success) {
+        throw new IOException("Unable to create new block.");
+      }
+      return nodes;
+    }
+
+    // Connects to the first datanode in the pipeline.
+    // Returns true on success, false otherwise.
+    //
+    private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS,
+        boolean recoveryFlag) {
+      DataTransferProtocol.Status pipelineStatus = SUCCESS;
+      String firstBadLink = "";
+      if (DFSClient.LOG.isDebugEnabled()) {
+        for (int i = 0; i < nodes.length; i++) {
+          DFSClient.LOG.debug("pipeline = " + nodes[i].getName());
+        }
+      }
+
+      // persist blocks on namenode on next flush
+      persistBlocks.set(true);
+
+      try {
+        DFSClient.LOG.debug("Connecting to " + nodes[0].getName());
+        InetSocketAddress target = NetUtils.createSocketAddr(nodes[0].getName());
+        s = dfsClient.socketFactory.createSocket();
+        int timeoutValue = dfsClient.getDatanodeReadTimeout(nodes.length);
+        NetUtils.connect(s, target, timeoutValue);
+        s.setSoTimeout(timeoutValue);
+        s.setSendBufferSize(DFSClient.DEFAULT_DATA_SOCKET_SIZE);
+        DFSClient.LOG.debug("Send buf size " + s.getSendBufferSize());
+        long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length);
+
+        //
+        // Xmit header info to datanode
+        //
+        DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
+            NetUtils.getOutputStream(s, writeTimeout),
+            DataNode.SMALL_BUFFER_SIZE));
+        blockReplyStream = new DataInputStream(NetUtils.getInputStream(s));
+
+        // send the request
+        DataTransferProtocol.Sender.opWriteBlock(out,
+            block.getBlockId(), block.getGenerationStamp(),
+            nodes.length, recoveryFlag?stage.getRecoveryStage():stage, newGS,
+            block.getNumBytes(), bytesSent, dfsClient.clientName, null, nodes, accessToken);
+        checksum.writeHeader(out);
+        out.flush();
+
+        // receive ack for connect
+        pipelineStatus = DataTransferProtocol.Status.read(blockReplyStream);
+        firstBadLink = Text.readString(blockReplyStream);
+        if (pipelineStatus != SUCCESS) {
+          if (pipelineStatus == ERROR_ACCESS_TOKEN) {
+            throw new InvalidAccessTokenException(
+                "Got access token error for connect ack with firstBadLink as "
+                    + firstBadLink);
+          } else {
+            throw new IOException("Bad connect ack with firstBadLink as "
+                + firstBadLink);
+          }
+        }
+
+        blockStream = out;
+        return true; // success
+
+      } catch (IOException ie) {
+
+        DFSClient.LOG.info("Exception in createBlockOutputStream " + ie);
+
+        // find the datanode that matches
+        if (firstBadLink.length() != 0) {
+          for (int i = 0; i < nodes.length; i++) {
+            if (nodes[i].getName().equals(firstBadLink)) {
+              errorIndex = i;
+              break;
+            }
+          }
+        } else {
+          errorIndex = 0;
+        }
+        hasError = true;
+        setLastException(ie);
+        blockReplyStream = null;
+        return false;  // error
+      }
+    }
+
+    private LocatedBlock locateFollowingBlock(long start,
+        DatanodeInfo[] excludedNodes) throws IOException {
+      int retries = conf.getInt("dfs.client.block.write.locateFollowingBlock.retries", 5);
+      long sleeptime = 400;
+      while (true) {
+        long localstart = System.currentTimeMillis();
+        while (true) {
+          try {
+            return dfsClient.namenode.addBlock(src, dfsClient.clientName, block, excludedNodes);
+          } catch (RemoteException e) {
+            IOException ue = 
+              e.unwrapRemoteException(FileNotFoundException.class,
+                  AccessControlException.class,
+                  NSQuotaExceededException.class,
+                  DSQuotaExceededException.class);
+            if (ue != e) { 
+              throw ue; // no need to retry these exceptions
+            }
+            
+            
+            if (NotReplicatedYetException.class.getName().
+                equals(e.getClassName())) {
+              if (retries == 0) { 
+                throw e;
+              } else {
+                --retries;
+                DFSClient.LOG.info(StringUtils.stringifyException(e));
+                if (System.currentTimeMillis() - localstart > 5000) {
+                  DFSClient.LOG.info("Waiting for replication for "
+                      + (System.currentTimeMillis() - localstart) / 1000
+                      + " seconds");
+                }
+                try {
+                  DFSClient.LOG.warn("NotReplicatedYetException sleeping " + src
+                      + " retries left " + retries);
+                  Thread.sleep(sleeptime);
+                  sleeptime *= 2;
+                } catch (InterruptedException ie) {
+                }
+              }
+            } else {
+              throw e;
+            }
+
+          }
+        }
+      } 
+    }
+
+    Block getBlock() {
+      return block;
+    }
+
+    DatanodeInfo[] getNodes() {
+      return nodes;
+    }
+
+    BlockAccessToken getAccessToken() {
+      return accessToken;
+    }
+
+    private void setLastException(IOException e) {
+      if (lastException == null) {
+        lastException = e;
+      }
+    }
+  }
+
+  private void isClosed() throws IOException {
+    if (closed) {
+      IOException e = lastException;
+      throw e != null ? e : new IOException("DFSOutputStream is closed");
+    }
+  }
+
+  //
+  // Returns the list of targets, if any, that are currently being used.
+  //
+  synchronized DatanodeInfo[] getPipeline() {
+    if (streamer == null) {
+      return null;
+    }
+    DatanodeInfo[] currentNodes = streamer.getNodes();
+    if (currentNodes == null) {
+      return null;
+    }
+    DatanodeInfo[] value = new DatanodeInfo[currentNodes.length];
+    for (int i = 0; i < currentNodes.length; i++) {
+      value[i] = currentNodes[i];
+    }
+    return value;
+  }
+
+  private DFSOutputStream(DFSClient dfsClient, String src, long blockSize, Progressable progress,
+      int bytesPerChecksum) throws IOException {
+    super(new PureJavaCrc32(), bytesPerChecksum, 4);
+    this.dfsClient = dfsClient;
+    this.conf = dfsClient.conf;
+    this.src = src;
+    this.blockSize = blockSize;
+    this.progress = progress;
+    if (progress != null) {
+      DFSClient.LOG.debug("Set non-null progress callback on DFSOutputStream "+src);
+    }
+    
+    if ( bytesPerChecksum < 1 || blockSize % bytesPerChecksum != 0) {
+      throw new IOException("io.bytes.per.checksum(" + bytesPerChecksum +
+                            ") and blockSize(" + blockSize + 
+                            ") do not match. " + "blockSize should be a " +
+                            "multiple of io.bytes.per.checksum");
+                            
+    }
+    checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 
+                                            bytesPerChecksum);
+  }
+
+  /**
+   * Create a new output stream to the given DataNode.
+   * @see ClientProtocol#create(String, FsPermission, String, EnumSetWritable, boolean, short, long)
+   */
+  DFSOutputStream(DFSClient dfsClient, String src, FsPermission masked, EnumSet<CreateFlag> flag,
+      boolean createParent, short replication, long blockSize, Progressable progress,
+      int buffersize, int bytesPerChecksum) throws IOException {
+    this(dfsClient, src, blockSize, progress, bytesPerChecksum);
+
+    computePacketChunkSize(dfsClient.writePacketSize, bytesPerChecksum);
+
+    try {
+      dfsClient.namenode.create(
+          src, masked, dfsClient.clientName, new EnumSetWritable<CreateFlag>(flag), createParent, replication, blockSize);
+    } catch(RemoteException re) {
+      throw re.unwrapRemoteException(AccessControlException.class,
+                                     FileAlreadyExistsException.class,
+                                     FileNotFoundException.class,
+                                     NSQuotaExceededException.class,
+                                     DSQuotaExceededException.class);
+    }
+    streamer = new DataStreamer();
+    streamer.start();
+  }
+
+  /**
+   * Create a new output stream to the given DataNode.
+   * @see ClientProtocol#create(String, FsPermission, String, boolean, short, long)
+   */
+  DFSOutputStream(DFSClient dfsClient, String src, int buffersize, Progressable progress,
+      LocatedBlock lastBlock, FileStatus stat,
+      int bytesPerChecksum) throws IOException {
+    this(dfsClient, src, stat.getBlockSize(), progress, bytesPerChecksum);
+    initialFileSize = stat.getLen(); // length of file when opened
+
+    //
+    // The last partial block of the file has to be filled.
+    //
+    if (lastBlock != null) {
+      // indicate that we are appending to an existing block
+      bytesCurBlock = lastBlock.getBlockSize();
+      streamer = new DataStreamer(lastBlock, stat, bytesPerChecksum);
+    } else {
+      computePacketChunkSize(dfsClient.writePacketSize, bytesPerChecksum);
+      streamer = new DataStreamer();
+    }
+    streamer.start();
+  }
+
+  private void computePacketChunkSize(int psize, int csize) {
+    int chunkSize = csize + checksum.getChecksumSize();
+    int n = DataNode.PKT_HEADER_LEN + DFSClient.SIZE_OF_INTEGER;
+    chunksPerPacket = Math.max((psize - n + chunkSize-1)/chunkSize, 1);
+    packetSize = n + chunkSize*chunksPerPacket;
+    if (DFSClient.LOG.isDebugEnabled()) {
+      DFSClient.LOG.debug("computePacketChunkSize: src=" + src +
+                ", chunkSize=" + chunkSize +
+                ", chunksPerPacket=" + chunksPerPacket +
+                ", packetSize=" + packetSize);
+    }
+  }
+
+  private void queuePacket(Packet packet) {
+    synchronized (dataQueue) {
+      dataQueue.addLast(packet);
+      dataQueue.notifyAll();
+    }
+  }
+
+  private void waitAndQueuePacket(Packet packet) throws IOException {
+    synchronized (dataQueue) {
+      // If queue is full, then wait till we have enough space
+      while (!closed && dataQueue.size() + ackQueue.size()  > MAX_PACKETS) {
+        try {
+          dataQueue.wait();
+        } catch (InterruptedException  e) {
+        }
+      }
+      isClosed();
+      queuePacket(packet);
+    }
+  }
+
+  // @see FSOutputSummer#writeChunk()
+  @Override
+  protected synchronized void writeChunk(byte[] b, int offset, int len, byte[] checksum) 
+                                                        throws IOException {
+    dfsClient.checkOpen();
+    isClosed();
+
+    int cklen = checksum.length;
+    int bytesPerChecksum = this.checksum.getBytesPerChecksum(); 
+    if (len > bytesPerChecksum) {
+      throw new IOException("writeChunk() buffer size is " + len +
+                            " is larger than supported  bytesPerChecksum " +
+                            bytesPerChecksum);
+    }
+    if (checksum.length != this.checksum.getChecksumSize()) {
+      throw new IOException("writeChunk() checksum size is supposed to be " +
+                            this.checksum.getChecksumSize() + 
+                            " but found to be " + checksum.length);
+    }
+
+    if (currentPacket == null) {
+      currentPacket = new Packet(packetSize, chunksPerPacket, 
+          bytesCurBlock);
+      if (DFSClient.LOG.isDebugEnabled()) {
+        DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" + 
+            currentPacket.seqno +
+            ", src=" + src +
+            ", packetSize=" + packetSize +
+            ", chunksPerPacket=" + chunksPerPacket +
+            ", bytesCurBlock=" + bytesCurBlock);
+      }
+    }
+
+    currentPacket.writeChecksum(checksum, 0, cklen);
+    currentPacket.writeData(b, offset, len);
+    currentPacket.numChunks++;
+    bytesCurBlock += len;
+
+    // If packet is full, enqueue it for transmission
+    //
+    if (currentPacket.numChunks == currentPacket.maxChunks ||
+        bytesCurBlock == blockSize) {
+      if (DFSClient.LOG.isDebugEnabled()) {
+        DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" +
+            currentPacket.seqno +
+            ", src=" + src +
+            ", bytesCurBlock=" + bytesCurBlock +
+            ", blockSize=" + blockSize +
+            ", appendChunk=" + appendChunk);
+      }
+      waitAndQueuePacket(currentPacket);
+      currentPacket = null;
+
+      // If the reopened file did not end at a chunk boundary and the above
+      // write filled up its partial chunk, tell the summer to generate full
+      // crc chunks from now on.
+      if (appendChunk && bytesCurBlock%bytesPerChecksum == 0) {
+        appendChunk = false;
+        resetChecksumChunk(bytesPerChecksum);
+      }
+
+      if (!appendChunk) {
+        int psize = Math.min((int)(blockSize-bytesCurBlock), dfsClient.writePacketSize);
+        computePacketChunkSize(psize, bytesPerChecksum);
+      }
+      //
+      // if encountering a block boundary, send an empty packet to 
+      // indicate the end of block and reset bytesCurBlock.
+      //
+      if (bytesCurBlock == blockSize) {
+        currentPacket = new Packet(DataNode.PKT_HEADER_LEN+4, 0, 
+            bytesCurBlock);
+        currentPacket.lastPacketInBlock = true;
+        waitAndQueuePacket(currentPacket);
+        currentPacket = null;
+        bytesCurBlock = 0;
+        lastFlushOffset = -1;
+      }
+    }
+  }
+
+  @Override
+  @Deprecated
+  public synchronized void sync() throws IOException {
+    hflush();
+  }
+  
+  /**
+   * Flushes out to all replicas of the block.
+   * The data is in the buffers of the DNs
+   * but not necessarily in the DNs' OS buffers.
+   *
+   * It is a synchronous operation. When it returns,
+   * it guarantees that flushed data becomes visible to new readers.
+   * It is not guaranteed that data has been flushed to
+   * persistent store on the datanode.
+   * Block allocations are persisted on the namenode.
+   */
+  @Override
+  public synchronized void hflush() throws IOException {
+    dfsClient.checkOpen();
+    isClosed();
+    try {
+      /* Record current blockOffset. This might be changed inside
+       * flushBuffer() where a partial checksum chunk might be flushed.
+       * After the flush, reset the bytesCurBlock back to its previous value;
+       * any partial checksum chunk will be sent now and again in the next packet.
+       */
+      long saveOffset = bytesCurBlock;
+
+      // flush checksum buffer, but keep checksum buffer intact
+      flushBuffer(true);
+
+      DFSClient.LOG.debug("DFSClient flush() : saveOffset " + saveOffset +  
+                " bytesCurBlock " + bytesCurBlock +
+                " lastFlushOffset " + lastFlushOffset);
+      
+      // Flush only if we haven't already flushed till this offset.
+      if (lastFlushOffset != bytesCurBlock) {
+
+        // record the valid offset of this flush
+        lastFlushOffset = bytesCurBlock;
+
+        // wait for all packets to be sent and acknowledged
+        flushInternal();
+      } else {
+        // just discard the current packet since it has already been sent.
+        currentPacket = null;
+      }
+      
+      // Restore state of stream. Record the last flush offset 
+      // of the last full chunk that was flushed.
+      //
+      bytesCurBlock = saveOffset;
+
+      // If any new blocks were allocated since the last flush, 
+      // then persist block locations on namenode. 
+      //
+      if (persistBlocks.getAndSet(false)) {
+        dfsClient.namenode.fsync(src, dfsClient.clientName);
+      }
+    } catch (IOException e) {
+        lastException = new IOException("IOException flush:" + e);
+        closeThreads(true);
+        throw e;
+    }
+  }
+
+  /**
+   * The expected semantics are that all data has been flushed out to all
+   * replicas and all replicas have done the POSIX fsync equivalent, i.e.
+   * the OS has flushed it to the disk device (but the disk may still have
+   * it in its cache).
+   *
+   * Right now it is implemented as hflush by default.
+   */
+  @Override
+  public synchronized void hsync() throws IOException {
+    hflush();
+  }
+  
+  /**
+   * Waits till all existing data is flushed and confirmations 
+   * received from datanodes. 
+   */
+  private synchronized void flushInternal() throws IOException {
+    dfsClient.checkOpen();
+    isClosed();
+    //
+    // If there is data in the current buffer, send it across
+    //
+    if (currentPacket != null) {
+      queuePacket(currentPacket);
+      currentPacket = null;
+    }
+
+    synchronized (dataQueue) {
+      while (!closed && dataQueue.size() + ackQueue.size() > 0) {
+        try {
+          dataQueue.wait();
+        } catch (InterruptedException  e) {
+        }
+      }
+      isClosed();
+    }
+  }
+
+  /**
+   * Aborts this output stream and releases any system 
+   * resources associated with this stream.
+   */
+  synchronized void abort() throws IOException {
+    if (closed) {
+      return;
+    }
+    streamer.setLastException(new IOException("Lease timeout of " +
+                             (dfsClient.hdfsTimeout/1000) + " seconds expired."));
+    closeThreads(true);
+  }
+
+  // Shuts down the DataStreamer and ResponseProcessor threads.
+  // Interrupts the DataStreamer if force is true.
+  private void closeThreads(boolean force) throws IOException {
+    try {
+      streamer.close(force);
+      streamer.join();
+      if (s != null) {
+        s.close();
+      }
+    } catch (InterruptedException e) {
+      throw new IOException("Failed to shutdown streamer");
+    } finally {
+      streamer = null;
+      s = null;
+      closed = true;
+    }
+  }
+  
+  /**
+   * Closes this output stream and releases any system 
+   * resources associated with this stream.
+   */
+  @Override
+  public synchronized void close() throws IOException {
+    if (closed) {
+      IOException e = lastException;
+      if (e == null)
+        return;
+      else
+        throw e;
+    }
+
+    try {
+      flushBuffer();       // flush from all upper layers
+
+      if (currentPacket != null) { 
+        waitAndQueuePacket(currentPacket);
+      }
+
+      if (bytesCurBlock != 0) {
+        // send an empty packet to mark the end of the block
+        currentPacket = new Packet(DataNode.PKT_HEADER_LEN+4, 0, 
+            bytesCurBlock);
+        currentPacket.lastPacketInBlock = true;
+      }
+
+      flushInternal();             // flush all data to Datanodes
+      // get last block before destroying the streamer
+      Block lastBlock = streamer.getBlock();
+      closeThreads(false);
+      completeFile(lastBlock);
+      dfsClient.leasechecker.remove(src);
+    } finally {
+      closed = true;
+    }
+  }
+
+  // should be called holding (this) lock since setTestFilename() may 
+  // be called during unit tests
+  private void completeFile(Block last) throws IOException {
+    long localstart = System.currentTimeMillis();
+    boolean fileComplete = false;
+    while (!fileComplete) {
+      fileComplete = dfsClient.namenode.complete(src, dfsClient.clientName, last);
+      if (!fileComplete) {
+        if (!dfsClient.clientRunning ||
+              (dfsClient.hdfsTimeout > 0 &&
+               localstart + dfsClient.hdfsTimeout < System.currentTimeMillis())) {
+            String msg = "Unable to close file because dfsclient " +
+                          " was unable to contact the HDFS servers." +
+                          " clientRunning " + dfsClient.clientRunning +
+                          " hdfsTimeout " + dfsClient.hdfsTimeout;
+            DFSClient.LOG.info(msg);
+            throw new IOException(msg);
+        }
+        try {
+          Thread.sleep(400);
+          if (System.currentTimeMillis() - localstart > 5000) {
+            DFSClient.LOG.info("Could not complete file " + src + " retrying...");
+          }
+        } catch (InterruptedException ie) {
+        }
+      }
+    }
+  }
+
+  void setArtificialSlowdown(long period) {
+    artificialSlowdown = period;
+  }
+
+  synchronized void setChunksPerPacket(int value) {
+    chunksPerPacket = Math.min(chunksPerPacket, value);
+    packetSize = DataNode.PKT_HEADER_LEN + DFSClient.SIZE_OF_INTEGER +
+                 (checksum.getBytesPerChecksum() + 
+                  checksum.getChecksumSize()) * chunksPerPacket;
+  }
+
+  synchronized void setTestFilename(String newname) {
+    src = newname;
+  }
+
+  /**
+   * Returns the size of a file as it was when this stream was opened
+   */
+  long getInitialLen() {
+    return initialFileSize;
+  }
+
+  /**
+   * Returns the access token currently used by streamer, for testing only
+   */
+  BlockAccessToken getAccessToken() {
+    return streamer.getAccessToken();
+  }
+
+}
\ No newline at end of file
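
For readers of the hflush()/hsync() javadoc in the file above: a minimal client-side usage sketch, assuming the stream is obtained through the public FileSystem API and that FSDataOutputStream exposes the Syncable hflush() call in this version of trunk (the deprecated sync() above simply delegates to it). The path and record contents are illustrative only.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HflushSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);            // DistributedFileSystem for an hdfs:// default FS
        Path path = new Path("/tmp/hflush-sketch.txt");  // illustrative path
        FSDataOutputStream out = fs.create(path);
        out.write("first record\n".getBytes("UTF-8"));
        // hflush(): flushed data becomes visible to new readers on all replicas,
        // but is not guaranteed to be on the datanodes' persistent store.
        out.hflush();
        out.write("second record\n".getBytes("UTF-8"));
        // close(): queues the last packet, waits for acks and asks the
        // namenode to complete the file.
        out.close();
      }
    }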

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=906690&r1=906689&r2=906690&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Thu Feb  4 22:20:02 2010
@@ -35,7 +35,6 @@
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
-import org.apache.hadoop.hdfs.DFSClient.DFSOutputStream;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.security.AccessControlException;

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=906690&r1=906689&r2=906690&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Thu Feb  4 22:20:02 2010
@@ -35,7 +35,7 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -108,8 +108,8 @@
       long amtToRead = Math.min(chunkSizeToView, blockSize - offsetIntoBlock);     
       
       // Use the block name for file name. 
-      DFSClient.BlockReader blockReader = 
-        DFSClient.BlockReader.newBlockReader(s, addr.toString() + ":" + blockId,
+      BlockReader blockReader = 
+        BlockReader.newBlockReader(s, addr.toString() + ":" + blockId,
                                              blockId, accessToken, genStamp ,offsetIntoBlock, 
                                              amtToRead, 
                                              conf.getInt("io.file.buffer.size",

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=906690&r1=906689&r2=906690&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Thu Feb  4 22:20:02 2010
@@ -33,6 +33,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
@@ -399,7 +400,7 @@
     InetSocketAddress targetAddr = null;
     TreeSet<DatanodeInfo> deadNodes = new TreeSet<DatanodeInfo>();
     Socket s = null;
-    DFSClient.BlockReader blockReader = null; 
+    BlockReader blockReader = null; 
     Block block = lblock.getBlock(); 
 
     while (s == null) {
@@ -427,7 +428,7 @@
         s.setSoTimeout(HdfsConstants.READ_TIMEOUT);
         
         blockReader = 
-          DFSClient.BlockReader.newBlockReader(s, targetAddr.toString() + ":" + 
+          BlockReader.newBlockReader(s, targetAddr.toString() + ":" + 
                                                block.getBlockId(), 
                                                block.getBlockId(), 
                                                lblock.getAccessToken(),



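The JspHelper and NamenodeFsck hunks above show the move from the DFSClient.BlockReader inner class to the top-level org.apache.hadoop.hdfs.BlockReader. Below is a hedged sketch of the new call shape, mirroring the arguments visible in those hunks; targetAddr (an InetSocketAddress), lblock (a LocatedBlock) and conf are assumed to be in scope, and the default buffer size of 4096 is an illustrative assumption.

    // Minimal sketch: read one block with the top-level BlockReader.
    Block block = lblock.getBlock();
    Socket s = new Socket();
    s.connect(targetAddr, HdfsConstants.READ_TIMEOUT);
    s.setSoTimeout(HdfsConstants.READ_TIMEOUT);
    BlockReader blockReader = BlockReader.newBlockReader(
        s, targetAddr.toString() + ":" + block.getBlockId(),
        block.getBlockId(), lblock.getAccessToken(), block.getGenerationStamp(),
        0L, block.getNumBytes(),
        conf.getInt("io.file.buffer.size", 4096));
    byte[] buf = new byte[(int) block.getNumBytes()];
    IOUtils.readFully(blockReader, buf, 0, buf.length);
    s.close();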