hadoop-hdfs-commits mailing list archives

From: szets...@apache.org
Subject: svn commit: r1415809 - in /hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/client/ src/main/java/org/apache/hadoop/hdfs/protocol/ src/main/ja...
Date: Fri, 30 Nov 2012 19:53:04 GMT
Author: szetszwo
Date: Fri Nov 30 19:52:49 2012
New Revision: 1415809

URL: http://svn.apache.org/viewvc?rev=1415809&view=rev
Log:
Merge r1414455 through r1415803 from trunk.

Added:
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java
      - copied unchanged from r1415803, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java
      - copied unchanged from r1415803, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java
Modified:
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataOutputStream.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/OfflineEditsBinaryLoader.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/native/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHFlush.java

Propchange: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:r1414455-1415803

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1415809&r1=1415808&r2=1415809&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Nov 30 19:52:49 2012
@@ -17,6 +17,9 @@ Trunk (Unreleased)
     reliably storing HDFS edit logs. See dedicated section below for breakdown
     of subtasks.
 
+    HDFS-3495. Update Balancer to support new NetworkTopology with NodeGroup.
+    (Junping Du via szetszwo)
+
   IMPROVEMENTS
 
     HDFS-1620. Rename HdfsConstants -> HdfsServerConstants, FSConstants ->
@@ -386,6 +389,9 @@ Release 2.0.3-alpha - Unreleased 
 
     HDFS-4155. libhdfs implementation of hsync API (Liang Xie via todd)
 
+    HDFS-4213. Add an API to hsync for updating the last block length at the
+    namenode. (Jing Zhao via szetszwo)
+
   IMPROVEMENTS
   
     HDFS-3925. Prettify PipelineAck#toString() for printing to a log
@@ -484,6 +490,9 @@ Release 2.0.3-alpha - Unreleased 
     HDFS-4038. Override toString() for BookKeeperEditLogInputStream.
     (Vinay via umamahesh)
 
+    HDFS-4214. OfflineEditsViewer should print out the offset at which it
+    encountered an error. (Colin Patrick McCabe via atm)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -633,6 +642,10 @@ Release 2.0.3-alpha - Unreleased 
     HDFS-4216. Do not ignore QuotaExceededException when adding symlinks.
     (szetszwo)
 
+    HDFS-4242. Map.Entry is incorrectly used in LeaseManager since the behavior
+    of it is undefined after the iteration or modifications of the map.
+    (szetszwo)
+
 Release 2.0.2-alpha - 2012-09-07 
 
   INCOMPATIBLE CHANGES

Propchange: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1414455-1415803

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1415809&r1=1415808&r2=1415809&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Fri Nov 30 19:52:49 2012
@@ -46,6 +46,7 @@ import org.apache.hadoop.fs.Syncable;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
 import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -1489,9 +1490,14 @@ public class DFSOutputStream extends FSO
    */
   @Override
   public void hflush() throws IOException {
-    flushOrSync(false);
+    flushOrSync(false, EnumSet.noneOf(SyncFlag.class));
   }
 
+  @Override
+  public void hsync() throws IOException {
+    hsync(EnumSet.noneOf(SyncFlag.class));
+  }
+  
   /**
    * The expected semantics is all data have flushed out to all replicas 
    * and all replicas have done posix fsync equivalent - ie the OS has 
@@ -1500,17 +1506,35 @@ public class DFSOutputStream extends FSO
    * Note that only the current block is flushed to the disk device.
    * To guarantee durable sync across block boundaries the stream should
    * be created with {@link CreateFlag#SYNC_BLOCK}.
+   * 
+   * @param syncFlags
+   *          Indicate the semantic of the sync. Currently used to specify
+   *          whether or not to update the block length in NameNode.
    */
-  @Override
-  public void hsync() throws IOException {
-    flushOrSync(true);
+  public void hsync(EnumSet<SyncFlag> syncFlags) throws IOException {
+    flushOrSync(true, syncFlags);
   }
 
-  private void flushOrSync(boolean isSync) throws IOException {
+  /**
+   * Flush/Sync buffered data to DataNodes.
+   * 
+   * @param isSync
+   *          Whether or not to require all replicas to flush data to the disk
+   *          device
+   * @param syncFlags
+   *          Indicate extra detailed semantic of the flush/sync. Currently
+   *          mainly used to specify whether or not to update the file length in
+   *          the NameNode
+   * @throws IOException
+   */
+  private void flushOrSync(boolean isSync, EnumSet<SyncFlag> syncFlags)
+      throws IOException {
     dfsClient.checkOpen();
     isClosed();
     try {
       long toWaitFor;
+      long lastBlockLength = -1L;
+      boolean updateLength = syncFlags.contains(SyncFlag.UPDATE_LENGTH);
       synchronized (this) {
         /* Record current blockOffset. This might be changed inside
          * flushBuffer() where a partial checksum chunk might be flushed.
@@ -1574,13 +1598,20 @@ public class DFSOutputStream extends FSO
       } // end synchronized
 
       waitForAckedSeqno(toWaitFor);
-
-      // If any new blocks were allocated since the last flush, 
-      // then persist block locations on namenode. 
-      //
-      if (persistBlocks.getAndSet(false)) {
+      
+      if (updateLength) {
+        synchronized (this) {
+          if (streamer != null && streamer.block != null) {
+            lastBlockLength = streamer.block.getNumBytes();
+          }
+        }
+      }
+      // If 1) any new blocks were allocated since the last flush, or 2) to
+      // update length in NN is requried, then persist block locations on
+      // namenode.
+      if (persistBlocks.getAndSet(false) || updateLength) {
         try {
-          dfsClient.namenode.fsync(src, dfsClient.clientName);
+          dfsClient.namenode.fsync(src, dfsClient.clientName, lastBlockLength);
         } catch (IOException ioe) {
           DFSClient.LOG.warn("Unable to persist blocks in hflush for " + src, ioe);
           // If we got an error here, it might be because some other thread called
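
A minimal usage sketch of the new flag-taking hsync(), modeled on the
TestHFlush changes later in this commit; the path, data, and standalone
wrapper class are illustrative only:

    import java.util.EnumSet;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DFSOutputStream;
    import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;

    public class HsyncUpdateLengthSketch {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        Path path = new Path("/tmp/hsync-sketch.dat"); // illustrative
        FSDataOutputStream stm = fs.create(path);
        stm.write(new byte[512]);
        // New in this change: also persist the length of the last
        // (under-construction) block at the NameNode during the sync.
        ((DFSOutputStream) stm.getWrappedStream())
            .hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
        // The synced bytes are now reflected in the file's metadata.
        System.out.println("len = " + fs.getFileStatus(path).getLen());
        stm.close();
      }
    }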

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataOutputStream.java?rev=1415809&r1=1415808&r2=1415809&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataOutputStream.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataOutputStream.java Fri Nov 30 19:52:49 2012
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.client;
 
 import java.io.IOException;
+import java.util.EnumSet;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -56,4 +57,24 @@ public class HdfsDataOutputStream extend
   public synchronized int getCurrentBlockReplication() throws IOException {
     return ((DFSOutputStream)getWrappedStream()).getCurrentBlockReplication();
   }
+  
+  /**
+   * Sync buffered data to DataNodes (flush to disk devices).
+   * 
+   * @param syncFlags
+   *          Indicate the detailed semantic and actions of the hsync.
+   * @throws IOException
+   * @see FSDataOutputStream#hsync()
+   */
+  public void hsync(EnumSet<SyncFlag> syncFlags) throws IOException {
+    ((DFSOutputStream) getWrappedStream()).hsync(syncFlags);
+  }
+  
+  public static enum SyncFlag {
+    /**
+     * When doing sync to DataNodes, also update the metadata (block
+     * length) in the NameNode
+     */
+    UPDATE_LENGTH;
+  }
 }
\ No newline at end of file
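
For callers that should not reach into the internal DFSOutputStream, a
sketch using the public wrapper above; it assumes the stream handed out
for an HDFS path is an HdfsDataOutputStream, which this commit does not
itself show:

    import java.io.IOException;
    import java.util.EnumSet;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
    import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;

    public class SyncHelper {
      /** hsync, also updating the last block length when the stream is HDFS. */
      public static void syncWithLength(FSDataOutputStream out)
          throws IOException {
        if (out instanceof HdfsDataOutputStream) {
          ((HdfsDataOutputStream) out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
        } else {
          out.hsync(); // other filesystems: plain, flag-less hsync
        }
      }
    }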

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1415809&r1=1415808&r2=1415809&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Fri Nov 30 19:52:49 2012
@@ -827,14 +827,15 @@ public interface ClientProtocol {
    * The file must be currently open for writing.
    * @param src The string representation of the path
    * @param client The string representation of the client
-   * 
+   * @param lastBlockLength The length of the last block (under construction) 
+   *                        to be reported to NameNode 
    * @throws AccessControlException permission denied
    * @throws FileNotFoundException file <code>src</code> is not found
    * @throws UnresolvedLinkException if <code>src</code> contains a symlink. 
    * @throws IOException If an I/O error occurred
    */
   @Idempotent
-  public void fsync(String src, String client) 
+  public void fsync(String src, String client, long lastBlockLength) 
       throws AccessControlException, FileNotFoundException, 
       UnresolvedLinkException, IOException;
 

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java?rev=1415809&r1=1415808&r2=1415809&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java Fri Nov 30 19:52:49 2012
@@ -705,7 +705,7 @@ public class ClientNamenodeProtocolServe
   public FsyncResponseProto fsync(RpcController controller,
       FsyncRequestProto req) throws ServiceException {
     try {
-      server.fsync(req.getSrc(), req.getClient());
+      server.fsync(req.getSrc(), req.getClient(), req.getLastBlockLength());
       return VOID_FSYNC_RESPONSE;
     } catch (IOException e) {
       throw new ServiceException(e);

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java?rev=1415809&r1=1415808&r2=1415809&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java Fri Nov 30 19:52:49 2012
@@ -663,12 +663,11 @@ public class ClientNamenodeProtocolTrans
   }
 
   @Override
-  public void fsync(String src, String client) throws AccessControlException,
-      FileNotFoundException, UnresolvedLinkException, IOException {
-    FsyncRequestProto req = FsyncRequestProto.newBuilder()
-        .setSrc(src)
-        .setClient(client)
-        .build();
+  public void fsync(String src, String client, long lastBlockLength)
+      throws AccessControlException, FileNotFoundException,
+      UnresolvedLinkException, IOException {
+    FsyncRequestProto req = FsyncRequestProto.newBuilder().setSrc(src)
+        .setClient(client).setLastBlockLength(lastBlockLength).build();
     try {
       rpcProxy.fsync(null, req);
     } catch (ServiceException e) {

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1415809&r1=1415808&r2=1415809&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Fri Nov 30 19:52:49 2012
@@ -168,7 +168,7 @@ import org.apache.hadoop.util.ToolRunner
  * <ol>
  * <li>The cluster is balanced. Exiting
  * <li>No block can be moved. Exiting...
- * <li>No block has been moved for 3 iterations. Exiting...
+ * <li>No block has been moved for 5 iterations. Exiting...
  * <li>Received an IO exception: failure reason. Exiting...
  * <li>Another balancer is running. Exiting...
  * </ol>
@@ -222,7 +222,7 @@ public class Balancer {
   private Map<String, BalancerDatanode> datanodes
                  = new HashMap<String, BalancerDatanode>();
   
-  private NetworkTopology cluster = new NetworkTopology();
+  private NetworkTopology cluster;
   
   final static private int MOVER_THREAD_POOL_SIZE = 1000;
   final private ExecutorService moverExecutor = 
@@ -249,7 +249,7 @@ public class Balancer {
      * Return true if a block and its proxy are chosen; false otherwise
      */
     private boolean chooseBlockAndProxy() {
-      // iterate all source's blocks until find a good one    
+      // iterate all source's blocks until find a good one
       for (Iterator<BalancerBlock> blocks=
         source.getBlockIterator(); blocks.hasNext();) {
         if (markMovedIfGoodBlock(blocks.next())) {
@@ -293,22 +293,35 @@ public class Balancer {
      * @return true if a proxy is found; otherwise false
      */
     private boolean chooseProxySource() {
-      // check if there is replica which is on the same rack with the target
+      final DatanodeInfo targetDN = target.getDatanode();
+      boolean find = false;
       for (BalancerDatanode loc : block.getLocations()) {
-        if (cluster.isOnSameRack(loc.getDatanode(), target.getDatanode())) {
-          if (loc.addPendingBlock(this)) {
-            proxySource = loc;
+        // check if there is replica which is on the same rack with the target
+        if (cluster.isOnSameRack(loc.getDatanode(), targetDN) && addTo(loc)) {
+          find = true;
+          // if cluster is not nodegroup aware or the proxy is on the same 
+          // nodegroup with target, then we already find the nearest proxy
+          if (!cluster.isNodeGroupAware() 
+              || cluster.isOnSameNodeGroup(loc.getDatanode(), targetDN)) {
             return true;
           }
         }
-      }
-      // find out a non-busy replica
-      for (BalancerDatanode loc : block.getLocations()) {
-        if (loc.addPendingBlock(this)) {
-          proxySource = loc;
-          return true;
+        
+        if (!find) {
+          // find out a non-busy replica out of rack of target
+          find = addTo(loc);
         }
       }
+      
+      return find;
+    }
+    
+    // add a BalancerDatanode as proxy source for specific block movement
+    private boolean addTo(BalancerDatanode bdn) {
+      if (bdn.addPendingBlock(this)) {
+        proxySource = bdn;
+        return true;
+      }
       return false;
     }
     
@@ -686,7 +699,7 @@ public class Balancer {
         NodeTask task = tasks.next();
         BalancerDatanode target = task.getDatanode();
         PendingBlockMove pendingBlock = new PendingBlockMove();
-        if ( target.addPendingBlock(pendingBlock) ) { 
+        if (target.addPendingBlock(pendingBlock)) { 
           // target is not busy, so do a tentative block allocation
           pendingBlock.source = this;
           pendingBlock.target = target;
@@ -787,9 +800,10 @@ public class Balancer {
    */
   private static void checkReplicationPolicyCompatibility(Configuration conf
       ) throws UnsupportedActionException {
-    if (BlockPlacementPolicy.getInstance(conf, null, null).getClass() != 
-        BlockPlacementPolicyDefault.class) {
-      throw new UnsupportedActionException("Balancer without BlockPlacementPolicyDefault");
+    if (BlockPlacementPolicy.getInstance(conf, null, null) instanceof 
+        BlockPlacementPolicyDefault) {
+      throw new UnsupportedActionException(
+          "Balancer without BlockPlacementPolicyDefault");
     }
   }
 
@@ -804,6 +818,7 @@ public class Balancer {
     this.threshold = p.threshold;
     this.policy = p.policy;
     this.nnc = theblockpool;
+    cluster = NetworkTopology.getInstance(conf);
   }
   
   /* Shuffle datanode array */
@@ -914,9 +929,15 @@ public class Balancer {
    * Return total number of bytes to move in this iteration
    */
   private long chooseNodes() {
-    // Match nodes on the same rack first
+    // First, match nodes on the same node group if cluster has nodegroup
+    // awareness
+    if (cluster.isNodeGroupAware()) {
+      chooseNodesOnSameNodeGroup();
+    }
+    
+    // Then, match nodes on the same rack
     chooseNodes(true);
-    // Then match nodes on different racks
+    // At last, match nodes on different racks
     chooseNodes(false);
     
     assert (datanodes.size() >= sources.size()+targets.size())
@@ -931,6 +952,102 @@ public class Balancer {
     }
     return bytesToMove;
   }
+  
+  /**
+   * Decide all <source, target> pairs where source and target are 
+   * on the same NodeGroup
+   */
+  private void chooseNodesOnSameNodeGroup() {
+
+    /* first step: match each overUtilized datanode (source) to
+     * one or more underUtilized datanodes within same NodeGroup(targets).
+     */
+    chooseOnSameNodeGroup(overUtilizedDatanodes, underUtilizedDatanodes);
+
+    /* match each remaining overutilized datanode (source) to below average 
+     * utilized datanodes within the same NodeGroup(targets).
+     * Note only overutilized datanodes that haven't had that max bytes to move
+     * satisfied in step 1 are selected
+     */
+    chooseOnSameNodeGroup(overUtilizedDatanodes, belowAvgUtilizedDatanodes);
+
+    /* match each remaining underutilized datanode to above average utilized 
+     * datanodes within the same NodeGroup.
+     * Note only underutilized datanodes that have not had that max bytes to
+     * move satisfied in step 1 are selected.
+     */
+    chooseOnSameNodeGroup(underUtilizedDatanodes, aboveAvgUtilizedDatanodes);
+  }
+  
+  /**
+   * Match two sets of nodes within the same NodeGroup, one should be source
+   * nodes (utilization > Avg), and the other should be destination nodes 
+   * (utilization < Avg).
+   * @param datanodes
+   * @param candidates
+   */
+  private <D extends BalancerDatanode, C extends BalancerDatanode> void 
+      chooseOnSameNodeGroup(Collection<D> datanodes, Collection<C> candidates) {
+    for (Iterator<D> i = datanodes.iterator(); i.hasNext();) {
+      final D datanode = i.next();
+      for(; chooseOnSameNodeGroup(datanode, candidates.iterator()); );
+      if (!datanode.isMoveQuotaFull()) {
+        i.remove();
+      }
+    }
+  }
+  
+  /**
+   * Match one datanode with a set of candidates nodes within the same NodeGroup.
+   */
+  private <T extends BalancerDatanode> boolean chooseOnSameNodeGroup(
+      BalancerDatanode dn, Iterator<T> candidates) {
+    final T chosen = chooseCandidateOnSameNodeGroup(dn, candidates);
+    if (chosen == null) {
+      return false;
+    }
+    if (dn instanceof Source) {
+      matchSourceWithTargetToMove((Source)dn, chosen);
+    } else {
+      matchSourceWithTargetToMove((Source)chosen, dn);
+    }
+    if (!chosen.isMoveQuotaFull()) {
+      candidates.remove();
+    }
+    return true;
+  }
+  
+  private void matchSourceWithTargetToMove(
+      Source source, BalancerDatanode target) {
+    long size = Math.min(source.availableSizeToMove(), target.availableSizeToMove());
+    NodeTask nodeTask = new NodeTask(target, size);
+    source.addNodeTask(nodeTask);
+    target.incScheduledSize(nodeTask.getSize());
+    sources.add(source);
+    targets.add(target);
+    LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from "
+        +source.datanode.getName() + " to " + target.datanode.getName());
+  }
+  
+  /** choose a datanode from <code>candidates</code> within the same NodeGroup 
+   * of <code>dn</code>.
+   */
+  private <T extends BalancerDatanode> T chooseCandidateOnSameNodeGroup(
+      BalancerDatanode dn, Iterator<T> candidates) {
+    if (dn.isMoveQuotaFull()) {
+      for(; candidates.hasNext(); ) {
+        final T c = candidates.next();
+        if (!c.isMoveQuotaFull()) {
+          candidates.remove();
+          continue;
+        }
+        if (cluster.isOnSameNodeGroup(dn.getDatanode(), c.getDatanode())) {
+          return c;
+        }
+      }
+    }
+    return null;
+  }
 
   /* if onRack is true, decide all <source, target> pairs
    * where source and target are on the same rack; Otherwise
@@ -941,33 +1058,33 @@ public class Balancer {
     /* first step: match each overUtilized datanode (source) to
      * one or more underUtilized datanodes (targets).
      */
-    chooseTargets(underUtilizedDatanodes.iterator(), onRack);
+    chooseTargets(underUtilizedDatanodes, onRack);
     
     /* match each remaining overutilized datanode (source) to 
      * below average utilized datanodes (targets).
      * Note only overutilized datanodes that haven't had that max bytes to move
      * satisfied in step 1 are selected
      */
-    chooseTargets(belowAvgUtilizedDatanodes.iterator(), onRack);
+    chooseTargets(belowAvgUtilizedDatanodes, onRack);
 
-    /* match each remaining underutilized datanode to 
-     * above average utilized datanodes.
+    /* match each remaining underutilized datanode (target) to 
+     * above average utilized datanodes (source).
      * Note only underutilized datanodes that have not had that max bytes to
      * move satisfied in step 1 are selected.
      */
-    chooseSources(aboveAvgUtilizedDatanodes.iterator(), onRack);
+    chooseSources(aboveAvgUtilizedDatanodes, onRack);
   }
    
   /* choose targets from the target candidate list for each over utilized
    * source datanode. OnRackTarget determines if the chosen target 
    * should be on the same rack as the source
    */
-  private void chooseTargets(  
-      Iterator<BalancerDatanode> targetCandidates, boolean onRackTarget ) {
+  private void chooseTargets(
+      Collection<BalancerDatanode> targetCandidates, boolean onRackTarget ) {
     for (Iterator<Source> srcIterator = overUtilizedDatanodes.iterator();
         srcIterator.hasNext();) {
       Source source = srcIterator.next();
-      while (chooseTarget(source, targetCandidates, onRackTarget)) {
+      while (chooseTarget(source, targetCandidates.iterator(), onRackTarget)) {
       }
       if (!source.isMoveQuotaFull()) {
         srcIterator.remove();
@@ -981,11 +1098,11 @@ public class Balancer {
    * should be on the same rack as the target
    */
   private void chooseSources(
-      Iterator<Source> sourceCandidates, boolean onRackSource) {
+      Collection<Source> sourceCandidates, boolean onRackSource) {
     for (Iterator<BalancerDatanode> targetIterator = 
       underUtilizedDatanodes.iterator(); targetIterator.hasNext();) {
       BalancerDatanode target = targetIterator.next();
-      while (chooseSource(target, sourceCandidates, onRackSource)) {
+      while (chooseSource(target, sourceCandidates.iterator(), onRackSource)) {
       }
       if (!target.isMoveQuotaFull()) {
         targetIterator.remove();
@@ -1025,23 +1142,15 @@ public class Balancer {
     }
     if (foundTarget) {
       assert(target != null):"Choose a null target";
-      long size = Math.min(source.availableSizeToMove(),
-          target.availableSizeToMove());
-      NodeTask nodeTask = new NodeTask(target, size);
-      source.addNodeTask(nodeTask);
-      target.incScheduledSize(nodeTask.getSize());
-      sources.add(source);
-      targets.add(target);
+      matchSourceWithTargetToMove(source, target);
       if (!target.isMoveQuotaFull()) {
         targetCandidates.remove();
       }
-      LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from "
-          +source.datanode + " to " + target.datanode);
       return true;
     }
     return false;
   }
-  
+
   /* For the given target, choose sources from the source candidate list.
    * OnRackSource determines if the chosen source 
    * should be on the same rack as the target
@@ -1073,18 +1182,10 @@ public class Balancer {
     }
     if (foundSource) {
       assert(source != null):"Choose a null source";
-      long size = Math.min(source.availableSizeToMove(),
-          target.availableSizeToMove());
-      NodeTask nodeTask = new NodeTask(target, size);
-      source.addNodeTask(nodeTask);
-      target.incScheduledSize(nodeTask.getSize());
-      sources.add(source);
-      targets.add(target);
+      matchSourceWithTargetToMove(source, target);
       if ( !source.isMoveQuotaFull()) {
-        sourceCandidates.remove();
-      }
-      LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from "
-          +source.datanode + " to " + target.datanode);
+          sourceCandidates.remove();
+        }
       return true;
     }
     return false;
@@ -1226,6 +1327,10 @@ public class Balancer {
     if (block.isLocatedOnDatanode(target)) {
       return false;
     }
+    if (cluster.isNodeGroupAware() && 
+        isOnSameNodeGroupWithReplicas(target, block, source)) {
+      return false;
+    }
 
     boolean goodBlock = false;
     if (cluster.isOnSameRack(source.getDatanode(), target.getDatanode())) {
@@ -1257,10 +1362,32 @@ public class Balancer {
     }
     return goodBlock;
   }
-  
+
+  /**
+   * Check if there are any replica (other than source) on the same node group
+   * with target. If true, then target is not a good candidate for placing 
+   * specific block replica as we don't want 2 replicas under the same nodegroup 
+   * after balance.
+   * @param target targetDataNode
+   * @param block dataBlock
+   * @param source sourceDataNode
+   * @return true if there are any replica (other than source) on the same node
+   * group with target
+   */
+  private boolean isOnSameNodeGroupWithReplicas(BalancerDatanode target,
+      BalancerBlock block, Source source) {
+    for (BalancerDatanode loc : block.locations) {
+      if (loc != source && 
+        cluster.isOnSameNodeGroup(loc.getDatanode(), target.getDatanode())) {
+          return true;
+        }
+      }
+    return false;
+  }
+
   /* reset all fields in a balancer preparing for the next iteration */
-  private void resetData() {
-    this.cluster = new NetworkTopology();
+  private void resetData(Configuration conf) {
+    this.cluster = NetworkTopology.getInstance(conf);
     this.overUtilizedDatanodes.clear();
     this.aboveAvgUtilizedDatanodes.clear();
     this.belowAvgUtilizedDatanodes.clear();
@@ -1331,7 +1458,8 @@ public class Balancer {
   }
 
   /** Run an iteration for all datanodes. */
-  private ReturnStatus run(int iteration, Formatter formatter) {
+  private ReturnStatus run(int iteration, Formatter formatter,
+      Configuration conf) {
     try {
       /* get all live datanodes of a cluster and their disk usage
        * decide the number of bytes need to be moved
@@ -1385,7 +1513,7 @@ public class Balancer {
       }
 
       // clean all lists
-      resetData();
+      resetData(conf);
       return ReturnStatus.IN_PROGRESS;
     } catch (IllegalArgumentException e) {
       System.out.println(e + ".  Exiting ...");
@@ -1433,7 +1561,7 @@ public class Balancer {
         Collections.shuffle(connectors);
         for(NameNodeConnector nnc : connectors) {
           final Balancer b = new Balancer(nnc, p, conf);
-          final ReturnStatus r = b.run(iteration, formatter);
+          final ReturnStatus r = b.run(iteration, formatter, conf);
           if (r == ReturnStatus.IN_PROGRESS) {
             done = false;
           } else if (r != ReturnStatus.SUCCESS) {
@@ -1527,7 +1655,7 @@ public class Balancer {
       if (args != null) {
         try {
           for(int i = 0; i < args.length; i++) {
-            checkArgument(args.length >= 2, "args = " + Arrays.toString(args));           
+            checkArgument(args.length >= 2, "args = " + Arrays.toString(args));
             if ("-threshold".equalsIgnoreCase(args[i])) {
               i++;
               try {

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1415809&r1=1415808&r2=1415809&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Fri Nov 30 19:52:49 2012
@@ -39,7 +39,6 @@ import org.apache.hadoop.HadoopIllegalAr
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -151,10 +150,7 @@ public class DatanodeManager {
     this.namesystem = namesystem;
     this.blockManager = blockManager;
     
-    Class<? extends NetworkTopology> networkTopologyClass =
-        conf.getClass(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
-            NetworkTopology.class, NetworkTopology.class);
-    networktopology = ReflectionUtils.newInstance(networkTopologyClass, conf);
+    networktopology = NetworkTopology.getInstance(conf);
 
     this.heartbeatManager = new HeartbeatManager(namesystem, blockManager, conf);
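
With DatanodeManager (and the Balancer above) now going through
NetworkTopology.getInstance(conf), the NodeGroup-aware topology is
selected purely by configuration. A sketch of what that wiring might
look like; the key name and implementation class come from
hadoop-common rather than this commit, so treat both as assumptions to
verify against your tree:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.net.NetworkTopology;

    public class TopologySetupSketch {
      public static NetworkTopology nodeGroupAware() {
        Configuration conf = new Configuration();
        // Assumed key/class names (CommonConfigurationKeysPublic
        // .NET_TOPOLOGY_IMPL_KEY and NetworkTopologyWithNodeGroup).
        conf.set("net.topology.impl",
            "org.apache.hadoop.net.NetworkTopologyWithNodeGroup");
        return NetworkTopology.getInstance(conf);
      }
    }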
 

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1415809&r1=1415808&r2=1415809&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Nov 30 19:52:49 2012
@@ -3023,9 +3023,11 @@ public class FSNamesystem implements Nam
   /** Persist all metadata about this file.
    * @param src The string representation of the path
    * @param clientName The string representation of the client
+   * @param lastBlockLength The length of the last block 
+   *                        under construction reported from client.
    * @throws IOException if path does not exist
    */
-  void fsync(String src, String clientName) 
+  void fsync(String src, String clientName, long lastBlockLength) 
       throws IOException, UnresolvedLinkException {
     NameNode.stateChangeLog.info("BLOCK* fsync: " + src + " for " + clientName);
     writeLock();
@@ -3035,6 +3037,9 @@ public class FSNamesystem implements Nam
         throw new SafeModeException("Cannot fsync file " + src, safeMode);
       }
       INodeFileUnderConstruction pendingFile  = checkLease(src, clientName);
+      if (lastBlockLength > 0) {
+        pendingFile.updateLengthOfLastBlock(lastBlockLength);
+      }
       dir.persistBlocks(src, pendingFile);
     } finally {
       writeUnlock();

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java?rev=1415809&r1=1415808&r2=1415809&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java Fri Nov 30 19:52:49 2012
@@ -172,4 +172,22 @@ public class INodeFileUnderConstruction 
     setBlock(numBlocks()-1, ucBlock);
     return ucBlock;
   }
+
+  /**
+   * Update the length for the last block
+   * 
+   * @param lastBlockLength
+   *          The length of the last block reported from client
+   * @throws IOException
+   */
+  void updateLengthOfLastBlock(long lastBlockLength) throws IOException {
+    BlockInfo lastBlock = this.getLastBlock();
+    assert (lastBlock != null) : "The last block for path "
+        + this.getFullPathName() + " is null when updating its length";
+    assert (lastBlock instanceof BlockInfoUnderConstruction) : "The last block for path "
+        + this.getFullPathName()
+        + " is not a BlockInfoUnderConstruction when updating its length";
+    lastBlock.setNumBytes(lastBlockLength);
+  }
+  
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java?rev=1415809&r1=1415808&r2=1415809&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java Fri Nov 30 19:52:49 2012
@@ -17,9 +17,12 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.apache.hadoop.util.Time.now;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.SortedMap;
@@ -39,8 +42,6 @@ import org.apache.hadoop.util.Daemon;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
-import static org.apache.hadoop.util.Time.now;
-
 /**
  * LeaseManager does the lease housekeeping for writing on files.   
  * This class also provides useful static methods for lease recovery.
@@ -340,7 +341,8 @@ public class LeaseManager {
     }
 
     final int len = overwrite.length();
-    for(Map.Entry<String, Lease> entry : findLeaseWithPrefixPath(src, sortedLeasesByPath)) {
+    for(Map.Entry<String, Lease> entry
+        : findLeaseWithPrefixPath(src, sortedLeasesByPath).entrySet()) {
       final String oldpath = entry.getKey();
       final Lease lease = entry.getValue();
       //overwrite must be a prefix of oldpath
@@ -355,7 +357,8 @@ public class LeaseManager {
   }
 
   synchronized void removeLeaseWithPrefixPath(String prefix) {
-    for(Map.Entry<String, Lease> entry : findLeaseWithPrefixPath(prefix, sortedLeasesByPath)) {
+    for(Map.Entry<String, Lease> entry
+        : findLeaseWithPrefixPath(prefix, sortedLeasesByPath).entrySet()) {
       if (LOG.isDebugEnabled()) {
         LOG.debug(LeaseManager.class.getSimpleName()
             + ".removeLeaseWithPrefixPath: entry=" + entry);
@@ -364,13 +367,13 @@ public class LeaseManager {
     }
   }
 
-  static private List<Map.Entry<String, Lease>> findLeaseWithPrefixPath(
+  static private Map<String, Lease> findLeaseWithPrefixPath(
       String prefix, SortedMap<String, Lease> path2lease) {
     if (LOG.isDebugEnabled()) {
       LOG.debug(LeaseManager.class.getSimpleName() + ".findLease: prefix=" + prefix);
     }
 
-    List<Map.Entry<String, Lease>> entries = new ArrayList<Map.Entry<String, Lease>>();
+    final Map<String, Lease> entries = new HashMap<String, Lease>();
     final int srclen = prefix.length();
 
     for(Map.Entry<String, Lease> entry : path2lease.tailMap(prefix).entrySet()) {
@@ -379,7 +382,7 @@ public class LeaseManager {
         return entries;
       }
       if (p.length() == srclen || p.charAt(srclen) == Path.SEPARATOR_CHAR) {
-        entries.add(entry);
+        entries.put(entry.getKey(), entry.getValue());
       }
     }
     return entries;
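
The HDFS-4242 fix above exists because java.util.Map.Entry objects are
only defined while their source iteration is in progress; once the
backing map changes, reading a saved entry is undefined behavior. A
small self-contained illustration of the pitfall the old
List-of-entries return type invited:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    public class EntryPitfallSketch {
      public static void main(String[] args) {
        TreeMap<String, String> leases = new TreeMap<String, String>();
        leases.put("/a", "lease-1");
        leases.put("/b", "lease-2");
        // Saving Entry objects past the iteration, as the old code did:
        List<Map.Entry<String, String>> saved =
            new ArrayList<Map.Entry<String, String>>(leases.entrySet());
        leases.remove("/a"); // modify the backing map
        // Per the Map.Entry javadoc, reading saved.get(0) is now undefined.
        // Copying keys and values into a fresh HashMap, as the new
        // findLeaseWithPrefixPath does, sidesteps this entirely.
        System.out.println(saved.get(0));
      }
    }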

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1415809&r1=1415808&r2=1415809&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Fri Nov 30 19:52:49 2012
@@ -825,8 +825,9 @@ class NameNodeRpcServer implements Namen
   }
   
   @Override // ClientProtocol
-  public void fsync(String src, String clientName) throws IOException {
-    namesystem.fsync(src, clientName);
+  public void fsync(String src, String clientName, long lastBlockLength)
+      throws IOException {
+    namesystem.fsync(src, clientName, lastBlockLength);
   }
 
   @Override // ClientProtocol

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/OfflineEditsBinaryLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/OfflineEditsBinaryLoader.java?rev=1415809&r1=1415808&r2=1415809&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/OfflineEditsBinaryLoader.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/OfflineEditsBinaryLoader.java Fri Nov 30 19:52:49 2012
@@ -79,6 +79,7 @@ class OfflineEditsBinaryLoader implement
       } catch (IOException e) {
         if (!recoveryMode) {
           // Tell the visitor to clean up, then re-throw the exception
+          LOG.error("Got IOException at position " + inputStream.getPosition());
           visitor.close(e);
           throw e;
         }
@@ -87,6 +88,7 @@ class OfflineEditsBinaryLoader implement
       } catch (RuntimeException e) {
         if (!recoveryMode) {
           // Tell the visitor to clean up, then re-throw the exception
+          LOG.error("Got RuntimeException at position " + inputStream.getPosition());
           visitor.close(e);
           throw e;
         }

Propchange: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/native/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native:r1414455-1415803

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto?rev=1415809&r1=1415808&r2=1415809&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto Fri Nov 30 19:52:49 2012
@@ -357,6 +357,7 @@ message SetQuotaResponseProto { // void 
 message FsyncRequestProto {
   required string src = 1;
   required string client = 2;
+  optional sint64 lastBlockLength = 3 [default = -1];
 }
 
 message FsyncResponseProto { // void response
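
The new field is declared optional with an explicit default, so an
old-style request that omits it deserializes to -1 on the server, which
FSNamesystem in turn ignores (only values > 0 update the block length).
A sketch, assuming the usual generated-class location for this .proto
file:

    import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FsyncRequestProto;

    public class FsyncProtoCompatSketch {
      public static void main(String[] args) {
        // Old-style request: lastBlockLength omitted entirely.
        FsyncRequestProto req = FsyncRequestProto.newBuilder()
            .setSrc("/a/b")        // illustrative path
            .setClient("client-1") // illustrative client name
            .build();
        // Readers see the declared default: prints -1.
        System.out.println(req.getLastBlockLength());
      }
    }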

Propchange: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode:r1414455-1415803

Propchange: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs:r1414455-1415803

Propchange: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary:r1414455-1415803

Propchange: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs:r1414455-1415803

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1415809&r1=1415808&r2=1415809&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java Fri Nov 30 19:52:49 2012
@@ -321,7 +321,7 @@ public class MiniDFSCluster {
   /**
    * Used by builder to create and return an instance of MiniDFSCluster
    */
-  private MiniDFSCluster(Builder builder) throws IOException {
+  protected MiniDFSCluster(Builder builder) throws IOException {
     if (builder.nnTopology == null) {
       // If no topology is specified, build a single NN. 
       builder.nnTopology = MiniDFSNNTopology.simpleSingleNN(
@@ -369,8 +369,8 @@ public class MiniDFSCluster {
 
   private Configuration conf;
   private NameNodeInfo[] nameNodes;
-  private int numDataNodes;
-  private ArrayList<DataNodeProperties> dataNodes = 
+  protected int numDataNodes;
+  protected List<DataNodeProperties> dataNodes = 
                          new ArrayList<DataNodeProperties>();
   private File base_dir;
   private File data_dir;
@@ -2303,7 +2303,7 @@ public class MiniDFSCluster {
     return port;
   }
   
-  private void setupDatanodeAddress(Configuration conf, boolean setupHostsFile,
+  protected void setupDatanodeAddress(Configuration conf, boolean setupHostsFile,
                            boolean checkDataNodeAddrConfig) throws IOException {
     if (setupHostsFile) {
       String hostsFile = conf.get(DFS_HOSTS, "").trim();

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHFlush.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHFlush.java?rev=1415809&r1=1415808&r2=1415809&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHFlush.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHFlush.java Fri Nov 30 19:52:49 2012
@@ -23,12 +23,14 @@ import static org.junit.Assert.assertTru
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.util.EnumSet;
 
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.log4j.Level;
 import org.junit.Test;
@@ -43,16 +45,21 @@ public class TestHFlush {
   
   private final String fName = "hflushtest.dat";
   
-  /** The test uses {@link #doTheJob(Configuration, String, long, short)
+  /**
+   * The test uses
+   * {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)} 
    * to write a file with a standard block size
    */
   @Test
   public void hFlush_01() throws IOException {
-    doTheJob(new HdfsConfiguration(), fName, AppendTestUtil.BLOCK_SIZE, (short)2);
+    doTheJob(new HdfsConfiguration(), fName, AppendTestUtil.BLOCK_SIZE,
+        (short) 2, false, EnumSet.noneOf(SyncFlag.class));
   }
 
-  /** The test uses {@link #doTheJob(Configuration, String, long, short)
-   * to write a file with a custom block size so the writes will be
+  /**
+   * The test uses
+   * {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)} 
+   * to write a file with a custom block size so the writes will be 
    * happening across block' boundaries
    */
   @Test
@@ -64,14 +71,17 @@ public class TestHFlush {
     conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);
 
-    doTheJob(conf, fName, customBlockSize, (short)2);
+    doTheJob(conf, fName, customBlockSize, (short) 2, false,
+        EnumSet.noneOf(SyncFlag.class));
   }
 
-  /** The test uses {@link #doTheJob(Configuration, String, long, short)
-   * to write a file with a custom block size so the writes will be
+  /**
+   * The test uses
+   * {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)} 
+   * to write a file with a custom block size so the writes will be 
    * happening across block's and checksum' boundaries
    */
- @Test
+  @Test
   public void hFlush_03() throws IOException {
     Configuration conf = new HdfsConfiguration();
     int customPerChecksumSize = 400;
@@ -80,22 +90,106 @@ public class TestHFlush {
     conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);
 
-    doTheJob(conf, fName, customBlockSize, (short)2);
+    doTheJob(conf, fName, customBlockSize, (short) 2, false,
+        EnumSet.noneOf(SyncFlag.class));
+  }
+
+  /**
+   * Test hsync (with updating block length in NameNode) while no data is
+   * actually written yet
+   */
+  @Test
+  public void hSyncUpdateLength_00() throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
+        2).build();
+    DistributedFileSystem fileSystem =
+        (DistributedFileSystem)cluster.getFileSystem();
+    
+    try {
+      Path path = new Path(fName);
+      FSDataOutputStream stm = fileSystem.create(path, true, 4096, (short) 2,
+          AppendTestUtil.BLOCK_SIZE);
+      System.out.println("Created file " + path.toString());
+      ((DFSOutputStream) stm.getWrappedStream()).hsync(EnumSet
+          .of(SyncFlag.UPDATE_LENGTH));
+      long currentFileLength = fileSystem.getFileStatus(path).getLen();
+      assertEquals(0L, currentFileLength);
+      stm.close();
+    } finally {
+      fileSystem.close();
+      cluster.shutdown();
+    }
+  }
+  
+  /**
+   * The test calls
+   * {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
+   * while requiring the semantic of {@link SyncFlag#UPDATE_LENGTH}.
+   */
+  @Test
+  public void hSyncUpdateLength_01() throws IOException {
+    doTheJob(new HdfsConfiguration(), fName, AppendTestUtil.BLOCK_SIZE,
+        (short) 2, true, EnumSet.of(SyncFlag.UPDATE_LENGTH));
   }
 
   /**
-    The method starts new cluster with defined Configuration;
-    creates a file with specified block_size and writes 10 equal sections in it;
-    it also calls hflush() after each write and throws an IOException in case of 
-    an error.
-    @param conf cluster configuration
-    @param fileName of the file to be created and processed as required
-    @param block_size value to be used for the file's creation
-    @param replicas is the number of replicas
-    @throws IOException in case of any errors 
+   * The test calls
+   * {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
+   * while requiring the semantic of {@link SyncFlag#UPDATE_LENGTH}.
+   * Similar with {@link #hFlush_02()} , it writes a file with a custom block
+   * size so the writes will be happening across block' boundaries
+   */
+  @Test
+  public void hSyncUpdateLength_02() throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    int customPerChecksumSize = 512;
+    int customBlockSize = customPerChecksumSize * 3;
+    // Modify defaul filesystem settings
+    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);
+
+    doTheJob(conf, fName, customBlockSize, (short) 2, true,
+        EnumSet.of(SyncFlag.UPDATE_LENGTH));
+  }
+  
+  /**
+   * The test calls
+   * {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
+   * while requiring the semantic of {@link SyncFlag#UPDATE_LENGTH}.
+   * Similar with {@link #hFlush_03()} , it writes a file with a custom block
+   * size so the writes will be happening across block's and checksum'
+   * boundaries.
+   */
+  @Test
+  public void hSyncUpdateLength_03() throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    int customPerChecksumSize = 400;
+    int customBlockSize = customPerChecksumSize * 3;
+    // Modify defaul filesystem settings
+    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);
+
+    doTheJob(conf, fName, customBlockSize, (short) 2, true,
+        EnumSet.of(SyncFlag.UPDATE_LENGTH));
+  }
+  
+  /**
+   * The method starts new cluster with defined Configuration; creates a file
+   * with specified block_size and writes 10 equal sections in it; it also calls
+   * hflush/hsync after each write and throws an IOException in case of an error.
+   * 
+   * @param conf cluster configuration
+   * @param fileName of the file to be created and processed as required
+   * @param block_size value to be used for the file's creation
+   * @param replicas is the number of replicas
+   * @param isSync hsync or hflush         
+   * @param syncFlags specify the semantic of the sync/flush
+   * @throws IOException in case of any errors
    */
   public static void doTheJob(Configuration conf, final String fileName,
-                              long block_size, short replicas) throws IOException {
+      long block_size, short replicas, boolean isSync,
+      EnumSet<SyncFlag> syncFlags) throws IOException {
     byte[] fileContent;
     final int SECTIONS = 10;
 
@@ -119,8 +213,21 @@ public class TestHFlush {
         System.out.println("Writing " + (tenth * i) + " to " + (tenth * (i+1)) + " section to file " + fileName);
         // write to the file
         stm.write(fileContent, tenth * i, tenth);
-        // Wait while hflush() pushes all packets through built pipeline
-        ((DFSOutputStream)stm.getWrappedStream()).hflush();
+        
+        // Wait while hflush/hsync pushes all packets through built pipeline
+        if (isSync) {
+          ((DFSOutputStream)stm.getWrappedStream()).hsync(syncFlags);
+        } else {
+          ((DFSOutputStream)stm.getWrappedStream()).hflush();
+        }
+        
+        // Check file length if updatelength is required
+        if (isSync && syncFlags.contains(SyncFlag.UPDATE_LENGTH)) {
+          long currentFileLength = fileSystem.getFileStatus(path).getLen();
+          assertEquals(
+            "File size doesn't match for hsync/hflush with updating the length",
+            tenth * (i + 1), currentFileLength);
+        }
         byte [] toRead = new byte[tenth];
         byte [] expected = new byte[tenth];
         System.arraycopy(fileContent, tenth * i, expected, 0, tenth);
@@ -139,8 +246,6 @@ public class TestHFlush {
 
       assertEquals("File size doesn't match ", AppendTestUtil.FILE_SIZE, fileSystem.getFileStatus(path).getLen());
       AppendTestUtil.checkFullFile(fileSystem, path, fileContent.length, fileContent, "hflush()");
-    } catch (Exception e) {
-      e.printStackTrace();
     } finally {
       fileSystem.close();
       cluster.shutdown();


