hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r529410 [4/27] - in /lucene/hadoop/trunk: ./ src/contrib/abacus/src/examples/org/apache/hadoop/abacus/examples/ src/contrib/abacus/src/java/org/apache/hadoop/abacus/ src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/ src/c...
Date Mon, 16 Apr 2007 21:44:46 GMT
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Mon Apr 16 14:44:35 2007
@@ -42,1400 +42,1400 @@
  * @author Mike Cafarella, Tessa MacDuff
  ********************************************************/
 class DFSClient implements FSConstants {
-    public static final Log LOG = LogFactory.getLog("org.apache.hadoop.fs.DFSClient");
-    static int MAX_BLOCK_ACQUIRE_FAILURES = 3;
-    private static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
-    private static final long DEFAULT_BLOCK_SIZE = 64 * 1024 * 1024;
-    ClientProtocol namenode;
-    boolean running = true;
-    Random r = new Random();
-    String clientName;
-    Daemon leaseChecker;
-    private Configuration conf;
-    private long defaultBlockSize;
-    private short defaultReplication;
+  public static final Log LOG = LogFactory.getLog("org.apache.hadoop.fs.DFSClient");
+  static int MAX_BLOCK_ACQUIRE_FAILURES = 3;
+  private static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
+  private static final long DEFAULT_BLOCK_SIZE = 64 * 1024 * 1024;
+  ClientProtocol namenode;
+  boolean running = true;
+  Random r = new Random();
+  String clientName;
+  Daemon leaseChecker;
+  private Configuration conf;
+  private long defaultBlockSize;
+  private short defaultReplication;
     
-    /**
-     * A map from name -> DFSOutputStream of files that are currently being
-     * written by this client.
-     */
-    private TreeMap pendingCreates = new TreeMap();
+  /**
+   * A map from name -> DFSOutputStream of files that are currently being
+   * written by this client.
+   */
+  private TreeMap pendingCreates = new TreeMap();
     
-    /**
-     * A class to track the list of DFS clients, so that they can be closed
-     * on exit.
-     * @author Owen O'Malley
-     */
-    private static class ClientFinalizer extends Thread {
-      private List clients = new ArrayList();
-
-      public synchronized void addClient(DFSClient client) {
-        clients.add(client);
-      }
-
-      public synchronized void run() {
-        Iterator itr = clients.iterator();
-        while (itr.hasNext()) {
-          DFSClient client = (DFSClient) itr.next();
-          if (client.running) {
-            try {
-              client.close();
-            } catch (IOException ie) {
-              System.err.println("Error closing client");
-              ie.printStackTrace();
-            }
+  /**
+   * A class to track the list of DFS clients, so that they can be closed
+   * on exit.
+   * @author Owen O'Malley
+   */
+  private static class ClientFinalizer extends Thread {
+    private List clients = new ArrayList();
+
+    public synchronized void addClient(DFSClient client) {
+      clients.add(client);
+    }
+
+    public synchronized void run() {
+      Iterator itr = clients.iterator();
+      while (itr.hasNext()) {
+        DFSClient client = (DFSClient) itr.next();
+        if (client.running) {
+          try {
+            client.close();
+          } catch (IOException ie) {
+            System.err.println("Error closing client");
+            ie.printStackTrace();
           }
         }
       }
     }
+  }
 
-    // add a cleanup thread
-    private static ClientFinalizer clientFinalizer = new ClientFinalizer();
-    static {
-      Runtime.getRuntime().addShutdownHook(clientFinalizer);
-    }
+  // add a cleanup thread
+  private static ClientFinalizer clientFinalizer = new ClientFinalizer();
+  static {
+    Runtime.getRuntime().addShutdownHook(clientFinalizer);
+  }
 
         
-    /** 
-     * Create a new DFSClient connected to the given namenode server.
-     */
-    public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf)
+  /** 
+   * Create a new DFSClient connected to the given namenode server.
+   */
+  public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf)
     throws IOException {
-        this.conf = conf;
-        this.namenode = (ClientProtocol) RPC.getProxy(ClientProtocol.class,
-            ClientProtocol.versionID, nameNodeAddr, conf);
-        String taskId = conf.get("mapred.task.id");
-        if (taskId != null) {
-            this.clientName = "DFSClient_" + taskId; 
-        } else {
-            this.clientName = "DFSClient_" + r.nextInt();
-        }
-        defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
-        defaultReplication = (short) conf.getInt("dfs.replication", 3);
-        this.leaseChecker = new Daemon(new LeaseChecker());
-        this.leaseChecker.start();
-    }
-
-    private void checkOpen() throws IOException {
-      if (!running) {
-        IOException result = new IOException("Filesystem closed");
-        throw result;
-      }
+    this.conf = conf;
+    this.namenode = (ClientProtocol) RPC.getProxy(ClientProtocol.class,
+                                                  ClientProtocol.versionID, nameNodeAddr, conf);
+    String taskId = conf.get("mapred.task.id");
+    if (taskId != null) {
+      this.clientName = "DFSClient_" + taskId; 
+    } else {
+      this.clientName = "DFSClient_" + r.nextInt();
+    }
+    defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+    defaultReplication = (short) conf.getInt("dfs.replication", 3);
+    this.leaseChecker = new Daemon(new LeaseChecker());
+    this.leaseChecker.start();
+  }
+
+  private void checkOpen() throws IOException {
+    if (!running) {
+      IOException result = new IOException("Filesystem closed");
+      throw result;
     }
+  }
     
-    /**
-     * Close the file system, abandoning all of the leases and files being
-     * created.
-     */
-    public void close() throws IOException {
-      // synchronize in here so that we don't need to change the API
-      synchronized (this) {
-        checkOpen();
-        synchronized (pendingCreates) {
-          Iterator file_itr = pendingCreates.keySet().iterator();
-          while (file_itr.hasNext()) {
-            String name = (String) file_itr.next();
-            try {
-              namenode.abandonFileInProgress(name, clientName);
-            } catch (IOException ie) {
-              System.err.println("Exception abandoning create lock on " + name);
-              ie.printStackTrace();
-            }
+  /**
+   * Close the file system, abandoning all of the leases and files being
+   * created.
+   */
+  public void close() throws IOException {
+    // synchronize in here so that we don't need to change the API
+    synchronized (this) {
+      checkOpen();
+      synchronized (pendingCreates) {
+        Iterator file_itr = pendingCreates.keySet().iterator();
+        while (file_itr.hasNext()) {
+          String name = (String) file_itr.next();
+          try {
+            namenode.abandonFileInProgress(name, clientName);
+          } catch (IOException ie) {
+            System.err.println("Exception abandoning create lock on " + name);
+            ie.printStackTrace();
           }
-          pendingCreates.clear();
-        }
-        this.running = false;
-        try {
-            leaseChecker.join();
-        } catch (InterruptedException ie) {
         }
+        pendingCreates.clear();
+      }
+      this.running = false;
+      try {
+        leaseChecker.join();
+      } catch (InterruptedException ie) {
       }
     }
+  }
 
-    /**
-     * Get the default block size for this cluster
-     * @return the default block size in bytes
-     */
-    public long getDefaultBlockSize() {
-      return defaultBlockSize;
-    }
+  /**
+   * Get the default block size for this cluster
+   * @return the default block size in bytes
+   */
+  public long getDefaultBlockSize() {
+    return defaultBlockSize;
+  }
     
-    public long getBlockSize(UTF8 f) throws IOException {
-      int retries = 4;
-      while (true) {
-        try {
-          return namenode.getBlockSize(f.toString());
-        } catch (IOException ie) {
-          if (--retries == 0) {
-            LOG.warn("Problem getting block size: " + 
-                      StringUtils.stringifyException(ie));
-            throw ie;
-          }
-          LOG.debug("Problem getting block size: " + 
-                    StringUtils.stringifyException(ie));
+  public long getBlockSize(UTF8 f) throws IOException {
+    int retries = 4;
+    while (true) {
+      try {
+        return namenode.getBlockSize(f.toString());
+      } catch (IOException ie) {
+        if (--retries == 0) {
+          LOG.warn("Problem getting block size: " + 
+                   StringUtils.stringifyException(ie));
+          throw ie;
         }
+        LOG.debug("Problem getting block size: " + 
+                  StringUtils.stringifyException(ie));
       }
     }
+  }
 
-    /**
-     * Report corrupt blocks that were discovered by the client.
-     */
-    public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
-      namenode.reportBadBlocks(blocks);
-    }
+  /**
+   * Report corrupt blocks that were discovered by the client.
+   */
+  public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
+    namenode.reportBadBlocks(blocks);
+  }
   
-    public short getDefaultReplication() {
-      return defaultReplication;
-    }
+  public short getDefaultReplication() {
+    return defaultReplication;
+  }
     
-    /**
-     * Get hints about the location of the indicated block(s).  The
-     * array returned is as long as there are blocks in the indicated
-     * range.  Each block may have one or more locations.
-     */
-    public String[][] getHints(UTF8 src, long start, long len) throws IOException {
-        return namenode.getHints(src.toString(), start, len);
-    }
-
-    /**
-     * Create an input stream that obtains a nodelist from the
-     * namenode, and then reads from all the right places.  Creates
-     * inner subclass of InputStream that does the right out-of-band
-     * work.
-     */
-    public DFSInputStream open(UTF8 src) throws IOException {
-        checkOpen();
-        //    Get block info from namenode
-        return new DFSInputStream(src.toString());
-    }
-
-    /**
-     * Create a new dfs file and return an output stream for writing into it. 
-     * 
-     * @param src stream name
-     * @param overwrite do not check for file existence if true
-     * @return output stream
-     * @throws IOException
-     */
-    public OutputStream create( UTF8 src, 
-                                  boolean overwrite
-                                ) throws IOException {
-      return create( src, overwrite, defaultReplication, defaultBlockSize, null);
-    }
+  /**
+   * Get hints about the location of the indicated block(s).  The
+   * array returned is as long as there are blocks in the indicated
+   * range.  Each block may have one or more locations.
+   */
+  public String[][] getHints(UTF8 src, long start, long len) throws IOException {
+    return namenode.getHints(src.toString(), start, len);
+  }
+
+  /**
+   * Create an input stream that obtains a nodelist from the
+   * namenode, and then reads from all the right places.  Creates
+   * inner subclass of InputStream that does the right out-of-band
+   * work.
+   */
+  public DFSInputStream open(UTF8 src) throws IOException {
+    checkOpen();
+    //    Get block info from namenode
+    return new DFSInputStream(src.toString());
+  }
+
+  /**
+   * Create a new dfs file and return an output stream for writing into it. 
+   * 
+   * @param src stream name
+   * @param overwrite do not check for file existence if true
+   * @return output stream
+   * @throws IOException
+   */
+  public OutputStream create( UTF8 src, 
+                              boolean overwrite
+                              ) throws IOException {
+    return create( src, overwrite, defaultReplication, defaultBlockSize, null);
+  }
     
-    /**
-     * Create a new dfs file and return an output stream for writing into it
-     * with write-progress reporting. 
-     * 
-     * @param src stream name
-     * @param overwrite do not check for file existence if true
-     * @return output stream
-     * @throws IOException
-     */
-    public OutputStream create( UTF8 src, 
-                                  boolean overwrite,
-                                  Progressable progress
-                                ) throws IOException {
-      return create( src, overwrite, defaultReplication, defaultBlockSize, null);
-    }
+  /**
+   * Create a new dfs file and return an output stream for writing into it
+   * with write-progress reporting. 
+   * 
+   * @param src stream name
+   * @param overwrite do not check for file existence if true
+   * @return output stream
+   * @throws IOException
+   */
+  public OutputStream create( UTF8 src, 
+                              boolean overwrite,
+                              Progressable progress
+                              ) throws IOException {
+    return create( src, overwrite, defaultReplication, defaultBlockSize, null);
+  }
     
-    /**
-     * Create a new dfs file with the specified block replication 
-     * and return an output stream for writing into the file.  
-     * 
-     * @param src stream name
-     * @param overwrite do not check for file existence if true
-     * @param replication block replication
-     * @return output stream
-     * @throws IOException
-     */
-    public OutputStream create( UTF8 src, 
-                                  boolean overwrite, 
-                                  short replication,
-                                  long blockSize
+  /**
+   * Create a new dfs file with the specified block replication 
+   * and return an output stream for writing into the file.  
+   * 
+   * @param src stream name
+   * @param overwrite do not check for file existence if true
+   * @param replication block replication
+   * @return output stream
+   * @throws IOException
+   */
+  public OutputStream create( UTF8 src, 
+                              boolean overwrite, 
+                              short replication,
+                              long blockSize
+                              ) throws IOException {
+    return create(src, overwrite, replication, blockSize, null);
+  }
+
+  /**
+   * Create a new dfs file with the specified block replication 
+   * with write-progress reporting and return an output stream for writing
+   * into the file.  
+   * 
+   * @param src stream name
+   * @param overwrite do not check for file existence if true
+   * @param replication block replication
+   * @return output stream
+   * @throws IOException
+   */
+  public OutputStream create( UTF8 src, 
+                              boolean overwrite, 
+                              short replication,
+                              long blockSize,
+                              Progressable progress
+                              ) throws IOException {
+    checkOpen();
+    OutputStream result = new DFSOutputStream(src, overwrite, 
+                                              replication, blockSize, progress);
+    synchronized (pendingCreates) {
+      pendingCreates.put(src.toString(), result);
+    }
+    return result;
+  }
+  /**
+   * Set replication for an existing file.
+   * 
+   * @see ClientProtocol#setReplication(String, short)
+   * @param replication
+   * @throws IOException
+   * @return true if successful or false if file does not exist 
+   * @author shv
+   */
+  public boolean setReplication(UTF8 src, 
+                                short replication
                                 ) throws IOException {
-      return create(src, overwrite, replication, blockSize, null);
-    }
+    return namenode.setReplication(src.toString(), replication);
+  }
 
-    /**
-     * Create a new dfs file with the specified block replication 
-     * with write-progress reporting and return an output stream for writing
-     * into the file.  
-     * 
-     * @param src stream name
-     * @param overwrite do not check for file existence if true
-     * @param replication block replication
-     * @return output stream
-     * @throws IOException
-     */
-    public OutputStream create( UTF8 src, 
-                                  boolean overwrite, 
-                                  short replication,
-                                  long blockSize,
-                                  Progressable progress
-                                ) throws IOException {
-      checkOpen();
-      OutputStream result = new DFSOutputStream(src, overwrite, 
-                                                  replication, blockSize, progress);
-      synchronized (pendingCreates) {
-        pendingCreates.put(src.toString(), result);
+  /**
+   * Make a direct connection to namenode and manipulate structures
+   * there.
+   */
+  public boolean rename(UTF8 src, UTF8 dst) throws IOException {
+    checkOpen();
+    return namenode.rename(src.toString(), dst.toString());
+  }
+
+  /**
+   * Make a direct connection to namenode and manipulate structures
+   * there.
+   */
+  public boolean delete(UTF8 src) throws IOException {
+    checkOpen();
+    return namenode.delete(src.toString());
+  }
+
+  /**
+   */
+  public boolean exists(UTF8 src) throws IOException {
+    checkOpen();
+    return namenode.exists(src.toString());
+  }
+
+  /**
+   */
+  public boolean isDirectory(UTF8 src) throws IOException {
+    checkOpen();
+    return namenode.isDir(src.toString());
+  }
+
+  /**
+   */
+  public DFSFileInfo[] listPaths(UTF8 src) throws IOException {
+    checkOpen();
+    return namenode.getListing(src.toString());
+  }
+
+  /**
+   */
+  public long totalRawCapacity() throws IOException {
+    long rawNums[] = namenode.getStats();
+    return rawNums[0];
+  }
+
+  /**
+   */
+  public long totalRawUsed() throws IOException {
+    long rawNums[] = namenode.getStats();
+    return rawNums[1];
+  }
+
+  public DatanodeInfo[] datanodeReport() throws IOException {
+    return namenode.getDatanodeReport();
+  }
+    
+  /**
+   * Enter, leave or get safe mode.
+   * See {@link ClientProtocol#setSafeMode(FSConstants.SafeModeAction)} 
+   * for more details.
+   * 
+   * @see ClientProtocol#setSafeMode(FSConstants.SafeModeAction)
+   */
+  public boolean setSafeMode( SafeModeAction action ) throws IOException {
+    return namenode.setSafeMode( action );
+  }
+
+  /**
+   * Refresh the hosts and exclude files.  (Rereads them.)
+   * See {@link ClientProtocol#refreshNodes()} 
+   * for more details.
+   * 
+   * @see ClientProtocol#refreshNodes()
+   */
+  public void refreshNodes() throws IOException {
+    namenode.refreshNodes();
+  }
+
+  /**
+   * Dumps DFS data structures into specified file.
+   * See {@link ClientProtocol#metaSave()} 
+   * for more details.
+   * 
+   * @see ClientProtocol#metaSave()
+   */
+  public void metaSave(String pathname) throws IOException {
+    namenode.metaSave(pathname);
+  }
+    
+  /**
+   * @see ClientProtocol#finalizeUpgrade()
+   */
+  public void finalizeUpgrade() throws IOException {
+    namenode.finalizeUpgrade();
+  }
+
+  /**
+   */
+  public boolean mkdirs(UTF8 src) throws IOException {
+    checkOpen();
+    return namenode.mkdirs(src.toString());
+  }
+
+  /**
+   */
+  public void lock(UTF8 src, boolean exclusive) throws IOException {
+    long start = System.currentTimeMillis();
+    boolean hasLock = false;
+    while (! hasLock) {
+      hasLock = namenode.obtainLock(src.toString(), clientName, exclusive);
+      if (! hasLock) {
+        try {
+          Thread.sleep(400);
+          if (System.currentTimeMillis() - start > 5000) {
+            LOG.info("Waiting to retry lock for " + (System.currentTimeMillis() - start) + " ms.");
+            Thread.sleep(2000);
+          }
+        } catch (InterruptedException ie) {
+        }
       }
-      return result;
-    }
-    /**
-     * Set replication for an existing file.
-     * 
-     * @see ClientProtocol#setReplication(String, short)
-     * @param replication
-     * @throws IOException
-     * @return true if successful or false if file does not exist 
-     * @author shv
-     */
-    public boolean setReplication(UTF8 src, 
-                                  short replication
-                                ) throws IOException {
-      return namenode.setReplication(src.toString(), replication);
     }
+  }
 
-    /**
-     * Make a direct connection to namenode and manipulate structures
-     * there.
-     */
-    public boolean rename(UTF8 src, UTF8 dst) throws IOException {
-        checkOpen();
-        return namenode.rename(src.toString(), dst.toString());
+  /**
+   *
+   */
+  public void release(UTF8 src) throws IOException {
+    boolean hasReleased = false;
+    while (! hasReleased) {
+      hasReleased = namenode.releaseLock(src.toString(), clientName);
+      if (! hasReleased) {
+        LOG.info("Could not release.  Retrying...");
+        try {
+          Thread.sleep(2000);
+        } catch (InterruptedException ie) {
+        }
+      }
     }
+  }
 
-    /**
-     * Make a direct connection to namenode and manipulate structures
-     * there.
-     */
-    public boolean delete(UTF8 src) throws IOException {
-        checkOpen();
-        return namenode.delete(src.toString());
+  /**
+   * Pick the best node from which to stream the data.
+   * Entries in <i>nodes</i> are already in the priority order
+   */
+  private DatanodeInfo bestNode(DatanodeInfo nodes[], TreeSet deadNodes) throws IOException {
+    if (nodes != null) { 
+      for (int i = 0; i < nodes.length; i++) {
+        if (!deadNodes.contains(nodes[i])) {
+          return nodes[i];
+        }
+      }
     }
+    throw new IOException("No live nodes contain current block");
+  }
 
+  /***************************************************************
+   * Periodically check in with the namenode and renew all the leases
+   * when the lease period is half over.
+   ***************************************************************/
+  class LeaseChecker implements Runnable {
     /**
      */
-    public boolean exists(UTF8 src) throws IOException {
-        checkOpen();
-        return namenode.exists(src.toString());
+    public void run() {
+      long lastRenewed = 0;
+      while (running) {
+        if (System.currentTimeMillis() - lastRenewed > (LEASE_SOFTLIMIT_PERIOD / 2)) {
+          try {
+            if( pendingCreates.size() > 0 )
+              namenode.renewLease(clientName);
+            lastRenewed = System.currentTimeMillis();
+          } catch (IOException ie) {
+            String err = StringUtils.stringifyException(ie);
+            LOG.warn("Problem renewing lease for " + clientName +
+                     ": " + err);
+          }
+        }
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException ie) {
+        }
+      }
     }
+  }
 
-    /**
-     */
-    public boolean isDirectory(UTF8 src) throws IOException {
-        checkOpen();
-        return namenode.isDir(src.toString());
+  /** Utility class to encapsulate data node info and its ip address. */
+  private static class DNAddrPair {
+    DatanodeInfo info;
+    InetSocketAddress addr;
+    DNAddrPair(DatanodeInfo info, InetSocketAddress addr) {
+      this.info = info;
+      this.addr = addr;
     }
-
+  }
+        
+  /****************************************************************
+   * DFSInputStream provides bytes from a named file.  It handles 
+   * negotiation of the namenode and various datanodes as necessary.
+   ****************************************************************/
+  class DFSInputStream extends FSInputStream {
+    private Socket s = null;
+    boolean closed = false;
+
+    private String src;
+    private DataInputStream blockStream;
+    private Block blocks[] = null;
+    private DatanodeInfo nodes[][] = null;
+    private DatanodeInfo currentNode = null;
+    private Block currentBlock = null;
+    private long pos = 0;
+    private long filelen = 0;
+    private long blockEnd = -1;
+    private TreeSet<DatanodeInfo> deadNodes = new TreeSet<DatanodeInfo>();
+        
     /**
      */
-    public DFSFileInfo[] listPaths(UTF8 src) throws IOException {
-        checkOpen();
-        return namenode.getListing(src.toString());
+    public DFSInputStream(String src) throws IOException {
+      this.src = src;
+      openInfo();
+      this.blockStream = null;
+      for (int i = 0; i < blocks.length; i++) {
+        this.filelen += blocks[i].getNumBytes();
+      }
     }
 
     /**
+     * Grab the open-file info from namenode
      */
-    public long totalRawCapacity() throws IOException {
-        long rawNums[] = namenode.getStats();
-        return rawNums[0];
-    }
+    synchronized void openInfo() throws IOException {
+      Block oldBlocks[] = this.blocks;
 
-    /**
-     */
-    public long totalRawUsed() throws IOException {
-        long rawNums[] = namenode.getStats();
-        return rawNums[1];
-    }
+      LocatedBlock results[] = namenode.open(src);            
+      Vector blockV = new Vector();
+      Vector nodeV = new Vector();
+      for (int i = 0; i < results.length; i++) {
+        blockV.add(results[i].getBlock());
+        nodeV.add(results[i].getLocations());
+      }
+      Block newBlocks[] = (Block[]) blockV.toArray(new Block[blockV.size()]);
 
-    public DatanodeInfo[] datanodeReport() throws IOException {
-        return namenode.getDatanodeReport();
-    }
-    
-    /**
-     * Enter, leave or get safe mode.
-     * See {@link ClientProtocol#setSafeMode(FSConstants.SafeModeAction)} 
-     * for more details.
-     * 
-     * @see ClientProtocol#setSafeMode(FSConstants.SafeModeAction)
-     */
-    public boolean setSafeMode( SafeModeAction action ) throws IOException {
-      return namenode.setSafeMode( action );
+      if (oldBlocks != null) {
+        for (int i = 0; i < oldBlocks.length; i++) {
+          if (! oldBlocks[i].equals(newBlocks[i])) {
+            throw new IOException("Blocklist for " + src + " has changed!");
+          }
+        }
+        if (oldBlocks.length != newBlocks.length) {
+          throw new IOException("Blocklist for " + src + " now has different length");
+        }
+      }
+      this.blocks = newBlocks;
+      this.nodes = (DatanodeInfo[][]) nodeV.toArray(new DatanodeInfo[nodeV.size()][]);
+      this.currentNode = null;
     }
 
     /**
-     * Refresh the hosts and exclude files.  (Rereads them.)
-     * See {@link ClientProtocol#refreshNodes()} 
-     * for more details.
-     * 
-     * @see ClientProtocol#refreshNodes()
+     * Returns the datanode from which the stream is currently reading.
      */
-    public void refreshNodes() throws IOException {
-      namenode.refreshNodes();
+    public DatanodeInfo getCurrentDatanode() {
+      return currentNode;
     }
 
     /**
-     * Dumps DFS data structures into specified file.
-     * See {@link ClientProtocol#metaSave()} 
-     * for more details.
-     * 
-     * @see ClientProtocol#metaSave()
-     */
-    public void metaSave(String pathname) throws IOException {
-      namenode.metaSave(pathname);
-    }
-    
-    /**
-     * @see ClientProtocol#finalizeUpgrade()
+     * Returns the block containing the target position. 
      */
-    public void finalizeUpgrade() throws IOException {
-      namenode.finalizeUpgrade();
+    public Block getCurrentBlock() {
+      return currentBlock;
     }
 
-    /**
-     */
-    public boolean mkdirs(UTF8 src) throws IOException {
-        checkOpen();
-        return namenode.mkdirs(src.toString());
-    }
 
     /**
+     * Used by the automatic tests to determine blocks locations of a
+     * file
      */
-    public void lock(UTF8 src, boolean exclusive) throws IOException {
-        long start = System.currentTimeMillis();
-        boolean hasLock = false;
-        while (! hasLock) {
-            hasLock = namenode.obtainLock(src.toString(), clientName, exclusive);
-            if (! hasLock) {
-                try {
-                    Thread.sleep(400);
-                    if (System.currentTimeMillis() - start > 5000) {
-                        LOG.info("Waiting to retry lock for " + (System.currentTimeMillis() - start) + " ms.");
-                        Thread.sleep(2000);
-                    }
-                } catch (InterruptedException ie) {
-                }
-            }
-        }
+    synchronized DatanodeInfo[][] getDataNodes() {
+      return nodes;
     }
 
     /**
-     *
+     * Open a DataInputStream to a DataNode so that it can be read from.
+     * We get block ID and the IDs of the destinations at startup, from the namenode.
      */
-    public void release(UTF8 src) throws IOException {
-        boolean hasReleased = false;
-        while (! hasReleased) {
-            hasReleased = namenode.releaseLock(src.toString(), clientName);
-            if (! hasReleased) {
-                LOG.info("Could not release.  Retrying...");
-                try {
-                    Thread.sleep(2000);
-                } catch (InterruptedException ie) {
-                }
-            }
-        }
-    }
+    private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
+      if (target >= filelen) {
+        throw new IOException("Attempted to read past end of file");
+      }
 
-    /**
-     * Pick the best node from which to stream the data.
-     * Entries in <i>nodes</i> are already in the priority order
-     */
-    private DatanodeInfo bestNode(DatanodeInfo nodes[], TreeSet deadNodes) throws IOException {
-      if (nodes != null) { 
-        for (int i = 0; i < nodes.length; i++) {
-          if (!deadNodes.contains(nodes[i])) {
-            return nodes[i];
-          }
-        }
+      if (s != null) {
+        s.close();
+        s = null;
       }
-        throw new IOException("No live nodes contain current block");
-    }
 
-    /***************************************************************
-     * Periodically check in with the namenode and renew all the leases
-     * when the lease period is half over.
-     ***************************************************************/
-    class LeaseChecker implements Runnable {
-        /**
-         */
-        public void run() {
-            long lastRenewed = 0;
-            while (running) {
-                if (System.currentTimeMillis() - lastRenewed > (LEASE_SOFTLIMIT_PERIOD / 2)) {
-                    try {
-                      if( pendingCreates.size() > 0 )
-                        namenode.renewLease(clientName);
-                      lastRenewed = System.currentTimeMillis();
-                    } catch (IOException ie) {
-                      String err = StringUtils.stringifyException(ie);
-                      LOG.warn("Problem renewing lease for " + clientName +
-                                  ": " + err);
-                    }
-                }
-                try {
-                    Thread.sleep(1000);
-                } catch (InterruptedException ie) {
-                }
-            }
+      //
+      // Compute desired block
+      //
+      int targetBlock = -1;
+      long targetBlockStart = 0;
+      long targetBlockEnd = 0;
+      for (int i = 0; i < blocks.length; i++) {
+        long blocklen = blocks[i].getNumBytes();
+        targetBlockEnd = targetBlockStart + blocklen - 1;
+
+        if (target >= targetBlockStart && target <= targetBlockEnd) {
+          targetBlock = i;
+          break;
+        } else {
+          targetBlockStart = targetBlockEnd + 1;                    
         }
-    }
-
-    /** Utility class to encapsulate data node info and its ip address. */
-    private static class DNAddrPair {
-      DatanodeInfo info;
-      InetSocketAddress addr;
-      DNAddrPair(DatanodeInfo info, InetSocketAddress addr) {
-        this.info = info;
-        this.addr = addr;
       }
-    }
-        
-    /****************************************************************
-     * DFSInputStream provides bytes from a named file.  It handles 
-     * negotiation of the namenode and various datanodes as necessary.
-     ****************************************************************/
-    class DFSInputStream extends FSInputStream {
-        private Socket s = null;
-        boolean closed = false;
-
-        private String src;
-        private DataInputStream blockStream;
-        private Block blocks[] = null;
-        private DatanodeInfo nodes[][] = null;
-        private DatanodeInfo currentNode = null;
-        private Block currentBlock = null;
-        private long pos = 0;
-        private long filelen = 0;
-        private long blockEnd = -1;
-        private TreeSet<DatanodeInfo> deadNodes = new TreeSet<DatanodeInfo>();
-        
-        /**
-         */
-        public DFSInputStream(String src) throws IOException {
-            this.src = src;
-            openInfo();
-            this.blockStream = null;
-            for (int i = 0; i < blocks.length; i++) {
-                this.filelen += blocks[i].getNumBytes();
-            }
-        }
-
-        /**
-         * Grab the open-file info from namenode
-         */
-        synchronized void openInfo() throws IOException {
-            Block oldBlocks[] = this.blocks;
-
-            LocatedBlock results[] = namenode.open(src);            
-            Vector blockV = new Vector();
-            Vector nodeV = new Vector();
-            for (int i = 0; i < results.length; i++) {
-                blockV.add(results[i].getBlock());
-                nodeV.add(results[i].getLocations());
-            }
-            Block newBlocks[] = (Block[]) blockV.toArray(new Block[blockV.size()]);
+      if (targetBlock < 0) {
+        throw new IOException("Impossible situation: could not find target position " + target);
+      }
+      long offsetIntoBlock = target - targetBlockStart;
 
-            if (oldBlocks != null) {
-                for (int i = 0; i < oldBlocks.length; i++) {
-                    if (! oldBlocks[i].equals(newBlocks[i])) {
-                        throw new IOException("Blocklist for " + src + " has changed!");
-                    }
-                }
-                if (oldBlocks.length != newBlocks.length) {
-                    throw new IOException("Blocklist for " + src + " now has different length");
-                }
-            }
-            this.blocks = newBlocks;
-            this.nodes = (DatanodeInfo[][]) nodeV.toArray(new DatanodeInfo[nodeV.size()][]);
-            this.currentNode = null;
-        }
+      //
+      // Connect to best DataNode for desired Block, with potential offset
+      //
+      DatanodeInfo chosenNode = null;
+      while (s == null) {
+        DNAddrPair retval = chooseDataNode(targetBlock);
+        chosenNode = retval.info;
+        InetSocketAddress targetAddr = retval.addr;
 
-        /**
-         * Returns the datanode from which the stream is currently reading.
-         */
-        public DatanodeInfo getCurrentDatanode() {
-          return currentNode;
-        }
+        try {
+          s = new Socket();
+          s.connect(targetAddr, READ_TIMEOUT);
+          s.setSoTimeout(READ_TIMEOUT);
 
-        /**
-         * Returns the block containing the target position. 
-         */
-        public Block getCurrentBlock() {
-          return currentBlock;
-        }
+          //
+          // Xmit header info to datanode
+          //
+          DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
+          out.write(OP_READSKIP_BLOCK);
+          blocks[targetBlock].write(out);
+          out.writeLong(offsetIntoBlock);
+          out.flush();
 
+          //
+          // Get bytes in block, set streams
+          //
+          DataInputStream in = new DataInputStream(new BufferedInputStream(s.getInputStream()));
+          long curBlockSize = in.readLong();
+          long amtSkipped = in.readLong();
+          if (curBlockSize != blocks[targetBlock].len) {
+            throw new IOException("Recorded block size is " + blocks[targetBlock].len + ", but datanode reports size of " + curBlockSize);
+          }
+          if (amtSkipped != offsetIntoBlock) {
+            throw new IOException("Asked for offset of " + offsetIntoBlock + ", but only received offset of " + amtSkipped);
+          }
 
-        /**
-         * Used by the automatic tests to detemine blocks locations of a
-         * file
-         */
-        synchronized DatanodeInfo[][] getDataNodes() {
-          return nodes;
+          this.pos = target;
+          this.blockEnd = targetBlockEnd;
+          this.currentBlock = blocks[targetBlock];
+          this.blockStream = in;
+          return chosenNode;
+        } catch (IOException ex) {
+          // Put chosen node into dead list, continue
+          LOG.debug("Failed to connect to " + targetAddr + ":" 
+                    + StringUtils.stringifyException(ex));
+          deadNodes.add(chosenNode);
+          if (s != null) {
+            try {
+              s.close();
+            } catch (IOException iex) {
+            }                        
+          }
+          s = null;
         }
+      }
+      return chosenNode;
+    }
 
-        /**
-         * Open a DataInputStream to a DataNode so that it can be read from.
-         * We get block ID and the IDs of the destinations at startup, from the namenode.
-         */
-        private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
-            if (target >= filelen) {
-                throw new IOException("Attempted to read past end of file");
-            }
-
-            if (s != null) {
-                s.close();
-                s = null;
-            }
-
-            //
-            // Compute desired block
-            //
-            int targetBlock = -1;
-            long targetBlockStart = 0;
-            long targetBlockEnd = 0;
-            for (int i = 0; i < blocks.length; i++) {
-                long blocklen = blocks[i].getNumBytes();
-                targetBlockEnd = targetBlockStart + blocklen - 1;
-
-                if (target >= targetBlockStart && target <= targetBlockEnd) {
-                    targetBlock = i;
-                    break;
-                } else {
-                    targetBlockStart = targetBlockEnd + 1;                    
-                }
-            }
-            if (targetBlock < 0) {
-                throw new IOException("Impossible situation: could not find target position " + target);
-            }
-            long offsetIntoBlock = target - targetBlockStart;
-
-            //
-            // Connect to best DataNode for desired Block, with potential offset
-            //
-            DatanodeInfo chosenNode = null;
-            while (s == null) {
-                DNAddrPair retval = chooseDataNode(targetBlock);
-                chosenNode = retval.info;
-                InetSocketAddress targetAddr = retval.addr;
-
-                try {
-                    s = new Socket();
-                    s.connect(targetAddr, READ_TIMEOUT);
-                    s.setSoTimeout(READ_TIMEOUT);
-
-                    //
-                    // Xmit header info to datanode
-                    //
-                    DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
-                    out.write(OP_READSKIP_BLOCK);
-                    blocks[targetBlock].write(out);
-                    out.writeLong(offsetIntoBlock);
-                    out.flush();
-
-                    //
-                    // Get bytes in block, set streams
-                    //
-                    DataInputStream in = new DataInputStream(new BufferedInputStream(s.getInputStream()));
-                    long curBlockSize = in.readLong();
-                    long amtSkipped = in.readLong();
-                    if (curBlockSize != blocks[targetBlock].len) {
-                        throw new IOException("Recorded block size is " + blocks[targetBlock].len + ", but datanode reports size of " + curBlockSize);
-                    }
-                    if (amtSkipped != offsetIntoBlock) {
-                        throw new IOException("Asked for offset of " + offsetIntoBlock + ", but only received offset of " + amtSkipped);
-                    }
-
-                    this.pos = target;
-                    this.blockEnd = targetBlockEnd;
-                    this.currentBlock = blocks[targetBlock];
-                    this.blockStream = in;
-                    return chosenNode;
-                } catch (IOException ex) {
-                    // Put chosen node into dead list, continue
-                    LOG.debug("Failed to connect to " + targetAddr + ":" 
-                              + StringUtils.stringifyException(ex));
-                    deadNodes.add(chosenNode);
-                    if (s != null) {
-                        try {
-                            s.close();
-                        } catch (IOException iex) {
-                        }                        
-                    }
-                    s = null;
-                }
-            }
-            return chosenNode;
-        }
+    /**
+     * Close it down!
+     */
+    public synchronized void close() throws IOException {
+      checkOpen();
+      if (closed) {
+        throw new IOException("Stream closed");
+      }
 
-        /**
-         * Close it down!
-         */
-        public synchronized void close() throws IOException {
-            checkOpen();
-            if (closed) {
-                throw new IOException("Stream closed");
-            }
+      if (s != null) {
+        blockStream.close();
+        s.close();
+        s = null;
+      }
+      super.close();
+      closed = true;
+    }
 
-            if (s != null) {
-                blockStream.close();
-                s.close();
-                s = null;
-            }
-            super.close();
-            closed = true;
+    /**
+     * Basic read()
+     */
+    public synchronized int read() throws IOException {
+      checkOpen();
+      if (closed) {
+        throw new IOException("Stream closed");
+      }
+      int result = -1;
+      if (pos < filelen) {
+        if (pos > blockEnd) {
+          currentNode = blockSeekTo(pos);
+        }
+        result = blockStream.read();
+        if (result >= 0) {
+          pos++;
         }
+      }
+      return result;
+    }
 
-        /**
-         * Basic read()
-         */
-        public synchronized int read() throws IOException {
-            checkOpen();
-            if (closed) {
-                throw new IOException("Stream closed");
+    /**
+     * Read the entire buffer.
+     */
+    public synchronized int read(byte buf[], int off, int len) throws IOException {
+      checkOpen();
+      if (closed) {
+        throw new IOException("Stream closed");
+      }
+      if (pos < filelen) {
+        int retries = 2;
+        while (retries > 0) {
+          try {
+            if (pos > blockEnd) {
+              currentNode = blockSeekTo(pos);
             }
-            int result = -1;
-            if (pos < filelen) {
-                if (pos > blockEnd) {
-                   currentNode = blockSeekTo(pos);
-                }
-                result = blockStream.read();
-                if (result >= 0) {
-                    pos++;
-                }
+            int realLen = Math.min(len, (int) (blockEnd - pos + 1));
+            int result = blockStream.read(buf, off, realLen);
+            if (result >= 0) {
+              pos += result;
             }
             return result;
-        }
-
-        /**
-         * Read the entire buffer.
-         */
-        public synchronized int read(byte buf[], int off, int len) throws IOException {
-            checkOpen();
-            if (closed) {
-                throw new IOException("Stream closed");
-            }
-            if (pos < filelen) {
-              int retries = 2;
-              while (retries > 0) {
-                try {
-                  if (pos > blockEnd) {
-                      currentNode = blockSeekTo(pos);
-                  }
-                  int realLen = Math.min(len, (int) (blockEnd - pos + 1));
-                  int result = blockStream.read(buf, off, realLen);
-                  if (result >= 0) {
-                      pos += result;
-                  }
-                  return result;
-                } catch (IOException e) {
-                  if (retries == 1) {
-                    LOG.warn("DFS Read: " + StringUtils.stringifyException(e));
-                  }
-                  blockEnd = -1;
-                  if (currentNode != null) { deadNodes.add(currentNode); }
-                  if (--retries == 0) {
-                    throw e;
-                  }
-                }
-              }
+          } catch (IOException e) {
+            if (retries == 1) {
+              LOG.warn("DFS Read: " + StringUtils.stringifyException(e));
+            }
+            blockEnd = -1;
+            if (currentNode != null) { deadNodes.add(currentNode); }
+            if (--retries == 0) {
+              throw e;
             }
-            return -1;
+          }
         }
+      }
+      return -1;
+    }
 
         
-        private DNAddrPair chooseDataNode(int blockId)
-        throws IOException {
-          int failures = 0;
-          while (true) {
-            try {
-              DatanodeInfo chosenNode = bestNode(nodes[blockId], deadNodes);
-              InetSocketAddress targetAddr = DataNode.createSocketAddr(chosenNode.getName());
-              return new DNAddrPair(chosenNode, targetAddr);
-            } catch (IOException ie) {
-              String blockInfo =
-                  blocks[blockId]+" file="+src;
-              if (failures >= MAX_BLOCK_ACQUIRE_FAILURES) {
-                throw new IOException("Could not obtain block: " + blockInfo);
-              }
-              if (nodes[blockId] == null || nodes[blockId].length == 0) {
-                LOG.info("No node available for block: " + blockInfo);
-              }
-              LOG.info("Could not obtain block " + blockId + " from any node:  " + ie);
-              try {
-                Thread.sleep(3000);
-              } catch (InterruptedException iex) {
-              }
-              deadNodes.clear(); //2nd option is to remove only nodes[blockId]
-              openInfo();
-              failures++;
-              continue;
-            }
+    private DNAddrPair chooseDataNode(int blockId)
+      throws IOException {
+      int failures = 0;
+      while (true) {
+        try {
+          DatanodeInfo chosenNode = bestNode(nodes[blockId], deadNodes);
+          InetSocketAddress targetAddr = DataNode.createSocketAddr(chosenNode.getName());
+          return new DNAddrPair(chosenNode, targetAddr);
+        } catch (IOException ie) {
+          String blockInfo =
+            blocks[blockId]+" file="+src;
+          if (failures >= MAX_BLOCK_ACQUIRE_FAILURES) {
+            throw new IOException("Could not obtain block: " + blockInfo);
+          }
+          if (nodes[blockId] == null || nodes[blockId].length == 0) {
+            LOG.info("No node available for block: " + blockInfo);
+          }
+          LOG.info("Could not obtain block " + blockId + " from any node:  " + ie);
+          try {
+            Thread.sleep(3000);
+          } catch (InterruptedException iex) {
           }
-        } 
+          deadNodes.clear(); //2nd option is to remove only nodes[blockId]
+          openInfo();
+          failures++;
+          continue;
+        }
+      }
+    } 
         
-        private void fetchBlockByteRange(int blockId, long start,
-            long end, byte[] buf, int offset) throws IOException {
-          //
-          // Connect to best DataNode for desired Block, with potential offset
-          //
-          Socket dn = null;
-          while (dn == null) {
-            DNAddrPair retval = chooseDataNode(blockId);
-            DatanodeInfo chosenNode = retval.info;
-            InetSocketAddress targetAddr = retval.addr;
+    private void fetchBlockByteRange(int blockId, long start,
+                                     long end, byte[] buf, int offset) throws IOException {
+      //
+      // Connect to best DataNode for desired Block, with potential offset
+      //
+      Socket dn = null;
+      while (dn == null) {
+        DNAddrPair retval = chooseDataNode(blockId);
+        DatanodeInfo chosenNode = retval.info;
+        InetSocketAddress targetAddr = retval.addr;
             
-            try {
-              dn = new Socket();
-              dn.connect(targetAddr, READ_TIMEOUT);
-              dn.setSoTimeout(READ_TIMEOUT);
+        try {
+          dn = new Socket();
+          dn.connect(targetAddr, READ_TIMEOUT);
+          dn.setSoTimeout(READ_TIMEOUT);
               
-              //
-              // Xmit header info to datanode
-              //
-              DataOutputStream out = new DataOutputStream(new BufferedOutputStream(dn.getOutputStream()));
-              out.write(OP_READ_RANGE_BLOCK);
-              blocks[blockId].write(out);
-              out.writeLong(start);
-              out.writeLong(end);
-              out.flush();
+          //
+          // Xmit header info to datanode
+          //
+          DataOutputStream out = new DataOutputStream(new BufferedOutputStream(dn.getOutputStream()));
+          out.write(OP_READ_RANGE_BLOCK);
+          blocks[blockId].write(out);
+          out.writeLong(start);
+          out.writeLong(end);
+          out.flush();
               
-              //
-              // Get bytes in block, set streams
-              //
-              DataInputStream in = new DataInputStream(new BufferedInputStream(dn.getInputStream()));
-              long curBlockSize = in.readLong();
-              long actualStart = in.readLong();
-              long actualEnd = in.readLong();
-              if (curBlockSize != blocks[blockId].len) {
-                throw new IOException("Recorded block size is " +
-                    blocks[blockId].len + ", but datanode reports size of " +
-                    curBlockSize);
-              }
-              if ((actualStart != start) || (actualEnd != end)) {
-                throw new IOException("Asked for byte range  " + start +
-                    "-" + end + ", but only received range " + actualStart +
-                    "-" + actualEnd);
-              }
-              int nread = in.read(buf, offset, (int)(end - start + 1));
-            } catch (IOException ex) {
-              // Put chosen node into dead list, continue
-              LOG.debug("Failed to connect to " + targetAddr + ":" 
-                        + StringUtils.stringifyException(ex));
-              deadNodes.add(chosenNode);
-              if (dn != null) {
-                try {
-                  dn.close();
-                } catch (IOException iex) {
-                }
-              }
-              dn = null;
+          //
+          // Get bytes in block, set streams
+          //
+          DataInputStream in = new DataInputStream(new BufferedInputStream(dn.getInputStream()));
+          long curBlockSize = in.readLong();
+          long actualStart = in.readLong();
+          long actualEnd = in.readLong();
+          if (curBlockSize != blocks[blockId].len) {
+            throw new IOException("Recorded block size is " +
+                                  blocks[blockId].len + ", but datanode reports size of " +
+                                  curBlockSize);
+          }
+          if ((actualStart != start) || (actualEnd != end)) {
+            throw new IOException("Asked for byte range  " + start +
+                                  "-" + end + ", but only received range " + actualStart +
+                                  "-" + actualEnd);
+          }
+          int nread = in.read(buf, offset, (int)(end - start + 1));
+        } catch (IOException ex) {
+          // Put chosen node into dead list, continue
+          LOG.debug("Failed to connect to " + targetAddr + ":" 
+                    + StringUtils.stringifyException(ex));
+          deadNodes.add(chosenNode);
+          if (dn != null) {
+            try {
+              dn.close();
+            } catch (IOException iex) {
             }
           }
+          dn = null;
         }
+      }
+    }
         
-        public int read(long position, byte[] buf, int off, int len)
-        throws IOException {
-          // sanity checks
-          checkOpen();
-          if (closed) {
-            throw new IOException("Stream closed");
-          }
-          if ((position < 0) || (position > filelen)) {
-            return -1;
-          }
-          int realLen = len;
-          if ((position + len) > filelen) {
-            realLen = (int)(filelen - position);
-          }
-          // determine the block and byte range within the block
-          // corresponding to position and realLen
-          int targetBlock = -1;
-          long targetStart = 0;
-          long targetEnd = 0;
-          for (int idx = 0; idx < blocks.length; idx++) {
-            long blocklen = blocks[idx].getNumBytes();
-            targetEnd = targetStart + blocklen - 1;
-            if (position >= targetStart && position <= targetEnd) {
-              targetBlock = idx;
-              targetStart = position - targetStart;
-              targetEnd = Math.min(blocklen, targetStart + realLen) - 1;
-              realLen = (int)(targetEnd - targetStart + 1);
-              break;
-            }
-            targetStart += blocklen;
-          }
-          if (targetBlock < 0) {
-            throw new IOException(
-                "Impossible situation: could not find target position "+
-                position);
-          }
-          fetchBlockByteRange(targetBlock, targetStart, targetEnd, buf, off);
-          return realLen;
+    public int read(long position, byte[] buf, int off, int len)
+      throws IOException {
+      // sanity checks
+      checkOpen();
+      if (closed) {
+        throw new IOException("Stream closed");
+      }
+      if ((position < 0) || (position > filelen)) {
+        return -1;
+      }
+      int realLen = len;
+      if ((position + len) > filelen) {
+        realLen = (int)(filelen - position);
+      }
+      // determine the block and byte range within the block
+      // corresponding to position and realLen
+      int targetBlock = -1;
+      long targetStart = 0;
+      long targetEnd = 0;
+      for (int idx = 0; idx < blocks.length; idx++) {
+        long blocklen = blocks[idx].getNumBytes();
+        targetEnd = targetStart + blocklen - 1;
+        if (position >= targetStart && position <= targetEnd) {
+          targetBlock = idx;
+          targetStart = position - targetStart;
+          targetEnd = Math.min(blocklen, targetStart + realLen) - 1;
+          realLen = (int)(targetEnd - targetStart + 1);
+          break;
         }
+        targetStart += blocklen;
+      }
+      if (targetBlock < 0) {
+        throw new IOException(
+                              "Impossible situation: could not find target position "+
+                              position);
+      }
+      fetchBlockByteRange(targetBlock, targetStart, targetEnd, buf, off);
+      return realLen;
+    }
         
-        /**
-         * Seek to a new arbitrary location
-         */
-        public synchronized void seek(long targetPos) throws IOException {
-            if (targetPos > filelen) {
-                throw new IOException("Cannot seek after EOF");
-            }
-            boolean done = false;
-            if (pos <= targetPos && targetPos <= blockEnd) {
-                //
-                // If this seek is to a positive position in the current
-                // block, and this piece of data might already be lying in
-                // the TCP buffer, then just eat up the intervening data.
-                //
-                int diff = (int)(targetPos - pos);
-                if (diff <= TCP_WINDOW_SIZE) {
-                  blockStream.skipBytes(diff);
-                  pos += diff;
-                  assert(pos == targetPos);
-                  done = true;
-                }
-            }
-            if (!done) {
-                pos = targetPos;
-                blockEnd = -1;
-            }
+    /**
+     * Seek to a new arbitrary location
+     */
+    public synchronized void seek(long targetPos) throws IOException {
+      if (targetPos > filelen) {
+        throw new IOException("Cannot seek after EOF");
+      }
+      boolean done = false;
+      if (pos <= targetPos && targetPos <= blockEnd) {
+        //
+        // If this seek is to a positive position in the current
+        // block, and this piece of data might already be lying in
+        // the TCP buffer, then just eat up the intervening data.
+        //
+        int diff = (int)(targetPos - pos);
+        if (diff <= TCP_WINDOW_SIZE) {
+          blockStream.skipBytes(diff);
+          pos += diff;
+          assert(pos == targetPos);
+          done = true;
         }
+      }
+      if (!done) {
+        pos = targetPos;
+        blockEnd = -1;
+      }
+    }
 
-        /**
-         * Seek to given position on a node other than the current node.  If
-         * a node other than the current node is found, then returns true. 
-         * If another node could not be found, then returns false.
-         */
-        public synchronized boolean seekToNewSource(long targetPos) throws IOException {
-            boolean markedDead = deadNodes.contains(currentNode);
-            deadNodes.add(currentNode);
-            DatanodeInfo oldNode = currentNode;
-            DatanodeInfo newNode = blockSeekTo(targetPos);
-            if ( !markedDead ) {
-                /* remove it from deadNodes. blockSeekTo could have cleared 
-                 * deadNodes and added currentNode again. Thats ok. */
-                deadNodes.remove(oldNode);
-            }
-            if (!oldNode.getStorageID().equals(newNode.getStorageID())) {
-                currentNode = newNode;
-                return true;
-            } else {
-                return false;
-            }
-        }
+    /**
+     * Seek to given position on a node other than the current node.  If
+     * a node other than the current node is found, then returns true. 
+     * If another node could not be found, then returns false.
+     */
+    public synchronized boolean seekToNewSource(long targetPos) throws IOException {
+      boolean markedDead = deadNodes.contains(currentNode);
+      deadNodes.add(currentNode);
+      DatanodeInfo oldNode = currentNode;
+      DatanodeInfo newNode = blockSeekTo(targetPos);
+      if ( !markedDead ) {
+        /* remove it from deadNodes. blockSeekTo could have cleared 
+         * deadNodes and added currentNode again. Thats ok. */
+        deadNodes.remove(oldNode);
+      }
+      if (!oldNode.getStorageID().equals(newNode.getStorageID())) {
+        currentNode = newNode;
+        return true;
+      } else {
+        return false;
+      }
+    }
         
-        /**
-         */
-        public synchronized long getPos() throws IOException {
-            return pos;
-        }
+    /**
+     */
+    public synchronized long getPos() throws IOException {
+      return pos;
+    }
 
-        /**
-         */
-        public synchronized int available() throws IOException {
-            if (closed) {
-                throw new IOException("Stream closed");
-            }
-            return (int) (filelen - pos);
-        }
+    /**
+     */
+    public synchronized int available() throws IOException {
+      if (closed) {
+        throw new IOException("Stream closed");
+      }
+      return (int) (filelen - pos);
+    }
 
-        /**
-         * We definitely don't support marks
-         */
-        public boolean markSupported() {
-            return false;
-        }
-        public void mark(int readLimit) {
-        }
-        public void reset() throws IOException {
-            throw new IOException("Mark not supported");
-        }
+    /**
+     * We definitely don't support marks
+     */
+    public boolean markSupported() {
+      return false;
+    }
+    public void mark(int readLimit) {
     }
+    public void reset() throws IOException {
+      throw new IOException("Mark not supported");
+    }
+  }
     
-    static class DFSDataInputStream extends FSDataInputStream {
-      DFSDataInputStream(DFSInputStream in, Configuration conf)
+  static class DFSDataInputStream extends FSDataInputStream {
+    DFSDataInputStream(DFSInputStream in, Configuration conf)
       throws IOException {
-        super(in, conf);
-      }
+      super(in, conf);
+    }
       
-      DFSDataInputStream(DFSInputStream in, int bufferSize) throws IOException {
-        super(in, bufferSize);
-      }
+    DFSDataInputStream(DFSInputStream in, int bufferSize) throws IOException {
+      super(in, bufferSize);
+    }
       
-      /**
-       * Returns the datanode from which the stream is currently reading.
-       */
-      public DatanodeInfo getCurrentDatanode() {
-        return ((DFSInputStream)inStream).getCurrentDatanode();
-      }
+    /**
+     * Returns the datanode from which the stream is currently reading.
+     */
+    public DatanodeInfo getCurrentDatanode() {
+      return ((DFSInputStream)inStream).getCurrentDatanode();
+    }
       
-      /**
-       * Returns the block containing the target position. 
-       */
-      public Block getCurrentBlock() {
-        return ((DFSInputStream)inStream).getCurrentBlock();
-      }
-
-      /**
-       * Used by the automatic tests to detemine blocks locations of a
-       * file
-       */
-      synchronized DatanodeInfo[][] getDataNodes() {
-        return ((DFSInputStream)inStream).getDataNodes();
+    /**
+     * Returns the block containing the target position. 
+     */
+    public Block getCurrentBlock() {
+      return ((DFSInputStream)inStream).getCurrentBlock();
+    }
+
+    /**
+     * Used by the automatic tests to detemine blocks locations of a
+     * file
+     */
+    synchronized DatanodeInfo[][] getDataNodes() {
+      return ((DFSInputStream)inStream).getDataNodes();
+    }
+
+  }
+
+  /****************************************************************
+   * DFSOutputStream creates files from a stream of bytes.
+   ****************************************************************/
+  class DFSOutputStream extends OutputStream {
+    private Socket s;
+    boolean closed = false;
+
+    private byte outBuf[] = new byte[BUFFER_SIZE];
+    private int pos = 0;
+
+    private UTF8 src;
+    private boolean overwrite;
+    private short replication;
+    private boolean firstTime = true;
+    private DataOutputStream blockStream;
+    private DataInputStream blockReplyStream;
+    private File backupFile;
+    private OutputStream backupStream;
+    private Block block;
+    private long filePos = 0;
+    private int bytesWrittenToBlock = 0;
+    private String datanodeName;
+    private long blockSize;
+
+    private Progressable progress;
+    /**
+     * Create a new output stream to the given DataNode.
+     */
+    public DFSOutputStream(UTF8 src, boolean overwrite, 
+                           short replication, long blockSize,
+                           Progressable progress
+                           ) throws IOException {
+      this.src = src;
+      this.overwrite = overwrite;
+      this.replication = replication;
+      this.backupFile = newBackupFile();
+      this.blockSize = blockSize;
+      this.backupStream = new FileOutputStream(backupFile);
+      this.progress = progress;
+      if (progress != null) {
+        LOG.debug("Set non-null progress callback on DFSOutputStream "+src);
       }
+    }
 
+    /* Wrapper for closing backupStream. This sets backupStream to null so
+     * that we do not attempt to write to a backupStream that could be
+     * invalid in subsequent writes. Otherwise we might end up trying to
+     * write to a file descriptor that we don't own.
+     */
+    private void closeBackupStream() throws IOException {
+      if ( backupStream != null ) {
+        OutputStream stream = backupStream;
+        backupStream = null;
+        stream.close();
+      }   
+    }
+    /* Similar to closeBackupStream(). Theoretically deleting a file
+     * twice could result in deleting a file that we should not.
+     */
+    private void deleteBackupFile() {
+      if ( backupFile != null ) {
+        File file = backupFile;
+        backupFile = null;
+        file.delete();
+      }
+    }
+        
+    private File newBackupFile() throws IOException {
+      File result = conf.getFile("dfs.client.buffer.dir",
+                                 "tmp"+File.separator+
+                                 "client-"+Math.abs(r.nextLong()));
+      result.deleteOnExit();
+      return result;
     }
 
-    /****************************************************************
-     * DFSOutputStream creates files from a stream of bytes.
-     ****************************************************************/
-    class DFSOutputStream extends OutputStream {
-        private Socket s;
-        boolean closed = false;
-
-        private byte outBuf[] = new byte[BUFFER_SIZE];
-        private int pos = 0;
-
-        private UTF8 src;
-        private boolean overwrite;
-        private short replication;
-        private boolean firstTime = true;
-        private DataOutputStream blockStream;
-        private DataInputStream blockReplyStream;
-        private File backupFile;
-        private OutputStream backupStream;
-        private Block block;
-        private long filePos = 0;
-        private int bytesWrittenToBlock = 0;
-        private String datanodeName;
-        private long blockSize;
-
-        private Progressable progress;
-        /**
-         * Create a new output stream to the given DataNode.
-         */
-        public DFSOutputStream(UTF8 src, boolean overwrite, 
-                               short replication, long blockSize,
-                               Progressable progress
-                               ) throws IOException {
-            this.src = src;
-            this.overwrite = overwrite;
-            this.replication = replication;
-            this.backupFile = newBackupFile();
-            this.blockSize = blockSize;
-            this.backupStream = new FileOutputStream(backupFile);
-            this.progress = progress;
-            if (progress != null) {
-                LOG.debug("Set non-null progress callback on DFSOutputStream "+src);
-            }
+    /**
+     * Open a DataOutputStream to a DataNode so that it can be written to.
+     * This happens when a file is created and each time a new block is allocated.
+     * Must get block ID and the IDs of the destinations from the namenode.
+     */
+    private synchronized void nextBlockOutputStream() throws IOException {
+      boolean retry = false;
+      long startTime = System.currentTimeMillis();
+      do {
+        retry = false;
+                
+        LocatedBlock lb;
+        if (firstTime) {
+          lb = locateNewBlock();
+        } else {
+          lb = locateFollowingBlock(startTime);
+        }
+
+        block = lb.getBlock();
+        if ( block.getNumBytes() < bytesWrittenToBlock ) {
+          block.setNumBytes( bytesWrittenToBlock );
         }
+        DatanodeInfo nodes[] = lb.getLocations();
 
-        /* Wrapper for closing backupStream. This sets backupStream to null so
-         * that we do not attempt to write to a backupStream that could be
-         * invalid in subsequent writes. Otherwise we might end up trying to
-         * write to a file descriptor that we don't own.
-         */
-        private void closeBackupStream() throws IOException {
-          if ( backupStream != null ) {
-            OutputStream stream = backupStream;
-            backupStream = null;
-            stream.close();
-          }   
-        }
-        /* Similar to closeBackupStream(). Theoretically deleting a file
-         * twice could result in deleting a file that we should not.
-         */
-        private void deleteBackupFile() {
-          if ( backupFile != null ) {
-            File file = backupFile;
-            backupFile = null;
-            file.delete();
+        //
+        // Connect to first DataNode in the list.  Abort if this fails.
+        //
+        InetSocketAddress target = DataNode.createSocketAddr(nodes[0].getName());
+        try {
+          s = new Socket();
+          s.connect(target, READ_TIMEOUT);
+          s.setSoTimeout(replication * READ_TIMEOUT);
+          datanodeName = nodes[0].getName();
+        } catch (IOException ie) {
+          // Connection failed.  Let's wait a little bit and retry
+          try {
+            if (System.currentTimeMillis() - startTime > 5000) {
+              LOG.info("Waiting to find target node: " + target);
+            }
+            Thread.sleep(6000);
+          } catch (InterruptedException iex) {
+          }
+          if (firstTime) {
+            namenode.abandonFileInProgress(src.toString(), 
+                                           clientName);
+          } else {
+            namenode.abandonBlock(block, src.toString());
           }
+          retry = true;
+          continue;
         }
-        
-        private File newBackupFile() throws IOException {
-          File result = conf.getFile("dfs.client.buffer.dir",
-                                     "tmp"+File.separator+
-                                     "client-"+Math.abs(r.nextLong()));
-          result.deleteOnExit();
-          return result;
-        }
-
-        /**
-         * Open a DataOutputStream to a DataNode so that it can be written to.
-         * This happens when a file is created and each time a new block is allocated.
-         * Must get block ID and the IDs of the destinations from the namenode.
-         */
-        private synchronized void nextBlockOutputStream() throws IOException {
-            boolean retry = false;
-            long startTime = System.currentTimeMillis();
-            do {
-                retry = false;
-                
-                LocatedBlock lb;
-                if (firstTime) {
-                  lb = locateNewBlock();
-                } else {
-                  lb = locateFollowingBlock(startTime);
-                }
-
-                block = lb.getBlock();
-                if ( block.getNumBytes() < bytesWrittenToBlock ) {
-                  block.setNumBytes( bytesWrittenToBlock );
-                }
-                DatanodeInfo nodes[] = lb.getLocations();
-
-                //
-                // Connect to first DataNode in the list.  Abort if this fails.
-                //
-                InetSocketAddress target = DataNode.createSocketAddr(nodes[0].getName());
-                try {
-                    s = new Socket();
-                    s.connect(target, READ_TIMEOUT);
-                    s.setSoTimeout(replication * READ_TIMEOUT);
-                    datanodeName = nodes[0].getName();
-                } catch (IOException ie) {
-                    // Connection failed.  Let's wait a little bit and retry
-                    try {
-                        if (System.currentTimeMillis() - startTime > 5000) {
-                            LOG.info("Waiting to find target node: " + target);
-                        }
-                        Thread.sleep(6000);
-                    } catch (InterruptedException iex) {
-                    }
-                    if (firstTime) {
-                        namenode.abandonFileInProgress(src.toString(), 
-                                                       clientName);
-                    } else {
-                        namenode.abandonBlock(block, src.toString());
-                    }
-                    retry = true;
-                    continue;
-                }
-
-                //
-                // Xmit header info to datanode
-                //
-                DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
-                out.write(OP_WRITE_BLOCK);
-                out.writeBoolean(true);
-                block.write(out);
-                out.writeInt(nodes.length);
-                for (int i = 0; i < nodes.length; i++) {
-                    nodes[i].write(out);
-                }
-                out.write(CHUNKED_ENCODING);
-                blockStream = out;
-                blockReplyStream = new DataInputStream(new BufferedInputStream(s.getInputStream()));
-            } while (retry);
-            firstTime = false;
-        }
-
-        private LocatedBlock locateNewBlock() throws IOException {     
-          int retries = 3;
-          while (true) {
-            while (true) {
+
+        //
+        // Xmit header info to datanode
+        //
+        DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
+        out.write(OP_WRITE_BLOCK);
+        out.writeBoolean(true);
+        block.write(out);
+        out.writeInt(nodes.length);
+        for (int i = 0; i < nodes.length; i++) {
+          nodes[i].write(out);
+        }
+        out.write(CHUNKED_ENCODING);
+        blockStream = out;
+        blockReplyStream = new DataInputStream(new BufferedInputStream(s.getInputStream()));
+      } while (retry);
+      firstTime = false;
+    }
+
+    private LocatedBlock locateNewBlock() throws IOException {     
+      int retries = 3;
+      while (true) {
+        while (true) {
+          try {
+            return namenode.create(src.toString(), clientName.toString(),
+                                   overwrite, replication, blockSize);
+          } catch (RemoteException e) {
+            if (--retries == 0 || 
+                !AlreadyBeingCreatedException.class.getName().
+                equals(e.getClassName())) {
+              throw e;
+            } else {
+              // because failed tasks take up to LEASE_PERIOD to
+              // release their pendingCreates files, if the file
+              // we want to create is already being created, 
+              // wait and try again.
+              LOG.info(StringUtils.stringifyException(e));
               try {
-                return namenode.create(src.toString(), clientName.toString(),
-                                       overwrite, replication, blockSize);
-              } catch (RemoteException e) {
-                if (--retries == 0 || 
-                    !AlreadyBeingCreatedException.class.getName().
-                        equals(e.getClassName())) {
-                  throw e;
-                } else {
-                  // because failed tasks take up to LEASE_PERIOD to
-                  // release their pendingCreates files, if the file
-                  // we want to create is already being created, 
-                  // wait and try again.
-                  LOG.info(StringUtils.stringifyException(e));
-                  try {
-                    Thread.sleep(LEASE_SOFTLIMIT_PERIOD);
-                  } catch (InterruptedException ie) {
-                  }
-                }
+                Thread.sleep(LEASE_SOFTLIMIT_PERIOD);
+              } catch (InterruptedException ie) {
               }
             }
           }
         }
+      }
+    }
         
-        private LocatedBlock locateFollowingBlock(long start
-                                                  ) throws IOException {     
-          int retries = 5;
-          long sleeptime = 400;
-          while (true) {
-            long localstart = System.currentTimeMillis();
-            while (true) {
+    private LocatedBlock locateFollowingBlock(long start
+                                              ) throws IOException {     
+      int retries = 5;
+      long sleeptime = 400;
+      while (true) {
+        long localstart = System.currentTimeMillis();
+        while (true) {
+          try {
+            return namenode.addBlock(src.toString(), 
+                                     clientName.toString());
+          } catch (RemoteException e) {
+            if (--retries == 0 || 
+                !NotReplicatedYetException.class.getName().
+                equals(e.getClassName())) {
+              throw e;
+            } else {
+              LOG.info(StringUtils.stringifyException(e));
+              if (System.currentTimeMillis() - localstart > 5000) {
+                LOG.info("Waiting for replication for " + 
+                         (System.currentTimeMillis() - localstart)/1000 + 
+                         " seconds");
+              }
               try {
-                return namenode.addBlock(src.toString(), 
-                                         clientName.toString());
-              } catch (RemoteException e) {
-                if (--retries == 0 || 
-                    !NotReplicatedYetException.class.getName().
-                        equals(e.getClassName())) {
-                  throw e;
-                } else {
-                  LOG.info(StringUtils.stringifyException(e));
-                  if (System.currentTimeMillis() - localstart > 5000) {
-                    LOG.info("Waiting for replication for " + 
-                             (System.currentTimeMillis() - localstart)/1000 + 
-                             " seconds");
-                  }
-                  try {
-                    LOG.debug("NotReplicatedYetException sleeping " + src +
-                              " retries left " + retries);
-                    Thread.sleep(sleeptime);
-                  } catch (InterruptedException ie) {
-                  }
-                }                
+                LOG.debug("NotReplicatedYetException sleeping " + src +
+                          " retries left " + retries);
+                Thread.sleep(sleeptime);
+              } catch (InterruptedException ie) {
               }
-            }
-          } 
+            }                
+          }
         }
+      } 
+    }
 
-        /**
-         * We're referring to the file pos here
-         */
-        public synchronized long getPos() throws IOException {
-            return filePos;
-        }
+    /**
+     * We're referring to the file pos here
+     */
+    public synchronized long getPos() throws IOException {
+      return filePos;
+    }
 			
-        /**
-         * Writes the specified byte to this output stream.
-         */
-        public synchronized void write(int b) throws IOException {
-            checkOpen();
-            if (closed) {
-                throw new IOException("Stream closed");
-            }
+    /**
+     * Writes the specified byte to this output stream.
+     */
+    public synchronized void write(int b) throws IOException {
+      checkOpen();
+      if (closed) {
+        throw new IOException("Stream closed");
+      }
 
-            if ((bytesWrittenToBlock + pos == blockSize) ||
-                (pos >= BUFFER_SIZE)) {
-                flush();
-            }
-            outBuf[pos++] = (byte) b;
-            filePos++;
-        }
+      if ((bytesWrittenToBlock + pos == blockSize) ||
+          (pos >= BUFFER_SIZE)) {
+        flush();
+      }
+      outBuf[pos++] = (byte) b;
+      filePos++;
+    }
 
-        /**
-         * Writes the specified bytes to this output stream.
-         */
-      public synchronized void write(byte b[], int off, int len)
-        throws IOException {
-            checkOpen();
-            if (closed) {
-                throw new IOException("Stream closed");
-            }
-            while (len > 0) {
-              int remaining = Math.min(BUFFER_SIZE - pos,
-                  (int)((blockSize - bytesWrittenToBlock) - pos));
-              int toWrite = Math.min(remaining, len);
-              System.arraycopy(b, off, outBuf, pos, toWrite);
-              pos += toWrite;
-              off += toWrite;
-              len -= toWrite;
-              filePos += toWrite;
-
-              if ((bytesWrittenToBlock + pos >= blockSize) ||
-                  (pos == BUFFER_SIZE)) {
-                flush();
-              }
-            }
+    /**
+     * Writes the specified bytes to this output stream.
+     */
+    public synchronized void write(byte b[], int off, int len)
+      throws IOException {
+      checkOpen();
+      if (closed) {
+        throw new IOException("Stream closed");
+      }
+      while (len > 0) {
+        int remaining = Math.min(BUFFER_SIZE - pos,
+                                 (int)((blockSize - bytesWrittenToBlock) - pos));
+        int toWrite = Math.min(remaining, len);
+        System.arraycopy(b, off, outBuf, pos, toWrite);
+        pos += toWrite;
+        off += toWrite;
+        len -= toWrite;
+        filePos += toWrite;
+
+        if ((bytesWrittenToBlock + pos >= blockSize) ||
+            (pos == BUFFER_SIZE)) {
+          flush();
         }
+      }
+    }
 
-        /**
-         * Flush the buffer, getting a stream to a new block if necessary.
-         */
-        public synchronized void flush() throws IOException {
-            checkOpen();
-            if (closed) {
-                throw new IOException("Stream closed");
-            }
+    /**
+     * Flush the buffer, getting a stream to a new block if necessary.
+     */
+    public synchronized void flush() throws IOException {
+      checkOpen();
+      if (closed) {
+        throw new IOException("Stream closed");
+      }
 
-            if (bytesWrittenToBlock + pos >= blockSize) {
-                flushData((int) blockSize - bytesWrittenToBlock);
-            }
-            if (bytesWrittenToBlock == blockSize) {
-                endBlock();
-            }
-            flushData(pos);
-        }
+      if (bytesWrittenToBlock + pos >= blockSize) {
+        flushData((int) blockSize - bytesWrittenToBlock);
+      }
+      if (bytesWrittenToBlock == blockSize) {
+        endBlock();
+      }
+      flushData(pos);
+    }
 
-        /**
-         * Actually flush the accumulated bytes to the remote node,
-         * but no more bytes than the indicated number.
-         */
-        private synchronized void flushData(int maxPos) throws IOException {
-            int workingPos = Math.min(pos, maxPos);
+    /**
+     * Actually flush the accumulated bytes to the remote node,
+     * but no more bytes than the indicated number.
+     */
+    private synchronized void flushData(int maxPos) throws IOException {
+      int workingPos = Math.min(pos, maxPos);
             
-            if (workingPos > 0) {
-                if ( backupStream == null ) {
-                    throw new IOException( "Trying to write to backupStream " +
-                                           "but it already closed or not open");
-                }
-                //
-                // To the local block backup, write just the bytes
-                //
-                backupStream.write(outBuf, 0, workingPos);
-
-                //
-                // Track position
-                //
-                bytesWrittenToBlock += workingPos;
-                System.arraycopy(outBuf, workingPos, outBuf, 0, pos - workingPos);
-                pos -= workingPos;
-            }
-        }
-
-        /**
-         * We're done writing to the current block.
-         */
-        private synchronized void endBlock() throws IOException {
-            long sleeptime = 400;
-            //
-            // Done with local copy
-            //
-            closeBackupStream();
-
-            //
-            // Send it to datanode
-            //
-            boolean sentOk = false;
-            int remainingAttempts = 
-               conf.getInt("dfs.client.block.write.retries", 3);
-            while (!sentOk) {
-                nextBlockOutputStream();
-                InputStream in = new FileInputStream(backupFile);
-                try {
-                    byte buf[] = new byte[BUFFER_SIZE];
-                    int bytesRead = in.read(buf);
-                    while (bytesRead > 0) {
-                        blockStream.writeLong((long) bytesRead);
-                        blockStream.write(buf, 0, bytesRead);
-                        if (progress != null) { progress.progress(); }
-                        bytesRead = in.read(buf);
-                    }
-                    internalClose();
-                    sentOk = true;
-                } catch (IOException ie) {
-                    handleSocketException(ie);
-                    remainingAttempts -= 1;
-                    if (remainingAttempts == 0) {
-                      throw ie;
-                    }
-                    try {
-                      Thread.sleep(sleeptime);
-                    } catch (InterruptedException e) {
-                    }
-                } finally {
-                  in.close();
-                }
-            }
+      if (workingPos > 0) {
+        if ( backupStream == null ) {
+          throw new IOException( "Trying to write to backupStream " +
+                                 "but it already closed or not open");
+        }
+        //
+        // To the local block backup, write just the bytes
+        //
+        backupStream.write(outBuf, 0, workingPos);
+
+        //
+        // Track position
+        //
+        bytesWrittenToBlock += workingPos;
+        System.arraycopy(outBuf, workingPos, outBuf, 0, pos - workingPos);
+        pos -= workingPos;
+      }
+    }
 
-            bytesWrittenToBlock = 0;
-            //
-            // Delete local backup, start new one
-            //
-            deleteBackupFile();
-            File tmpFile = newBackupFile();
-            bytesWrittenToBlock = 0;
-            backupStream = new FileOutputStream(tmpFile);
-            backupFile = tmpFile;
+    /**
+     * We're done writing to the current block.
+     */
+    private synchronized void endBlock() throws IOException {
+      long sleeptime = 400;
+      //
+      // Done with local copy
+      //
+      closeBackupStream();
+
+      //
+      // Send it to datanode
+      //
+      boolean sentOk = false;
+      int remainingAttempts = 
+        conf.getInt("dfs.client.block.write.retries", 3);
+      while (!sentOk) {
+        nextBlockOutputStream();
+        InputStream in = new FileInputStream(backupFile);
+        try {
+          byte buf[] = new byte[BUFFER_SIZE];
+          int bytesRead = in.read(buf);
+          while (bytesRead > 0) {
+            blockStream.writeLong((long) bytesRead);
+            blockStream.write(buf, 0, bytesRead);
+            if (progress != null) { progress.progress(); }
+            bytesRead = in.read(buf);
+          }
+          internalClose();
+          sentOk = true;
+        } catch (IOException ie) {
+          handleSocketException(ie);
+          remainingAttempts -= 1;
+          if (remainingAttempts == 0) {
+            throw ie;
+          }
+          try {
+            Thread.sleep(sleeptime);
+          } catch (InterruptedException e) {
+          }
+        } finally {
+          in.close();
         }
+      }
 
-        /**
-         * Close down stream to remote datanode.
-         */
-        private synchronized void internalClose() throws IOException {
-          try {
-            blockStream.writeLong(0);
-            blockStream.flush();
+      bytesWrittenToBlock = 0;
+      //
+      // Delete local backup, start new one
+      //
+      deleteBackupFile();
+      File tmpFile = newBackupFile();
+      bytesWrittenToBlock = 0;
+      backupStream = new FileOutputStream(tmpFile);
+      backupFile = tmpFile;
+    }
 
-            long complete = blockReplyStream.readLong();
-            if (complete != WRITE_COMPLETE) {
-                LOG.info("Did not receive WRITE_COMPLETE flag: " + complete);
-                throw new IOException("Did not receive WRITE_COMPLETE_FLAG: " + complete);
-            }
-          } catch (IOException ie) {
-            throw (IOException)
-                  new IOException("failure closing block of file " +
-                                  src.toString() + " to node " +
-                                  (datanodeName == null ? "?" : datanodeName)
-                                 ).initCause(ie);
-          }
+    /**
+     * Close down stream to remote datanode.
+     */
+    private synchronized void internalClose() throws IOException {
+      try {
+        blockStream.writeLong(0);
+        blockStream.flush();
+
+        long complete = blockReplyStream.readLong();
+        if (complete != WRITE_COMPLETE) {
+          LOG.info("Did not receive WRITE_COMPLETE flag: " + complete);
+          throw new IOException("Did not receive WRITE_COMPLETE_FLAG: " + complete);
+        }
+      } catch (IOException ie) {
+        throw (IOException)
+          new IOException("failure closing block of file " +
+                          src.toString() + " to node " +
+                          (datanodeName == null ? "?" : datanodeName)
+                          ).initCause(ie);
+      }
                     
-            LocatedBlock lb = new LocatedBlock();
-            lb.readFields(blockReplyStream);
+      LocatedBlock lb = new LocatedBlock();

[... 130 lines stripped ...]


Mime
View raw message