hadoop-hdfs-commits mailing list archives

From: cl...@apache.org
Subject: svn commit: r1602947 [2/4] - in /hadoop/common/branches/fs-encryption/hadoop-hdfs-project: hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/ hadoop-hdfs-httpfs/src/test...
Date: Mon, 16 Jun 2014 18:14:08 GMT
Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java Mon Jun 16 18:13:57 2014
@@ -180,8 +180,11 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos;
 import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.ListXAttrsRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.ListXAttrsResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.RemoveXAttrRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.RemoveXAttrResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.SetXAttrRequestProto;
@@ -1338,6 +1341,16 @@ public class ClientNamenodeProtocolServe
       throw new ServiceException(e);
     }
   }
+
+  @Override
+  public ListXAttrsResponseProto listXAttrs(RpcController controller,
+    ListXAttrsRequestProto req) throws ServiceException {
+    try {
+      return PBHelper.convertListXAttrsResponse(server.listXAttrs(req.getSrc()));
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
   
   @Override
   public RemoveXAttrResponseProto removeXAttr(RpcController controller,

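The server-side translator above wires the new listXAttrs RPC into the NameNode. For
context, applications reach it through the FileSystem API; a minimal client sketch
(the path and configuration are hypothetical, and FileSystem#listXAttrs is assumed
to be the public entry point, as in releases that ship this work):

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ListXAttrsClient {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path file = new Path("/user/alice/data.txt"); // hypothetical path
    // Names of the extended attributes visible to the caller.
    List<String> names = fs.listXAttrs(file);
    for (String name : names) {
      System.out.println(name);
    }
    fs.close();
  }
}
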
Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java Mon Jun 16 18:13:57 2014
@@ -148,6 +148,7 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.DeleteEncryptionZoneRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.ListXAttrsRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.RemoveXAttrRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.SetXAttrRequestProto;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
@@ -1345,6 +1346,19 @@ public class ClientNamenodeProtocolTrans
   }
   
   @Override
+  public List<XAttr> listXAttrs(String src)
+      throws IOException {
+    ListXAttrsRequestProto.Builder builder = ListXAttrsRequestProto.newBuilder();
+    builder.setSrc(src);
+    ListXAttrsRequestProto req = builder.build();
+    try {
+      return PBHelper.convert(rpcProxy.listXAttrs(null, req));
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
   public void removeXAttr(String src, XAttr xAttr) throws IOException {
     RemoveXAttrRequestProto req = RemoveXAttrRequestProto
         .newBuilder().setSrc(src)

Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Mon Jun 16 18:13:57 2014
@@ -44,6 +44,7 @@ import org.apache.hadoop.ha.HAServicePro
 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.XAttrHelper;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
@@ -156,6 +157,7 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageUuidsProto;
 import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.ListXAttrsResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.XAttrProto;
 import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.XAttrProto.XAttrNamespaceProto;
 import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.XAttrSetFlagProto;
@@ -2237,6 +2239,21 @@ public class PBHelper {
     return builder.build();
   }
 
+  public static List<XAttr> convert(ListXAttrsResponseProto a) {
+    final List<XAttrProto> xAttrs = a.getXAttrsList();
+    return convertXAttrs(xAttrs);
+  }
+
+  public static ListXAttrsResponseProto convertListXAttrsResponse(
+    List<XAttr> names) {
+    ListXAttrsResponseProto.Builder builder =
+      ListXAttrsResponseProto.newBuilder();
+    if (names != null) {
+      builder.addAllXAttrs(convertXAttrProto(names));
+    }
+    return builder.build();
+  }
+
   public static ShortCircuitShmSlotProto convert(SlotId slotId) {
     return ShortCircuitShmSlotProto.newBuilder().
         setShmId(convert(slotId.getShmId())).

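A quick round-trip check of the two new PBHelper conversions (a test-style sketch;
assumes the XAttr builder from org.apache.hadoop.fs and the generated protobuf
classes are on the classpath):

import java.util.List;

import com.google.common.collect.Lists;
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.ListXAttrsResponseProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;

public class ListXAttrsRoundTrip {
  public static void main(String[] args) {
    List<XAttr> xAttrs = Lists.newArrayList(
        new XAttr.Builder().setNameSpace(XAttr.NameSpace.USER)
            .setName("a1").build());
    ListXAttrsResponseProto resp = PBHelper.convertListXAttrsResponse(xAttrs);
    List<XAttr> back = PBHelper.convert(resp);
    System.out.println(back.get(0).getName()); // prints: a1
  }
}
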
Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Mon Jun 16 18:13:57 2014
@@ -696,7 +696,7 @@ public class Balancer {
             // update locations
             for (String datanodeUuid : blk.getDatanodeUuids()) {
               final BalancerDatanode d = datanodeMap.get(datanodeUuid);
-              if (datanode != null) { // not an unknown datanode
+              if (d != null) { // not an unknown datanode
                 block.addLocation(d);
               }
             }

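The Balancer hunk is a classic shadowing fix: the null guard tested an unrelated
in-scope variable (datanode) instead of the value just fetched from the map (d), so
an unknown datanode UUID would slip past the guard and NPE at block.addLocation(d).
The hazard in miniature (illustrative only, not Balancer code):

import java.util.HashMap;
import java.util.Map;

public class LookupCheck {
  public static void main(String[] args) {
    Map<String, String> map = new HashMap<String, String>();
    String datanode = "not null";        // unrelated variable in scope
    String d = map.get("missing-uuid");  // lookup returns null
    if (datanode != null) {              // BUG: guards the wrong variable
      // d.length() here would throw NullPointerException
    }
    if (d != null) {                     // FIX: guard the looked-up value
      System.out.println(d.length());
    }
  }
}
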
Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Mon Jun 16 18:13:57 2014
@@ -1054,6 +1054,9 @@ public class BlockManager {
    * datanode and log the operation
    */
   void addToInvalidates(final Block block, final DatanodeInfo datanode) {
+    if (!namesystem.isPopulatingReplQueues()) {
+      return;
+    }
     invalidateBlocks.add(block, datanode, true);
   }
 
@@ -1062,6 +1065,9 @@ public class BlockManager {
    * datanodes.
    */
   private void addToInvalidates(Block b) {
+    if (!namesystem.isPopulatingReplQueues()) {
+      return;
+    }
     StringBuilder datanodes = new StringBuilder();
     for(DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) {
       final DatanodeDescriptor node = storage.getDatanodeDescriptor();
@@ -1094,8 +1100,9 @@ public class BlockManager {
           + blk + " not found");
       return;
     }
-    markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock, reason,
-        Reason.CORRUPTION_REPORTED), dn, storageID);
+    markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock,
+        blk.getGenerationStamp(), reason, Reason.CORRUPTION_REPORTED),
+        dn, storageID);
   }
 
   private void markBlockAsCorrupt(BlockToMarkCorrupt b,
@@ -1121,7 +1128,25 @@ public class BlockManager {
     // Add this replica to corruptReplicas Map
     corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason,
         b.reasonCode);
-    if (countNodes(b.stored).liveReplicas() >= bc.getBlockReplication()) {
+
+    NumberReplicas numberOfReplicas = countNodes(b.stored);
+    boolean hasEnoughLiveReplicas = numberOfReplicas.liveReplicas() >= bc
+        .getBlockReplication();
+    boolean minReplicationSatisfied =
+        numberOfReplicas.liveReplicas() >= minReplication;
+    boolean hasMoreCorruptReplicas = minReplicationSatisfied &&
+        (numberOfReplicas.liveReplicas() + numberOfReplicas.corruptReplicas()) >
+        bc.getBlockReplication();
+    boolean corruptedDuringWrite = minReplicationSatisfied &&
+        (b.stored.getGenerationStamp() > b.corrupted.getGenerationStamp());
+    // case 1: have enough number of live replicas
+    // case 2: corrupted replicas + live replicas > Replication factor
+    // case 3: Block is marked corrupt due to failure while writing. In this
+    //         case genstamp will be different than that of valid block.
+    // In all these cases we can delete the replica.
+    // In case of 3, rbw block will be deleted and valid block can be replicated
+    if (hasEnoughLiveReplicas || hasMoreCorruptReplicas
+        || corruptedDuringWrite) {
       // the block is over-replicated so invalidate the replicas immediately
       invalidateBlock(b, node);
     } else if (namesystem.isPopulatingReplQueues()) {

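The rewritten markBlockAsCorrupt replaces the single live-replica test with the
three deletion-eligible cases spelled out in the hunk's comment block. A compact,
runnable paraphrase of the predicate (names mirror the hunk; this is not the
actual BlockManager code):

public class CorruptReplicaPolicy {
  /** Mirrors the three cases added to markBlockAsCorrupt. */
  static boolean canInvalidateNow(int live, int corrupt, int minReplication,
      int replication, long storedGenStamp, long corruptedGenStamp) {
    boolean hasEnoughLiveReplicas = live >= replication;               // case 1
    boolean minReplicationSatisfied = live >= minReplication;
    boolean hasMoreCorruptReplicas =
        minReplicationSatisfied && (live + corrupt) > replication;     // case 2
    boolean corruptedDuringWrite =
        minReplicationSatisfied && storedGenStamp > corruptedGenStamp; // case 3
    return hasEnoughLiveReplicas || hasMoreCorruptReplicas
        || corruptedDuringWrite;
  }

  public static void main(String[] args) {
    // An rbw replica corrupted mid-write: the stored block's genstamp is
    // newer, so the stale replica can be deleted and re-replicated.
    System.out.println(canInvalidateNow(1, 1, 1, 3, 1002L, 1001L)); // true
  }
}
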
Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Mon Jun 16 18:13:57 2014
@@ -331,6 +331,18 @@ public class DatanodeManager {
     return heartbeatManager;
   }
 
+  private boolean isInactive(DatanodeInfo datanode) {
+    if (datanode.isDecommissioned()) {
+      return true;
+    }
+
+    if (avoidStaleDataNodesForRead) {
+      return datanode.isStale(staleInterval);
+    }
+      
+    return false;
+  }
+  
   /** Sort the located blocks by the distance to the target host. */
   public void sortLocatedBlocks(final String targethost,
       final List<LocatedBlock> locatedblocks) {
@@ -351,9 +363,17 @@ public class DatanodeManager {
         DFSUtil.DECOM_COMPARATOR;
         
     for (LocatedBlock b : locatedblocks) {
-      networktopology.pseudoSortByDistance(client, b.getLocations());
+      DatanodeInfo[] di = b.getLocations();
       // Move decommissioned/stale datanodes to the bottom
-      Arrays.sort(b.getLocations(), comparator);
+      Arrays.sort(di, comparator);
+      
+      int lastActiveIndex = di.length - 1;
+      while (lastActiveIndex > 0 && isInactive(di[lastActiveIndex])) {
+          --lastActiveIndex;
+      }
+      int activeLen = lastActiveIndex + 1;      
+      networktopology.sortByDistance(client, b.getLocations(), activeLen,
+          b.getBlock().getBlockId());
     }
   }
   

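sortLocatedBlocks now works in two phases: push decommissioned/stale replicas to
the bottom, then network-sort only the active prefix (seeding the tie-break with
the block ID). The two-phase idea in miniature, with integers standing in for
datanodes and negative values playing the inactive nodes (illustrative only):

import java.util.Arrays;
import java.util.Comparator;

public class ActivePrefixSort {
  public static void main(String[] args) {
    Integer[] nodes = { 5, -1, 3, -7, 9 };
    // Phase 1: move "inactive" entries to the end (stable partition).
    Arrays.sort(nodes, Comparator.comparing(n -> n < 0));
    // Find the length of the active prefix.
    int lastActive = nodes.length - 1;
    while (lastActive > 0 && nodes[lastActive] < 0) {
      --lastActive;
    }
    int activeLen = lastActive + 1;
    // Phase 2: order only the active prefix (the distance sort in HDFS).
    Arrays.sort(nodes, 0, activeLen);
    System.out.println(Arrays.toString(nodes)); // [3, 5, 9, -1, -7]
  }
}
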
Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Mon Jun 16 18:13:57 2014
@@ -104,6 +104,7 @@ class BlockReceiver implements Closeable
   private boolean dropCacheBehindWrites;
   private long lastCacheManagementOffset = 0;
   private boolean syncBehindWrites;
+  private boolean syncBehindWritesInBackground;
 
   /** The client name.  It is empty if a datanode is the client */
   private final String clientname;
@@ -207,6 +208,8 @@ class BlockReceiver implements Closeable
         datanode.getDnConf().dropCacheBehindWrites :
           cachingStrategy.getDropBehind();
       this.syncBehindWrites = datanode.getDnConf().syncBehindWrites;
+      this.syncBehindWritesInBackground = datanode.getDnConf().
+          syncBehindWritesInBackground;
       
       final boolean isCreate = isDatanode || isTransfer 
           || stage == BlockConstructionStage.PIPELINE_SETUP_CREATE;
@@ -668,10 +671,17 @@ class BlockReceiver implements Closeable
         // of file                 
         //
         if (syncBehindWrites) {
-          NativeIO.POSIX.syncFileRangeIfPossible(outFd,
-              lastCacheManagementOffset,
-              offsetInBlock - lastCacheManagementOffset,
-              NativeIO.POSIX.SYNC_FILE_RANGE_WRITE);
+          if (syncBehindWritesInBackground) {
+            this.datanode.getFSDataset().submitBackgroundSyncFileRangeRequest(
+                block, outFd, lastCacheManagementOffset,
+                offsetInBlock - lastCacheManagementOffset,
+                NativeIO.POSIX.SYNC_FILE_RANGE_WRITE);
+          } else {
+            NativeIO.POSIX.syncFileRangeIfPossible(outFd,
+                lastCacheManagementOffset, offsetInBlock
+                    - lastCacheManagementOffset,
+                NativeIO.POSIX.SYNC_FILE_RANGE_WRITE);
+          }
         }
         //
         // For POSIX_FADV_DONTNEED, we want to drop from the beginning 

Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java Mon Jun 16 18:13:57 2014
@@ -67,6 +67,7 @@ public class DNConf {
   final boolean transferToAllowed;
   final boolean dropCacheBehindWrites;
   final boolean syncBehindWrites;
+  final boolean syncBehindWritesInBackground;
   final boolean dropCacheBehindReads;
   final boolean syncOnClose;
   final boolean encryptDataTransfer;
@@ -119,6 +120,9 @@ public class DNConf {
     syncBehindWrites = conf.getBoolean(
         DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_KEY,
         DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT);
+    syncBehindWritesInBackground = conf.getBoolean(
+        DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_IN_BACKGROUND_KEY,
+        DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_IN_BACKGROUND_DEFAULT);
     dropCacheBehindReads = conf.getBoolean(
         DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY,
         DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT);

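Turning the new behavior on programmatically (a sketch; the key strings are
inferred from the DFSConfigKeys constant names and should be treated as
assumptions):

import org.apache.hadoop.conf.Configuration;

public class SyncBehindConfig {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Assumed keys behind DFS_DATANODE_SYNC_BEHIND_WRITES_KEY and
    // DFS_DATANODE_SYNC_BEHIND_WRITES_IN_BACKGROUND_KEY:
    conf.setBoolean("dfs.datanode.sync.behind.writes", true);
    conf.setBoolean("dfs.datanode.sync.behind.writes.in.background", true);
    System.out.println(conf.getBoolean(
        "dfs.datanode.sync.behind.writes.in.background", false)); // true
  }
}
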
Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Mon Jun 16 18:13:57 2014
@@ -231,6 +231,9 @@ public class DataNode extends Configured
   private boolean checkDiskErrorFlag = false;
   private Object checkDiskErrorMutex = new Object();
   private long lastDiskErrorCheck;
+  private String supergroup;
+  private boolean isPermissionEnabled;
+  private String dnUserName = null;
 
   /**
    * Create the DataNode given a configuration, an array of dataDirs,
@@ -252,6 +255,11 @@ public class DataNode extends Configured
     this.getHdfsBlockLocationsEnabled = conf.getBoolean(
         DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, 
         DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
+    this.supergroup = conf.get(DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
+        DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
+    this.isPermissionEnabled = conf.getBoolean(
+        DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY,
+        DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT);
 
     confVersion = "core-" +
         conf.get("hadoop.common.configuration.version", "UNSPECIFIED") +
@@ -432,6 +440,33 @@ public class DataNode extends Configured
       ipcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
     }
   }
+
+  /** Check whether the current user is in the superuser group. */
+  private void checkSuperuserPrivilege() throws IOException, AccessControlException {
+    if (!isPermissionEnabled) {
+      return;
+    }
+    // Try to get the ugi in the RPC call.
+    UserGroupInformation callerUgi = ipcServer.getRemoteUser();
+    if (callerUgi == null) {
+      // This is not from RPC.
+      callerUgi = UserGroupInformation.getCurrentUser();
+    }
+
+    // Is this by the DN user itself?
+    assert dnUserName != null;
+    if (callerUgi.getShortUserName().equals(dnUserName)) {
+      return;
+    }
+
+    // Is the user a member of the super group?
+    List<String> groups = Arrays.asList(callerUgi.getGroupNames());
+    if (groups.contains(supergroup)) {
+      return;
+    }
+    // Not a superuser.
+    throw new AccessControlException();
+  }
   
 /**
  * Initialize the datanode's periodic scanners:
@@ -735,6 +770,11 @@ public class DataNode extends Configured
   
     // BlockPoolTokenSecretManager is required to create ipc server.
     this.blockPoolTokenSecretManager = new BlockPoolTokenSecretManager();
+
+    // Login is done by now. Set the DN user name.
+    dnUserName = UserGroupInformation.getCurrentUser().getShortUserName();
+    LOG.info("dnUserName = " + dnUserName);
+    LOG.info("supergroup = " + supergroup);
     initIpcServer(conf);
 
     metrics = DataNodeMetrics.create(conf, getDisplayName());
@@ -2414,6 +2454,7 @@ public class DataNode extends Configured
 
   @Override // ClientDatanodeProtocol
   public void refreshNamenodes() throws IOException {
+    checkSuperuserPrivilege();
     conf = new Configuration();
     refreshNamenodes(conf);
   }
@@ -2421,6 +2462,7 @@ public class DataNode extends Configured
   @Override // ClientDatanodeProtocol
   public void deleteBlockPool(String blockPoolId, boolean force)
       throws IOException {
+    checkSuperuserPrivilege();
     LOG.info("deleteBlockPool command received for block pool " + blockPoolId
         + ", force=" + force);
     if (blockPoolManager.get(blockPoolId) != null) {
@@ -2436,6 +2478,7 @@ public class DataNode extends Configured
 
   @Override // ClientDatanodeProtocol
   public synchronized void shutdownDatanode(boolean forUpgrade) throws IOException {
+    checkSuperuserPrivilege();
     LOG.info("shutdownDatanode command received (upgrade=" + forUpgrade +
         "). Shutting down Datanode...");
 
@@ -2602,4 +2645,4 @@ public class DataNode extends Configured
       return lastDiskErrorCheck;
     }
   }
-}
\ No newline at end of file
+}

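With checkSuperuserPrivilege in place, refreshNamenodes, deleteBlockPool and
shutdownDatanode reject callers that are neither the DN user itself nor members of
the supergroup. A test-style sketch of the expected behavior (the
ClientDatanodeProtocol proxy is hypothetical and left commented out; the throw
stands in for the rejection it would produce):

import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;

public class ShutdownAsNonSuperuser {
  public static void main(String[] args) throws Exception {
    UserGroupInformation bob = UserGroupInformation
        .createUserForTesting("bob", new String[] { "users" });
    bob.doAs(new PrivilegedExceptionAction<Void>() {
      @Override
      public Void run() throws Exception {
        try {
          // dnProxy.shutdownDatanode(false); // hypothetical DN proxy call
          throw new AccessControlException();
        } catch (AccessControlException expected) {
          System.out.println("rejected as expected");
        }
        return null;
      }
    });
  }
}
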
Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java Mon Jun 16 18:13:57 2014
@@ -45,11 +45,19 @@ public class AvailableSpaceVolumeChoosin
   
   private static final Log LOG = LogFactory.getLog(AvailableSpaceVolumeChoosingPolicy.class);
   
-  private static final Random RAND = new Random();
+  private final Random random;
   
   private long balancedSpaceThreshold = DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_DEFAULT;
   private float balancedPreferencePercent = DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_DEFAULT;
 
+  AvailableSpaceVolumeChoosingPolicy(Random random) {
+    this.random = random;
+  }
+
+  public AvailableSpaceVolumeChoosingPolicy() {
+    this(new Random());
+  }
+
   @Override
   public synchronized void setConf(Configuration conf) {
     balancedSpaceThreshold = conf.getLong(
@@ -128,7 +136,7 @@ public class AvailableSpaceVolumeChoosin
           (highAvailableVolumes.size() * balancedPreferencePercent) /
           preferencePercentScaler;
       if (mostAvailableAmongLowVolumes < replicaSize ||
-          RAND.nextFloat() < scaledPreferencePercent) {
+          random.nextFloat() < scaledPreferencePercent) {
         volume = roundRobinPolicyHighAvailable.chooseVolume(
             highAvailableVolumes,
             replicaSize);

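Replacing the static RAND with an injected Random makes the "prefer the emptier
volumes" coin flip deterministic under test. A sketch (the Random-taking
constructor is package-private, so this only compiles from the same package; the
generic parameter follows the class's declaration in trunk and is an assumption
here):

package org.apache.hadoop.hdfs.server.datanode.fsdataset;

import java.util.Random;

// Same package as the policy, so the package-private constructor is visible.
public class SeededPolicy {
  public static void main(String[] args) {
    Random fixed = new Random(42L); // fixed seed: reproducible volume choices
    AvailableSpaceVolumeChoosingPolicy<FsVolumeSpi> policy =
        new AvailableSpaceVolumeChoosingPolicy<FsVolumeSpi>(fixed);
    System.out.println(policy.getClass().getSimpleName());
  }
}
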
Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java Mon Jun 16 18:13:57 2014
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.da
 
 
 import java.io.File;
+import java.io.FileDescriptor;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.List;
@@ -431,5 +432,12 @@ public interface FsDatasetSpi<V extends 
    * @return true when trash is enabled
    */
   public boolean trashEnabled(String bpid);
+
+  /**
+   * submit a sync_file_range request to AsyncDiskService
+   */
+  public void submitBackgroundSyncFileRangeRequest(final ExtendedBlock block,
+      final FileDescriptor fd, final long offset, final long nbytes,
+      final int flags);
 }
 

Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java Mon Jun 16 18:13:57 2014
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import java.io.File;
+import java.io.FileDescriptor;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -31,6 +32,8 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.io.nativeio.NativeIOException;
 
 /**
  * This class is a container of multiple thread pools, each for a volume,
@@ -42,6 +45,7 @@ import org.apache.hadoop.hdfs.server.pro
  * can be slow, and we don't want to use a single thread pool because that
  * is inefficient when we have more than 1 volume.  AsyncDiskService is the
  * solution for these.
+ * Another example of async disk operation is requesting sync_file_range().
  * 
  * This class and {@link org.apache.hadoop.util.AsyncDiskService} are similar.
  * They should be combined.
@@ -148,6 +152,21 @@ class FsDatasetAsyncDiskService {
     }
   }
 
+  public void submitSyncFileRangeRequest(FsVolumeImpl volume,
+      final FileDescriptor fd, final long offset, final long nbytes,
+      final int flags) {
+    execute(volume.getCurrentDir(), new Runnable() {
+      @Override
+      public void run() {
+        try {
+          NativeIO.POSIX.syncFileRangeIfPossible(fd, offset, nbytes, flags);
+        } catch (NativeIOException e) {
+          LOG.warn("sync_file_range error", e);
+        }
+      }
+    });
+  }
+
   /**
    * Delete the block file and meta file from the disk asynchronously, adjust
    * dfsUsed statistics accordingly.

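submitSyncFileRangeRequest rides the service's existing one-pool-per-volume
design: each volume's queue serializes its own disk work without stalling the
BlockReceiver or other volumes. The shape of that design in miniature
(illustrative; not the service's actual internals beyond what the hunk shows):

import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PerVolumeExecutors {
  private final Map<File, ExecutorService> executors =
      new HashMap<File, ExecutorService>();

  synchronized ExecutorService forVolume(File volumeRoot) {
    ExecutorService e = executors.get(volumeRoot);
    if (e == null) {
      e = Executors.newSingleThreadExecutor();
      executors.put(volumeRoot, e);
    }
    return e;
  }

  void submit(File volumeRoot, Runnable task) {
    forVolume(volumeRoot).submit(task);
  }

  public static void main(String[] args) {
    PerVolumeExecutors svc = new PerVolumeExecutors();
    File vol = new File("/data/1"); // hypothetical volume directory
    svc.submit(vol, new Runnable() {
      @Override
      public void run() {
        System.out.println("sync_file_range would run here");
      }
    });
    svc.forVolume(vol).shutdown(); // drain the queue, then let the JVM exit
  }
}
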
Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java Mon Jun 16 18:13:57 2014
@@ -1907,5 +1907,13 @@ class FsDatasetImpl implements FsDataset
     }
     return new RollingLogsImpl(dir, prefix);
   }
+
+  @Override
+  public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block,
+      FileDescriptor fd, long offset, long nbytes, int flags) {
+    FsVolumeImpl fsVolumeImpl = this.getVolume(block);
+    asyncDiskService.submitSyncFileRangeRequest(fsVolumeImpl, fd, offset,
+        nbytes, flags);
+  }
 }
 

Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Mon Jun 16 18:13:57 2014
@@ -50,7 +50,6 @@ import org.apache.hadoop.fs.permission.P
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.XAttrHelper;
 import org.apache.hadoop.hdfs.protocol.AclException;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
@@ -265,11 +264,6 @@ public class FSDirectory implements Clos
     ready = flag;
   }
 
-  private void incrDeletedFileCount(long count) {
-    if (getFSNamesystem() != null)
-      NameNode.getNameNodeMetrics().incrFilesDeleted(count);
-  }
-    
   /**
    * Shutdown the filestore
    */
@@ -321,19 +315,7 @@ public class FSDirectory implements Clos
       UnresolvedLinkException, SnapshotAccessControlException, AclException {
     waitForReady();
 
-    // Always do an implicit mkdirs for parent directory tree.
     long modTime = now();
-    
-    Path parent = new Path(path).getParent();
-    if (parent == null) {
-      // Trying to add "/" as a file - this path has no
-      // parent -- avoids an NPE below.
-      return null;
-    }
-    
-    if (!mkdirs(parent.toString(), permissions, true, modTime)) {
-      return null;
-    }
     INodeFile newNode = new INodeFile(namesystem.allocateNewInodeId(), null,
         permissions, modTime, modTime, BlockInfo.EMPTY_ARRAY, replication,
         preferredBlockSize);
@@ -437,65 +419,6 @@ public class FSDirectory implements Clos
   }
 
   /**
-   * Persist the block list for the inode.
-   */
-  void persistBlocks(String path, INodeFile file, boolean logRetryCache) {
-    Preconditions.checkArgument(file.isUnderConstruction());
-    waitForReady();
-
-    writeLock();
-    try {
-      fsImage.getEditLog().logUpdateBlocks(path, file, logRetryCache);
-      if(NameNode.stateChangeLog.isDebugEnabled()) {
-        NameNode.stateChangeLog.debug("DIR* FSDirectory.persistBlocks: "
-            +path+" with "+ file.getBlocks().length 
-            +" blocks is persisted to the file system");
-      }
-    } finally {
-      writeUnlock();
-    }
-  }
-  
-  /**
-   * Persist the new block (the last block of the given file).
-   */
-  void persistNewBlock(String path, INodeFile file) {
-    Preconditions.checkArgument(file.isUnderConstruction());
-    waitForReady();
-
-    writeLock();
-    try {
-      fsImage.getEditLog().logAddBlock(path, file);
-    } finally {
-      writeUnlock();
-    }
-    if (NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* FSDirectory.persistNewBlock: "
-          + path + " with new block " + file.getLastBlock().toString()
-          + ", current total block count is " + file.getBlocks().length);
-    }
-  }
-  
-  /**
-   * Close file.
-   */
-  void closeFile(String path, INodeFile file) {
-    waitForReady();
-    writeLock();
-    try {
-      // file is closed
-      fsImage.getEditLog().logCloseFile(path, file);
-      if (NameNode.stateChangeLog.isDebugEnabled()) {
-        NameNode.stateChangeLog.debug("DIR* FSDirectory.closeFile: "
-            +path+" with "+ file.getBlocks().length 
-            +" blocks is persisted to the file system");
-      }
-    } finally {
-      writeUnlock();
-    }
-  }
-
-  /**
    * Remove a block from the file.
    * @return Whether the block exists in the corresponding file
    */
@@ -540,7 +463,7 @@ public class FSDirectory implements Clos
    * @deprecated Use {@link #renameTo(String, String, boolean, Rename...)}
    */
   @Deprecated
-  boolean renameTo(String src, String dst, boolean logRetryCache) 
+  boolean renameTo(String src, String dst, long mtime)
       throws QuotaExceededException, UnresolvedLinkException, 
       FileAlreadyExistsException, SnapshotAccessControlException, IOException {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
@@ -548,22 +471,20 @@ public class FSDirectory implements Clos
           +src+" to "+dst);
     }
     waitForReady();
-    long now = now();
     writeLock();
     try {
-      if (!unprotectedRenameTo(src, dst, now))
+      if (!unprotectedRenameTo(src, dst, mtime))
         return false;
     } finally {
       writeUnlock();
     }
-    fsImage.getEditLog().logRename(src, dst, now, logRetryCache);
     return true;
   }
 
   /**
    * @see #unprotectedRenameTo(String, String, long, Options.Rename...)
    */
-  void renameTo(String src, String dst, boolean logRetryCache, 
+  void renameTo(String src, String dst, long mtime,
       Options.Rename... options)
       throws FileAlreadyExistsException, FileNotFoundException,
       ParentNotDirectoryException, QuotaExceededException,
@@ -573,16 +494,14 @@ public class FSDirectory implements Clos
           + " to " + dst);
     }
     waitForReady();
-    long now = now();
     writeLock();
     try {
-      if (unprotectedRenameTo(src, dst, now, options)) {
-        incrDeletedFileCount(1);
+      if (unprotectedRenameTo(src, dst, mtime, options)) {
+        namesystem.incrDeletedFileCount(1);
       }
     } finally {
       writeUnlock();
     }
-    fsImage.getEditLog().logRename(src, dst, now, logRetryCache, options);
   }
 
   /**
@@ -1106,11 +1025,7 @@ public class FSDirectory implements Clos
     waitForReady();
     writeLock();
     try {
-      final Block[] fileBlocks = unprotectedSetReplication(
-          src, replication, blockRepls);
-      if (fileBlocks != null)  // log replication change
-        fsImage.getEditLog().logSetReplication(src, replication);
-      return fileBlocks;
+      return unprotectedSetReplication(src, replication, blockRepls);
     } finally {
       writeUnlock();
     }
@@ -1178,7 +1093,6 @@ public class FSDirectory implements Clos
     } finally {
       writeUnlock();
     }
-    fsImage.getEditLog().logSetPermissions(src, permission);
   }
   
   void unprotectedSetPermission(String src, FsPermission permissions)
@@ -1203,7 +1117,6 @@ public class FSDirectory implements Clos
     } finally {
       writeUnlock();
     }
-    fsImage.getEditLog().logSetOwner(src, username, groupname);
   }
 
   void unprotectedSetOwner(String src, String username, String groupname)
@@ -1226,18 +1139,14 @@ public class FSDirectory implements Clos
   /**
    * Concat all the blocks from srcs to trg and delete the srcs files
    */
-  void concat(String target, String [] srcs, boolean supportRetryCache) 
+  void concat(String target, String[] srcs, long timestamp)
       throws UnresolvedLinkException, QuotaExceededException,
       SnapshotAccessControlException, SnapshotException {
     writeLock();
     try {
       // actual move
       waitForReady();
-      long timestamp = now();
       unprotectedConcat(target, srcs, timestamp);
-      // do the commit
-      fsImage.getEditLog().logConcat(target, srcs, timestamp, 
-          supportRetryCache);
     } finally {
       writeUnlock();
     }
@@ -1312,17 +1221,14 @@ public class FSDirectory implements Clos
    * @param src Path of a directory to delete
    * @param collectedBlocks Blocks under the deleted directory
    * @param removedINodes INodes that should be removed from {@link #inodeMap}
-   * @param logRetryCache Whether to record RPC IDs in editlog to support retry
-   *                      cache rebuilding.
-   * @return true on successful deletion; else false
+   * @return the number of files that have been removed
    */
-  boolean delete(String src, BlocksMapUpdateInfo collectedBlocks,
-      List<INode> removedINodes, boolean logRetryCache) throws IOException {
+  long delete(String src, BlocksMapUpdateInfo collectedBlocks,
+              List<INode> removedINodes, long mtime) throws IOException {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* FSDirectory.delete: " + src);
     }
     waitForReady();
-    long now = now();
     final long filesRemoved;
     writeLock();
     try {
@@ -1335,20 +1241,13 @@ public class FSDirectory implements Clos
             new ArrayList<INodeDirectorySnapshottable>();
         checkSnapshot(inodesInPath.getLastINode(), snapshottableDirs);
         filesRemoved = unprotectedDelete(inodesInPath, collectedBlocks,
-            removedINodes, now);
+            removedINodes, mtime);
         namesystem.removeSnapshottableDirs(snapshottableDirs);
       }
     } finally {
       writeUnlock();
     }
-    if (filesRemoved < 0) {
-      return false;
-    }
-    fsImage.getEditLog().logDelete(src, now, logRetryCache);
-    incrDeletedFileCount(filesRemoved);
-    // Blocks/INodes will be handled later by the caller of this method
-    getFSNamesystem().removePathAndBlocks(src, null, null);
-    return true;
+    return filesRemoved;
   }
   
   private static boolean deleteAllowed(final INodesInPath iip,
@@ -1895,112 +1794,6 @@ public class FSDirectory implements Clos
     // inodes can be null only when its called without holding lock
     return inodes == null ? "" : getFullPathName(inodes, inodes.length - 1);
   }
-  
-  /**
-   * Create a directory 
-   * If ancestor directories do not exist, automatically create them.
-
-   * @param src string representation of the path to the directory
-   * @param permissions the permission of the directory
-   * @param inheritPermission if the permission of the directory should inherit
-   *                          from its parent or not. u+wx is implicitly added to
-   *                          the automatically created directories, and to the
-   *                          given directory if inheritPermission is true
-   * @param now creation time
-   * @return true if the operation succeeds false otherwise
-   * @throws QuotaExceededException if directory creation violates
-   *                                any quota limit
-   * @throws UnresolvedLinkException if a symlink is encountered in src.                      
-   * @throws SnapshotAccessControlException if path is in RO snapshot
-   */
-  boolean mkdirs(String src, PermissionStatus permissions,
-      boolean inheritPermission, long now)
-      throws FileAlreadyExistsException, QuotaExceededException, 
-             UnresolvedLinkException, SnapshotAccessControlException,
-             AclException {
-    src = normalizePath(src);
-    String[] names = INode.getPathNames(src);
-    byte[][] components = INode.getPathComponents(names);
-    final int lastInodeIndex = components.length - 1;
-
-    writeLock();
-    try {
-      INodesInPath iip = getExistingPathINodes(components);
-      if (iip.isSnapshot()) {
-        throw new SnapshotAccessControlException(
-            "Modification on RO snapshot is disallowed");
-      }
-      INode[] inodes = iip.getINodes();
-
-      // find the index of the first null in inodes[]
-      StringBuilder pathbuilder = new StringBuilder();
-      int i = 1;
-      for(; i < inodes.length && inodes[i] != null; i++) {
-        pathbuilder.append(Path.SEPARATOR).append(names[i]);
-        if (!inodes[i].isDirectory()) {
-          throw new FileAlreadyExistsException("Parent path is not a directory: "
-              + pathbuilder+ " "+inodes[i].getLocalName());
-        }
-      }
-
-      // default to creating parent dirs with the given perms
-      PermissionStatus parentPermissions = permissions;
-
-      // if not inheriting and it's the last inode, there's no use in
-      // computing perms that won't be used
-      if (inheritPermission || (i < lastInodeIndex)) {
-        // if inheriting (ie. creating a file or symlink), use the parent dir,
-        // else the supplied permissions
-        // NOTE: the permissions of the auto-created directories violate posix
-        FsPermission parentFsPerm = inheritPermission
-            ? inodes[i-1].getFsPermission() : permissions.getPermission();
-        
-        // ensure that the permissions allow user write+execute
-        if (!parentFsPerm.getUserAction().implies(FsAction.WRITE_EXECUTE)) {
-          parentFsPerm = new FsPermission(
-              parentFsPerm.getUserAction().or(FsAction.WRITE_EXECUTE),
-              parentFsPerm.getGroupAction(),
-              parentFsPerm.getOtherAction()
-          );
-        }
-        
-        if (!parentPermissions.getPermission().equals(parentFsPerm)) {
-          parentPermissions = new PermissionStatus(
-              parentPermissions.getUserName(),
-              parentPermissions.getGroupName(),
-              parentFsPerm
-          );
-          // when inheriting, use same perms for entire path
-          if (inheritPermission) permissions = parentPermissions;
-        }
-      }
-      
-      // create directories beginning from the first null index
-      for(; i < inodes.length; i++) {
-        pathbuilder.append(Path.SEPARATOR).append(names[i]);
-        unprotectedMkdir(namesystem.allocateNewInodeId(), iip, i,
-            components[i], (i < lastInodeIndex) ? parentPermissions
-                : permissions, null, now);
-        if (inodes[i] == null) {
-          return false;
-        }
-        // Directory creation also count towards FilesCreated
-        // to match count of FilesDeleted metric.
-        if (getFSNamesystem() != null)
-          NameNode.getNameNodeMetrics().incrFilesCreated();
-
-        final String cur = pathbuilder.toString();
-        fsImage.getEditLog().logMkDir(cur, inodes[i]);
-        if(NameNode.stateChangeLog.isDebugEnabled()) {
-          NameNode.stateChangeLog.debug(
-              "DIR* FSDirectory.mkdirs: created directory " + cur);
-        }
-      }
-    } finally {
-      writeUnlock();
-    }
-    return true;
-  }
 
   INode unprotectedMkdir(long inodeId, String src, PermissionStatus permissions,
                           List<AclEntry> aclEntries, long timestamp)
@@ -2019,7 +1812,7 @@ public class FSDirectory implements Clos
    * The parent path to the directory is at [0, pos-1].
    * All ancestors exist. Newly created one stored at index pos.
    */
-  private void unprotectedMkdir(long inodeId, INodesInPath inodesInPath,
+  void unprotectedMkdir(long inodeId, INodesInPath inodesInPath,
       int pos, byte[] name, PermissionStatus permission,
       List<AclEntry> aclEntries, long timestamp)
       throws QuotaExceededException, AclException {
@@ -2331,10 +2124,8 @@ public class FSDirectory implements Clos
     }
     return 1;
   }
-  
-  /**
-   */
-  String normalizePath(String src) {
+
+  static String normalizePath(String src) {
     if (src.length() > 1 && src.endsWith("/")) {
       src = src.substring(0, src.length() - 1);
     }
@@ -2419,7 +2210,7 @@ public class FSDirectory implements Clos
   /**
    * See {@link ClientProtocol#setQuota(String, long, long)} for the contract.
    * Sets quota for for a directory.
-   * @return INodeDirectory if any of the quotas have changed. null other wise.
+   * @return INodeDirectory if any of the quotas have changed. null otherwise.
    * @throws FileNotFoundException if the path does not exist.
    * @throws PathIsNotDirectoryException if the path is not a directory.
    * @throws QuotaExceededException if the directory tree size is 
@@ -2470,21 +2261,17 @@ public class FSDirectory implements Clos
   
   /**
    * See {@link ClientProtocol#setQuota(String, long, long)} for the contract.
+   * @return INodeDirectory if any of the quotas have changed. null otherwise.
    * @throws SnapshotAccessControlException if path is in RO snapshot
    * @see #unprotectedSetQuota(String, long, long)
    */
-  void setQuota(String src, long nsQuota, long dsQuota) 
+  INodeDirectory setQuota(String src, long nsQuota, long dsQuota)
       throws FileNotFoundException, PathIsNotDirectoryException,
       QuotaExceededException, UnresolvedLinkException,
       SnapshotAccessControlException {
     writeLock();
     try {
-      INodeDirectory dir = unprotectedSetQuota(src, nsQuota, dsQuota);
-      if (dir != null) {
-        final Quota.Counts q = dir.getQuotaCounts();
-        fsImage.getEditLog().logSetQuota(src,
-            q.get(Quota.NAMESPACE), q.get(Quota.DISKSPACE));
-      }
+      return unprotectedSetQuota(src, nsQuota, dsQuota);
     } finally {
       writeUnlock();
     }
@@ -2503,18 +2290,14 @@ public class FSDirectory implements Clos
   /**
    * Sets the access time on the file/directory. Logs it in the transaction log.
    */
-  void setTimes(String src, INode inode, long mtime, long atime, boolean force,
-      int latestSnapshotId) throws QuotaExceededException {
-    boolean status = false;
+  boolean setTimes(INode inode, long mtime, long atime, boolean force,
+                   int latestSnapshotId) throws QuotaExceededException {
     writeLock();
     try {
-      status = unprotectedSetTimes(inode, mtime, atime, force, latestSnapshotId);
+      return unprotectedSetTimes(inode, mtime, atime, force, latestSnapshotId);
     } finally {
       writeUnlock();
     }
-    if (status) {
-      fsImage.getEditLog().logTimes(src, mtime, atime);
-    }
   }
 
   boolean unprotectedSetTimes(String src, long mtime, long atime, boolean force) 
@@ -2680,49 +2463,21 @@ public class FSDirectory implements Clos
     }
     return perm;
   }
-    
+
   /**
-   * Add the given symbolic link to the fs. Record it in the edits log.
+   * Add the specified path into the namespace.
    */
-  INodeSymlink addSymlink(String path, String target,
-      PermissionStatus dirPerms, boolean createParent, boolean logRetryCache)
-      throws UnresolvedLinkException, FileAlreadyExistsException,
-      QuotaExceededException, SnapshotAccessControlException, AclException {
-    waitForReady();
-
-    final long modTime = now();
-    if (createParent) {
-      final String parent = new Path(path).getParent().toString();
-      if (!mkdirs(parent, dirPerms, true, modTime)) {
-        return null;
-      }
-    }
-    final String userName = dirPerms.getUserName();
-    INodeSymlink newNode  = null;
-    long id = namesystem.allocateNewInodeId();
+  INodeSymlink addSymlink(long id, String path, String target,
+                          long mtime, long atime, PermissionStatus perm)
+          throws UnresolvedLinkException, QuotaExceededException {
     writeLock();
     try {
-      newNode = unprotectedAddSymlink(id, path, target, modTime, modTime,
-          new PermissionStatus(userName, null, FsPermission.getDefault()));
+      return unprotectedAddSymlink(id, path, target, mtime, atime, perm);
     } finally {
       writeUnlock();
     }
-    if (newNode == null) {
-      NameNode.stateChangeLog.info("DIR* addSymlink: failed to add " + path);
-      return null;
-    }
-    fsImage.getEditLog().logSymlink(path, target, modTime, modTime, newNode,
-        logRetryCache);
-    
-    if(NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* addSymlink: " + path + " is added");
-    }
-    return newNode;
   }
 
-  /**
-   * Add the specified path into the namespace. Invoked from edit log processing.
-   */
   INodeSymlink unprotectedAddSymlink(long id, String path, String target,
       long mtime, long atime, PermissionStatus perm)
       throws UnresolvedLinkException, QuotaExceededException {
@@ -2732,11 +2487,10 @@ public class FSDirectory implements Clos
     return addINode(path, symlink) ? symlink : null;
   }
 
-  void modifyAclEntries(String src, List<AclEntry> aclSpec) throws IOException {
+  List<AclEntry> modifyAclEntries(String src, List<AclEntry> aclSpec) throws IOException {
     writeLock();
     try {
-      List<AclEntry> newAcl = unprotectedModifyAclEntries(src, aclSpec);
-      fsImage.getEditLog().logSetAcl(src, newAcl);
+      return unprotectedModifyAclEntries(src, aclSpec);
     } finally {
       writeUnlock();
     }
@@ -2755,11 +2509,10 @@ public class FSDirectory implements Clos
     return newAcl;
   }
 
-  void removeAclEntries(String src, List<AclEntry> aclSpec) throws IOException {
+  List<AclEntry> removeAclEntries(String src, List<AclEntry> aclSpec) throws IOException {
     writeLock();
     try {
-      List<AclEntry> newAcl = unprotectedRemoveAclEntries(src, aclSpec);
-      fsImage.getEditLog().logSetAcl(src, newAcl);
+      return unprotectedRemoveAclEntries(src, aclSpec);
     } finally {
       writeUnlock();
     }
@@ -2778,11 +2531,10 @@ public class FSDirectory implements Clos
     return newAcl;
   }
 
-  void removeDefaultAcl(String src) throws IOException {
+  List<AclEntry> removeDefaultAcl(String src) throws IOException {
     writeLock();
     try {
-      List<AclEntry> newAcl = unprotectedRemoveDefaultAcl(src);
-      fsImage.getEditLog().logSetAcl(src, newAcl);
+      return unprotectedRemoveDefaultAcl(src);
     } finally {
       writeUnlock();
     }
@@ -2805,7 +2557,6 @@ public class FSDirectory implements Clos
     writeLock();
     try {
       unprotectedRemoveAcl(src);
-      fsImage.getEditLog().logSetAcl(src, AclFeature.EMPTY_ENTRY_LIST);
     } finally {
       writeUnlock();
     }
@@ -2819,11 +2570,10 @@ public class FSDirectory implements Clos
     AclStorage.removeINodeAcl(inode, snapshotId);
   }
 
-  void setAcl(String src, List<AclEntry> aclSpec) throws IOException {
+  List<AclEntry> setAcl(String src, List<AclEntry> aclSpec) throws IOException {
     writeLock();
     try {
-      List<AclEntry> newAcl = unprotectedSetAcl(src, aclSpec);
-      fsImage.getEditLog().logSetAcl(src, newAcl);
+      return unprotectedSetAcl(src, aclSpec);
     } finally {
       writeUnlock();
     }
@@ -2870,18 +2620,11 @@ public class FSDirectory implements Clos
       readUnlock();
     }
   }
-  
-  void removeXAttr(String src, XAttr xAttr) throws IOException {
+
+  XAttr removeXAttr(String src, XAttr xAttr) throws IOException {
     writeLock();
     try {
-      XAttr removedXAttr = unprotectedRemoveXAttr(src, xAttr);
-      if (removedXAttr != null) {
-        fsImage.getEditLog().logRemoveXAttr(src, removedXAttr);
-      } else {
-        NameNode.stateChangeLog.info("DIR* FSDirectory.removeXAttr: XAttr " +
-        		XAttrHelper.getPrefixName(xAttr) + 
-        		" does not exist on the path " + src);
-      }
+      return unprotectedRemoveXAttr(src, xAttr);
     } finally {
       writeUnlock();
     }
@@ -2919,12 +2662,11 @@ public class FSDirectory implements Clos
     return xAttrs;
   }
   
-  void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag,
-      boolean logRetryCache) throws IOException {
+  void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag)
+          throws IOException {
     writeLock();
     try {
       unprotectedSetXAttr(src, xAttr, flag);
-      fsImage.getEditLog().logSetXAttr(src, xAttr, logRetryCache);
     } finally {
       writeUnlock();
     }
@@ -2945,6 +2687,7 @@ public class FSDirectory implements Clos
       EnumSet<XAttrSetFlag> flag) throws QuotaExceededException, IOException {
     List<XAttr> xAttrs = Lists.newArrayListWithCapacity(
         existingXAttrs != null ? existingXAttrs.size() + 1 : 1);
+    int userVisibleXAttrsNum = 0; // Number of user visible xAttrs
     boolean exist = false;
     if (existingXAttrs != null) {
       for (XAttr a: existingXAttrs) {
@@ -2953,6 +2696,10 @@ public class FSDirectory implements Clos
           exist = true;
         } else {
           xAttrs.add(a);
+          
+          if (isUserVisible(a)) {
+            userVisibleXAttrsNum++;
+          }
         }
       }
     }
@@ -2960,7 +2707,11 @@ public class FSDirectory implements Clos
     XAttrSetFlag.validate(xAttr.getName(), exist, flag);
     xAttrs.add(xAttr);
     
-    if (xAttrs.size() > inodeXAttrsLimit) {
+    if (isUserVisible(xAttr)) {
+      userVisibleXAttrsNum++;
+    }
+    
+    if (userVisibleXAttrsNum > inodeXAttrsLimit) {
       throw new IOException("Cannot add additional XAttr to inode, "
           + "would exceed limit of " + inodeXAttrsLimit);
     }
@@ -2968,6 +2719,14 @@ public class FSDirectory implements Clos
     return xAttrs;
   }
   
+  private boolean isUserVisible(XAttr xAttr) {
+    if (xAttr.getNameSpace() == XAttr.NameSpace.USER || 
+        xAttr.getNameSpace() == XAttr.NameSpace.TRUSTED) {
+      return true;
+    }
+    return false;
+  }
+  
   List<XAttr> getXAttrs(String src) throws IOException {
     String srcs = normalizePath(src);
     readLock();

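Taken together, the FSDirectory hunks follow one theme: methods stop writing the
edit log themselves and instead report what changed (a boolean, a removal count,
the resulting ACL), leaving journaling decisions to the caller, FSNamesystem, as
the hunks below show. The pattern in miniature (a toy sketch, not HDFS code):

public class MutateThenLog {
  /** Stand-in for FSDirectory: mutate, then report whether anything changed. */
  static class Dir {
    boolean setTimes(long mtime, long atime) {
      // ...mutate the in-memory tree under the write lock...
      return true;
    }
  }

  /** Stand-in for FSEditLog. */
  static class EditLog {
    void logTimes(String src, long mtime, long atime) {
      System.out.println("journaled setTimes for " + src);
    }
  }

  public static void main(String[] args) {
    Dir dir = new Dir();
    EditLog editLog = new EditLog();
    // The caller (FSNamesystem in the real code) owns the journaling decision:
    if (dir.setTimes(-1, 42L)) {
      editLog.logTimes("/user/alice/file", -1, 42L); // hypothetical path
    }
  }
}
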
Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Mon Jun 16 18:13:57 2014
@@ -145,6 +145,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.protocol.AclException;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
@@ -167,6 +168,7 @@ import org.apache.hadoop.hdfs.protocol.Q
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeException;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
+import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
@@ -1567,6 +1569,7 @@ public class FSNamesystem implements Nam
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       checkOwner(pc, src);
       dir.setPermission(src, permission);
+      getEditLog().logSetPermissions(src, permission);
       resultingStat = getAuditFileInfo(src, false);
     } finally {
       writeUnlock();
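
The setPermission and setOwner hunks above show the recurring pattern of this
change set: FSDirectory now only mutates the in-memory tree, and FSNamesystem
appends the matching edit-log record itself, while still holding the
namesystem write lock so the journal order matches the mutation order. A
rough sketch of that shape, with stand-in types rather than the Hadoop ones:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    /** Mutate-then-log under one write lock; all names are illustrative. */
    public class MutateThenLogSketch {
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
      private final List<String> editLog = new ArrayList<>();  // stand-in journal
      private String permission = "rwxr-xr-x";                 // stand-in state

      void setPermission(String src, String perm) {
        lock.writeLock().lock();
        try {
          permission = perm;                                    // 1. mutate in memory
          editLog.add("OP_SET_PERMISSIONS " + src + " " + perm); // 2. journal it
        } finally {
          lock.writeLock().unlock();
        }
      }
    }
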
@@ -1612,6 +1615,7 @@ public class FSNamesystem implements Nam
         }
       }
       dir.setOwner(src, username, group);
+      getEditLog().logSetOwner(src, username, group);
       resultingStat = getAuditFileInfo(src, false);
     } finally {
       writeUnlock();
@@ -1633,9 +1637,11 @@ public class FSNamesystem implements Nam
       blockManager.getDatanodeManager().sortLocatedBlocks(
           clientMachine, blocks.getLocatedBlocks());
       
+      // lastBlock is not part of getLocatedBlocks(), so it may need to be sorted too
       LocatedBlock lastBlock = blocks.getLastLocatedBlock();
       if (lastBlock != null) {
-        ArrayList<LocatedBlock> lastBlockList = new ArrayList<LocatedBlock>();
+        ArrayList<LocatedBlock> lastBlockList =
+            Lists.newArrayListWithCapacity(1);
         lastBlockList.add(lastBlock);
         blockManager.getDatanodeManager().sortLocatedBlocks(
                               clientMachine, lastBlockList);
@@ -1740,7 +1746,11 @@ public class FSNamesystem implements Nam
             if (isReadOp) {
               continue;
             }
-            dir.setTimes(src, inode, -1, now, false, iip.getLatestSnapshotId());
+            boolean changed = dir.setTimes(inode, -1, now, false,
+                    iip.getLatestSnapshotId());
+            if (changed) {
+              getEditLog().logTimes(src, -1, now);
+            }
           }
         }
         final long fileSize = iip.isSnapshot() ?
@@ -1951,7 +1961,9 @@ public class FSNamesystem implements Nam
           Arrays.toString(srcs) + " to " + target);
     }
 
-    dir.concat(target,srcs, logRetryCache);
+    long timestamp = now();
+    dir.concat(target, srcs, timestamp);
+    getEditLog().logConcat(target, srcs, timestamp, logRetryCache);
   }
   
   /**
@@ -1992,7 +2004,11 @@ public class FSNamesystem implements Nam
       final INodesInPath iip = dir.getINodesInPath4Write(src);
       final INode inode = iip.getLastINode();
       if (inode != null) {
-        dir.setTimes(src, inode, mtime, atime, true, iip.getLatestSnapshotId());
+        boolean changed = dir.setTimes(inode, mtime, atime, true,
+                iip.getLatestSnapshotId());
+        if (changed) {
+          getEditLog().logTimes(src, mtime, atime);
+        }
         resultingStat = getAuditFileInfo(src, false);
       } else {
         throw new FileNotFoundException("File/Directory " + src + " does not exist.");
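
In both setTimes call sites above, the in-memory setter now reports whether it
changed anything and the caller journals only on change, keeping no-op calls
out of the edit log. A sketch of the changed-flag idiom under simplified
semantics (real HDFS additionally applies an access-time precision threshold):

    /** Illustrative only: -1 means "leave this timestamp alone". */
    class TimesSketch {
      private long mtime = -1;
      private long atime = -1;

      /** Returns true only if some timestamp actually changed. */
      boolean setTimes(long newMtime, long newAtime) {
        boolean changed = false;
        if (newMtime != -1 && newMtime != mtime) {
          mtime = newMtime;
          changed = true;
        }
        if (newAtime != -1 && newAtime != atime) {
          atime = newAtime;
          changed = true;
        }
        return changed;   // caller journals OP_TIMES only when true
      }
    }
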
@@ -2061,7 +2077,7 @@ public class FSNamesystem implements Nam
       checkFsObjectLimit();
 
       // add symbolic link to namespace
-      dir.addSymlink(link, target, dirPerms, createParent, logRetryCache);
+      addSymlink(link, target, dirPerms, createParent, logRetryCache);
       resultingStat = getAuditFileInfo(link, false);
     } finally {
       writeUnlock();
@@ -2113,6 +2129,7 @@ public class FSNamesystem implements Nam
       final Block[] blocks = dir.setReplication(src, replication, blockRepls);
       isFile = blocks != null;
       if (isFile) {
+        getEditLog().logSetReplication(src, replication);
         blockManager.setReplication(blockRepls[0], blockRepls[1], src, blocks);
       }
     } finally {
@@ -2313,8 +2330,16 @@ public class FSNamesystem implements Nam
       final DatanodeDescriptor clientNode = 
           blockManager.getDatanodeManager().getDatanodeByHost(clientMachine);
 
-      INodeFile newNode = dir.addFile(src, permissions, replication, blockSize,
-          holder, clientMachine, clientNode);
+      INodeFile newNode = null;
+
+      // Always do an implicit mkdirs for the parent directory tree.
+      Path parent = new Path(src).getParent();
+      if (parent != null && mkdirsRecursively(parent.toString(),
+              permissions, true, now())) {
+        newNode = dir.addFile(src, permissions, replication, blockSize,
+                holder, clientMachine, clientNode);
+      }
+
       if (newNode == null) {
         throw new IOException("Unable to add " + src +  " to namespace");
       }
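
With this hunk, creating a file implicitly creates the whole parent directory
tree first, so a create never fails merely because an ancestor is missing. A
rough local-filesystem analogy using java.nio.file rather than HDFS code
(paths are illustrative):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;

    public class ImplicitMkdirsSketch {
      static void create(String file) throws IOException {
        Path p = Paths.get(file);
        Path parent = p.getParent();
        if (parent != null) {
          Files.createDirectories(parent);  // no-op if they already exist
        }
        Files.createFile(p);                // fails if the file itself exists
      }

      public static void main(String[] args) throws IOException {
        create("/tmp/a/b/c/demo.txt");      // creates /tmp/a/b/c as needed
      }
    }
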
@@ -2738,7 +2763,7 @@ public class FSNamesystem implements Nam
       INodesInPath inodesInPath = INodesInPath.fromINode(pendingFile);
       saveAllocatedBlock(src, inodesInPath, newBlock, targets);
 
-      dir.persistNewBlock(src, pendingFile);
+      persistNewBlock(src, pendingFile);
       offset = pendingFile.computeFileSize();
     } finally {
       writeUnlock();
@@ -2958,7 +2983,7 @@ public class FSNamesystem implements Nam
         NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
                                       + b + " is removed from pendingCreates");
       }
-      dir.persistBlocks(src, file, false);
+      persistBlocks(src, file, false);
     } finally {
       writeUnlock();
     }
@@ -3258,7 +3283,9 @@ public class FSNamesystem implements Nam
           false, false);
     }
 
-    if (dir.renameTo(src, dst, logRetryCache)) {
+    long mtime = now();
+    if (dir.renameTo(src, dst, mtime)) {
+      getEditLog().logRename(src, dst, mtime, logRetryCache);
       return true;
     }
     return false;
@@ -3323,7 +3350,9 @@ public class FSNamesystem implements Nam
           false);
     }
 
-    dir.renameTo(src, dst, logRetryCache, options);
+    long mtime = now();
+    dir.renameTo(src, dst, mtime, options);
+    getEditLog().logRename(src, dst, mtime, logRetryCache, options);
   }
   
   /**
@@ -3406,10 +3435,17 @@ public class FSNamesystem implements Nam
         checkPermission(pc, src, false, null, FsAction.WRITE, null,
             FsAction.ALL, true, false);
       }
+      long mtime = now();
       // Unlink the target directory from directory tree
-      if (!dir.delete(src, collectedBlocks, removedINodes, logRetryCache)) {
+      long filesRemoved = dir.delete(src, collectedBlocks, removedINodes,
+              mtime);
+      if (filesRemoved < 0) {
         return false;
       }
+      getEditLog().logDelete(src, mtime, logRetryCache);
+      incrDeletedFileCount(filesRemoved);
+      // Blocks/INodes will be handled later
+      removePathAndBlocks(src, null, null);
       ret = true;
     } finally {
       writeUnlock();
@@ -3669,13 +3705,119 @@ public class FSNamesystem implements Nam
     // create multiple inodes.
     checkFsObjectLimit();
 
-    if (!dir.mkdirs(src, permissions, false, now())) {
+    if (!mkdirsRecursively(src, permissions, false, now())) {
       throw new IOException("Failed to create directory: " + src);
     }
     return true;
   }
 
   /**
+   * Create a directory.
+   * If ancestor directories do not exist, automatically create them.
+   *
+   * @param src string representation of the path to the directory
+   * @param permissions the permission of the directory
+   * @param inheritPermission whether the directory's permission should be
+   *                          inherited from its parent. u+wx is implicitly
+   *                          added to the automatically created directories,
+   *                          and to the given directory if inheritPermission
+   *                          is true
+   * @param now creation time
+   * @return true if the operation succeeds; false otherwise
+   * @throws QuotaExceededException if directory creation violates
+   *                                any quota limit
+   * @throws UnresolvedLinkException if a symlink is encountered in src.
+   * @throws SnapshotAccessControlException if path is in RO snapshot
+   */
+  private boolean mkdirsRecursively(String src, PermissionStatus permissions,
+                 boolean inheritPermission, long now)
+          throws FileAlreadyExistsException, QuotaExceededException,
+                 UnresolvedLinkException, SnapshotAccessControlException,
+                 AclException {
+    src = FSDirectory.normalizePath(src);
+    String[] names = INode.getPathNames(src);
+    byte[][] components = INode.getPathComponents(names);
+    final int lastInodeIndex = components.length - 1;
+
+    dir.writeLock();
+    try {
+      INodesInPath iip = dir.getExistingPathINodes(components);
+      if (iip.isSnapshot()) {
+        throw new SnapshotAccessControlException(
+                "Modification on RO snapshot is disallowed");
+      }
+      INode[] inodes = iip.getINodes();
+
+      // find the index of the first null in inodes[]
+      StringBuilder pathbuilder = new StringBuilder();
+      int i = 1;
+      for(; i < inodes.length && inodes[i] != null; i++) {
+        pathbuilder.append(Path.SEPARATOR).append(names[i]);
+        if (!inodes[i].isDirectory()) {
+          throw new FileAlreadyExistsException(
+                  "Parent path is not a directory: "
+                  + pathbuilder + " "+inodes[i].getLocalName());
+        }
+      }
+
+      // default to creating parent dirs with the given perms
+      PermissionStatus parentPermissions = permissions;
+
+      // if not inheriting and it's the last inode, there's no use in
+      // computing perms that won't be used
+      if (inheritPermission || (i < lastInodeIndex)) {
+        // if inheriting (i.e. creating a file or symlink), use the parent
+        // dir's permissions, else the supplied permissions
+        // NOTE: the permissions of the auto-created directories violate POSIX
+        FsPermission parentFsPerm = inheritPermission
+                ? inodes[i-1].getFsPermission() : permissions.getPermission();
+
+        // ensure that the permissions allow user write+execute
+        if (!parentFsPerm.getUserAction().implies(FsAction.WRITE_EXECUTE)) {
+          parentFsPerm = new FsPermission(
+                  parentFsPerm.getUserAction().or(FsAction.WRITE_EXECUTE),
+                  parentFsPerm.getGroupAction(),
+                  parentFsPerm.getOtherAction()
+          );
+        }
+
+        if (!parentPermissions.getPermission().equals(parentFsPerm)) {
+          parentPermissions = new PermissionStatus(
+                  parentPermissions.getUserName(),
+                  parentPermissions.getGroupName(),
+                  parentFsPerm
+          );
+          // when inheriting, use same perms for entire path
+          if (inheritPermission) permissions = parentPermissions;
+        }
+      }
+
+      // create directories beginning from the first null index
+      for(; i < inodes.length; i++) {
+        pathbuilder.append(Path.SEPARATOR).append(names[i]);
+        dir.unprotectedMkdir(allocateNewInodeId(), iip, i, components[i],
+                (i < lastInodeIndex) ? parentPermissions : permissions, null,
+                now);
+        if (inodes[i] == null) {
+          return false;
+        }
+        // Directory creation also counts towards FilesCreated
+        // to match the FilesDeleted metric.
+        NameNode.getNameNodeMetrics().incrFilesCreated();
+
+        final String cur = pathbuilder.toString();
+        getEditLog().logMkDir(cur, inodes[i]);
+        if(NameNode.stateChangeLog.isDebugEnabled()) {
+          NameNode.stateChangeLog.debug(
+                  "mkdirs: created directory " + cur);
+        }
+      }
+    } finally {
+      dir.writeUnlock();
+    }
+    return true;
+  }
+
+  /**
    * Get the content summary for a specific file/dir.
    *
    * @param src The string representation of the path to the file
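
The widening step inside mkdirsRecursively above guarantees the creator can
descend into and populate any automatically created parents, even when the
requested mode is restrictive. The same idea in plain POSIX bit math, rather
than Hadoop's FsPermission/FsAction API:

    /** Ensure u+wx on auto-created parent directories; illustrative only. */
    public class PermissionWideningSketch {
      static final int U_WX = 0300;  // user write (0200) + user execute (0100)

      static int widenForParentDirs(int mode) {
        if ((mode & U_WX) != U_WX) {
          mode |= U_WX;              // add u+wx, leave group/other untouched
        }
        return mode;
      }

      public static void main(String[] args) {
        System.out.println(Integer.toOctalString(widenForParentDirs(0444))); // 744
        System.out.println(Integer.toOctalString(widenForParentDirs(0755))); // 755
      }
    }
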
@@ -3719,7 +3861,7 @@ public class FSNamesystem implements Nam
    * 
    * Note: This does not support ".inodes" relative path.
    */
-  void setQuota(String path, long nsQuota, long dsQuota) 
+  void setQuota(String path, long nsQuota, long dsQuota)
       throws IOException, UnresolvedLinkException {
     checkSuperuserPrivilege();
     checkOperation(OperationCategory.WRITE);
@@ -3727,7 +3869,12 @@ public class FSNamesystem implements Nam
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot set quota on " + path);
-      dir.setQuota(path, nsQuota, dsQuota);
+      INodeDirectory changed = dir.setQuota(path, nsQuota, dsQuota);
+      if (changed != null) {
+        final Quota.Counts q = changed.getQuotaCounts();
+        getEditLog().logSetQuota(path,
+                q.get(Quota.NAMESPACE), q.get(Quota.DISKSPACE));
+      }
     } finally {
       writeUnlock();
     }
@@ -3768,7 +3915,7 @@ public class FSNamesystem implements Nam
         pendingFile.getFileUnderConstructionFeature().updateLengthOfLastBlock(
             pendingFile, lastBlockLength);
       }
-      dir.persistBlocks(src, pendingFile, false);
+      persistBlocks(src, pendingFile, false);
     } finally {
       writeUnlock();
     }
@@ -3961,7 +4108,7 @@ public class FSNamesystem implements Nam
     final INodeFile newFile = pendingFile.toCompleteFile(now());
 
     // close file and persist block allocations for this file
-    dir.closeFile(src, newFile);
+    closeFile(src, newFile);
 
     blockManager.checkReplication(newFile);
   }
@@ -4112,7 +4259,8 @@ public class FSNamesystem implements Nam
         src = closeFileCommitBlocks(iFile, storedBlock);
       } else {
         // If this commit does not want to close the file, persist blocks
-        src = persistBlocks(iFile, false);
+        src = iFile.getFullPathName();
+        persistBlocks(src, iFile, false);
       }
     } finally {
       writeUnlock();
@@ -4151,21 +4299,6 @@ public class FSNamesystem implements Nam
   }
 
   /**
-   * Persist the block list for the given file.
-   *
-   * @param pendingFile
-   * @return Path to the given file.
-   * @throws IOException
-   */
-  @VisibleForTesting
-  String persistBlocks(INodeFile pendingFile, boolean logRetryCache)
-      throws IOException {
-    String src = pendingFile.getFullPathName();
-    dir.persistBlocks(src, pendingFile, logRetryCache);
-    return src;
-  }
-
-  /**
    * Renew the lease(s) held by the given client
    */
   void renewLease(String holder) throws IOException {
@@ -4349,6 +4482,85 @@ public class FSNamesystem implements Nam
   }
 
   /**
+   * Persist the block list for the inode.
+   * @param path path to the file
+   * @param file the file's inode; must be under construction
+   * @param logRetryCache whether to record the rpc in the retry cache
+   */
+  private void persistBlocks(String path, INodeFile file,
+                             boolean logRetryCache) {
+    assert hasWriteLock();
+    Preconditions.checkArgument(file.isUnderConstruction());
+    getEditLog().logUpdateBlocks(path, file, logRetryCache);
+    if(NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("persistBlocks: " + path
+              + " with " + file.getBlocks().length + " blocks is persisted to" +
+              " the file system");
+    }
+  }
+
+  void incrDeletedFileCount(long count) {
+    NameNode.getNameNodeMetrics().incrFilesDeleted(count);
+  }
+
+  /**
+   * Close file.
+   * @param path path to the file
+   * @param file the file's inode
+   */
+  private void closeFile(String path, INodeFile file) {
+    assert hasWriteLock();
+    dir.waitForReady();
+    // file is closed
+    getEditLog().logCloseFile(path, file);
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("closeFile: "
+              +path+" with "+ file.getBlocks().length
+              +" blocks is persisted to the file system");
+    }
+  }
+
+  /**
+   * Add the given symbolic link to the fs. Record it in the edits log.
+   * @param path path of the symlink to create
+   * @param target the symlink's target
+   * @param dirPerms permissions to use when creating parent directories
+   * @param createParent whether to create missing parent directories
+   * @param logRetryCache whether to record the rpc in the retry cache
+   */
+  private INodeSymlink addSymlink(String path, String target,
+                                  PermissionStatus dirPerms,
+                                  boolean createParent, boolean logRetryCache)
+      throws UnresolvedLinkException, FileAlreadyExistsException,
+      QuotaExceededException, SnapshotAccessControlException, AclException {
+    dir.waitForReady();
+
+    final long modTime = now();
+    if (createParent) {
+      final String parent = new Path(path).getParent().toString();
+      if (!mkdirsRecursively(parent, dirPerms, true, modTime)) {
+        return null;
+      }
+    }
+    final String userName = dirPerms.getUserName();
+    long id = allocateNewInodeId();
+    INodeSymlink newNode = dir.addSymlink(id, path, target, modTime, modTime,
+            new PermissionStatus(userName, null, FsPermission.getDefault()));
+    if (newNode == null) {
+      NameNode.stateChangeLog.info("addSymlink: failed to add " + path);
+      return null;
+    }
+    getEditLog().logSymlink(path, target, modTime, modTime, newNode,
+        logRetryCache);
+
+    if(NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("addSymlink: " + path + " is added");
+    }
+    return newNode;
+  }
+
+  /**
    * Periodically calls hasAvailableResources of NameNodeResourceChecker, and if
    * there are found to be insufficient resources available, causes the NN to
    * enter safe mode. If resources are later found to have returned to
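
The new private helpers above (persistBlocks, closeFile, addSymlink) all guard
themselves before touching the journal: assert the write lock is held,
validate the inode state, then log. A sketch of that guard style with
stand-in names, not the Hadoop classes:

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class JournalHelperSketch {
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

      boolean hasWriteLock() {
        return lock.isWriteLockedByCurrentThread();
      }

      void persistBlocks(String path, boolean underConstruction) {
        assert hasWriteLock() : "caller must hold the write lock";
        if (!underConstruction) {  // what Preconditions.checkArgument enforces
          throw new IllegalArgumentException("file is not under construction");
        }
        // ... append the OP_UPDATE_BLOCKS record to the journal here ...
      }
    }
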
@@ -4681,6 +4893,21 @@ public class FSNamesystem implements Nam
   }
 
   /**
+   * Persist the new block (the last block of the given file).
+   * @param path path to the file
+   * @param file the file's inode
+   */
+  private void persistNewBlock(String path, INodeFile file) {
+    Preconditions.checkArgument(file.isUnderConstruction());
+    getEditLog().logAddBlock(path, file);
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("persistNewBlock: "
+              + path + " with new block " + file.getLastBlock().toString()
+              + ", current total block count is " + file.getBlocks().length);
+    }
+  }
+
+  /**
    * SafeModeInfo contains information related to the safe mode.
    * <p>
    * An instance of {@link SafeModeInfo} is created when the name node
@@ -6088,7 +6315,7 @@ public class FSNamesystem implements Nam
     blockinfo.setExpectedLocations(storages);
 
     String src = pendingFile.getFullPathName();
-    dir.persistBlocks(src, pendingFile, logRetryCache);
+    persistBlocks(src, pendingFile, logRetryCache);
   }
 
   // rename was successful. If any part of the renamed subtree had
@@ -7716,7 +7943,8 @@ public class FSNamesystem implements Nam
       checkNameNodeSafeMode("Cannot modify ACL entries on " + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       checkOwner(pc, src);
-      dir.modifyAclEntries(src, aclSpec);
+      List<AclEntry> newAcl = dir.modifyAclEntries(src, aclSpec);
+      getEditLog().logSetAcl(src, newAcl);
       resultingStat = getAuditFileInfo(src, false);
     } finally {
       writeUnlock();
@@ -7737,7 +7965,8 @@ public class FSNamesystem implements Nam
       checkNameNodeSafeMode("Cannot remove ACL entries on " + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       checkOwner(pc, src);
-      dir.removeAclEntries(src, aclSpec);
+      List<AclEntry> newAcl = dir.removeAclEntries(src, aclSpec);
+      getEditLog().logSetAcl(src, newAcl);
       resultingStat = getAuditFileInfo(src, false);
     } finally {
       writeUnlock();
@@ -7758,7 +7987,8 @@ public class FSNamesystem implements Nam
       checkNameNodeSafeMode("Cannot remove default ACL entries on " + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       checkOwner(pc, src);
-      dir.removeDefaultAcl(src);
+      List<AclEntry> newAcl = dir.removeDefaultAcl(src);
+      getEditLog().logSetAcl(src, newAcl);
       resultingStat = getAuditFileInfo(src, false);
     } finally {
       writeUnlock();
@@ -7780,6 +8010,7 @@ public class FSNamesystem implements Nam
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       checkOwner(pc, src);
       dir.removeAcl(src);
+      getEditLog().logSetAcl(src, AclFeature.EMPTY_ENTRY_LIST);
       resultingStat = getAuditFileInfo(src, false);
     } finally {
       writeUnlock();
@@ -7800,7 +8031,8 @@ public class FSNamesystem implements Nam
       checkNameNodeSafeMode("Cannot set ACL on " + src);
       src = FSDirectory.resolvePath(src, pathComponents, dir);
       checkOwner(pc, src);
-      dir.setAcl(src, aclSpec);
+      List<AclEntry> newAcl = dir.setAcl(src, aclSpec);
+      getEditLog().logSetAcl(src, newAcl);
       resultingStat = getAuditFileInfo(src, false);
     } finally {
       writeUnlock();
@@ -7887,7 +8119,8 @@ public class FSNamesystem implements Nam
         checkOwner(pc, src);
         checkPathAccess(pc, src, FsAction.WRITE);
       }
-      dir.setXAttr(src, xAttr, flag, logRetryCache);
+      dir.setXAttr(src, xAttr, flag);
+      getEditLog().logSetXAttr(src, xAttr, logRetryCache);
       resultingStat = getAuditFileInfo(src, false);
     } finally {
       writeUnlock();
@@ -7962,6 +8195,29 @@ public class FSNamesystem implements Nam
       readUnlock();
     }
   }
+
+  List<XAttr> listXAttrs(String src) throws IOException {
+    nnConf.checkXAttrsConfigFlag();
+    final FSPermissionChecker pc = getPermissionChecker();
+    checkOperation(OperationCategory.READ);
+    readLock();
+    try {
+      checkOperation(OperationCategory.READ);
+      if (isPermissionEnabled) {
+        /* To access xattr names, you need EXECUTE in the owning directory. */
+        checkParentAccess(pc, src, FsAction.EXECUTE);
+      }
+      final List<XAttr> all = dir.getXAttrs(src);
+      final List<XAttr> filteredAll = XAttrPermissionFilter.
+        filterXAttrsForApi(pc, all);
+      return filteredAll;
+    } catch (AccessControlException e) {
+      logAuditEvent(false, "listXAttrs", src);
+      throw e;
+    } finally {
+      readUnlock();
+    }
+  }
   
   void removeXAttr(String src, XAttr xAttr) throws IOException {
     nnConf.checkXAttrsConfigFlag();
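
listXAttrs above deliberately requires only EXECUTE on the parent directory,
mirroring how POSIX lets you resolve names inside a directory you can search,
and then filters the result down to caller-visible namespaces. A simplified
model of that filtering, not Hadoop's actual XAttrPermissionFilter:

    import java.util.ArrayList;
    import java.util.List;

    public class XAttrFilterSketch {
      enum NameSpace { USER, TRUSTED, SYSTEM, SECURITY }

      static List<String> filterForApi(List<NameSpace> all, boolean isSuperUser) {
        List<String> visible = new ArrayList<>();
        for (NameSpace ns : all) {
          // USER is always visible; TRUSTED only to privileged callers
          if (ns == NameSpace.USER || (ns == NameSpace.TRUSTED && isSuperUser)) {
            visible.add(ns.name());
          }
        }
        return visible;  // SYSTEM and SECURITY never leave the NameNode
      }
    }
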
@@ -7985,7 +8241,10 @@ public class FSNamesystem implements Nam
         checkPathAccess(pc, src, FsAction.WRITE);
       }
       
-      dir.removeXAttr(src, xAttr);
+      XAttr removedXAttr = dir.removeXAttr(src, xAttr);
+      if (removedXAttr != null) {
+        getEditLog().logRemoveXAttr(src, removedXAttr);
+      }
       resultingStat = getAuditFileInfo(src, false);
     } catch (AccessControlException e) {
       logAuditEvent(false, "removeXAttr", src);

Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java Mon Jun 16 18:13:57 2014
@@ -43,6 +43,8 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
+import org.apache.hadoop.io.nativeio.NativeIO;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -132,10 +134,14 @@ public class FileJournalManager implemen
     Preconditions.checkState(!dstFile.exists(),
         "Can't finalize edits file " + inprogressFile + " since finalized file " +
         "already exists");
-    if (!inprogressFile.renameTo(dstFile)) {
+
+    try {
+      NativeIO.renameTo(inprogressFile, dstFile);
+    } catch (IOException e) {
       errorReporter.reportErrorOnFile(dstFile);
-      throw new IllegalStateException("Unable to finalize edits file " + inprogressFile);
+      throw new IllegalStateException("Unable to finalize edits file " + inprogressFile, e);
     }
+
     if (inprogressFile.equals(currentInProgress)) {
       currentInProgress = null;
     }
@@ -513,11 +519,16 @@ public class FileJournalManager implemen
       File src = file;
       File dst = new File(src.getParent(), src.getName() + newSuffix);
       // renameTo fails on Windows if the destination file already exists.
-      if (!src.renameTo(dst)) {
-        if (!dst.delete() || !src.renameTo(dst)) {
-          throw new IOException(
-            "Couldn't rename log " + src + " to " + dst);
+      try {
+        if (dst.exists()) {
+          if (!dst.delete()) {
+            throw new IOException("Couldn't delete " + dst);
+          }
         }
+        NativeIO.renameTo(src, dst);
+      } catch (IOException e) {
+        throw new IOException(
+            "Couldn't rename log " + src + " to " + dst, e);
       }
       file = dst;
     }
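
The point of swapping File.renameTo() for NativeIO.renameTo() in the hunks
above is diagnosability: renameTo() only returns false, discarding the OS
error, while the replacement throws an IOException carrying it. java.nio.file
offers the same contract for local files; the sketch below is an analogy, not
the JNI wrapper itself:

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardCopyOption;

    public class RenameSketch {
      static void finalizeLog(Path inProgress, Path finalized) {
        try {
          Files.move(inProgress, finalized, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
          // The cause (EACCES, EXDEV, ...) survives, unlike renameTo()'s boolean.
          throw new IllegalStateException(
              "Unable to finalize edits file " + inProgress, e);
        }
      }
    }
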

Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Mon Jun 16 18:13:57 2014
@@ -133,6 +133,8 @@ import org.apache.hadoop.ipc.ProtobufRpc
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.WritableRpcEngine;
+import org.apache.hadoop.ipc.RefreshRegistry;
+import org.apache.hadoop.ipc.RefreshResponse;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.Groups;
@@ -148,6 +150,9 @@ import org.apache.hadoop.security.protoc
 import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolPB;
 import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolServerSideTranslatorPB;
 import org.apache.hadoop.ipc.proto.RefreshCallQueueProtocolProtos.RefreshCallQueueProtocolService;
+import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolPB;
+import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolServerSideTranslatorPB;
+import org.apache.hadoop.ipc.proto.GenericRefreshProtocolProtos.GenericRefreshProtocolService;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.tools.proto.GetUserMappingsProtocolProtos.GetUserMappingsProtocolService;
@@ -230,6 +235,11 @@ class NameNodeRpcServer implements Namen
     BlockingService refreshCallQueueService = RefreshCallQueueProtocolService
         .newReflectiveBlockingService(refreshCallQueueXlator);
 
+    GenericRefreshProtocolServerSideTranslatorPB genericRefreshXlator =
+        new GenericRefreshProtocolServerSideTranslatorPB(this);
+    BlockingService genericRefreshService = GenericRefreshProtocolService
+        .newReflectiveBlockingService(genericRefreshXlator);
+
     GetUserMappingsProtocolServerSideTranslatorPB getUserMappingXlator = 
         new GetUserMappingsProtocolServerSideTranslatorPB(this);
     BlockingService getUserMappingService = GetUserMappingsProtocolService
@@ -279,6 +289,8 @@ class NameNodeRpcServer implements Namen
       // We support Refreshing call queue here in case the client RPC queue is full
       DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class,
           refreshCallQueueService, serviceRpcServer);
+      DFSUtil.addPBProtocol(conf, GenericRefreshProtocolPB.class,
+          genericRefreshService, serviceRpcServer);
       DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class, 
           getUserMappingService, serviceRpcServer);
   
@@ -323,6 +335,8 @@ class NameNodeRpcServer implements Namen
         refreshUserMappingService, clientRpcServer);
     DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class,
         refreshCallQueueService, clientRpcServer);
+    DFSUtil.addPBProtocol(conf, GenericRefreshProtocolPB.class,
+        genericRefreshService, clientRpcServer);
     DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class, 
         getUserMappingService, clientRpcServer);
 
@@ -1155,6 +1169,12 @@ class NameNodeRpcServer implements Namen
       serviceRpcServer.refreshCallQueue(conf);
     }
   }
+
+  @Override // GenericRefreshProtocol
+  public Collection<RefreshResponse> refresh(String identifier, String[] args) {
+    // Let the registry handle as needed
+    return RefreshRegistry.defaultRegistry().dispatch(identifier, args);
+  }
   
   @Override // GetUserMappingsProtocol
   public String[] getGroupsForUser(String user) throws IOException {
@@ -1412,6 +1432,11 @@ class NameNodeRpcServer implements Namen
       throws IOException {
     return namesystem.getXAttrs(src, xAttrs);
   }
+
+  @Override
+  public List<XAttr> listXAttrs(String src) throws IOException {
+    return namesystem.listXAttrs(src);
+  }
   
   @Override
   public void removeXAttr(String src, XAttr xAttr) throws IOException {
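
The GenericRefreshProtocol wiring above gives operators one RPC that fans out
to whatever handlers have registered under an identifier, instead of a
dedicated protocol per refreshable subsystem. A sketch of the
registry-dispatch shape, with illustrative names rather than Hadoop's
RefreshRegistry:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.Collections;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.CopyOnWriteArrayList;

    public class RefreshRegistrySketch {
      public interface Handler {
        String handleRefresh(String identifier, String[] args);
      }

      private final Map<String, List<Handler>> handlers = new ConcurrentHashMap<>();

      public void register(String identifier, Handler h) {
        handlers.computeIfAbsent(identifier, k -> new CopyOnWriteArrayList<>())
            .add(h);
      }

      /** Fan the call out to every handler registered for the identifier. */
      public List<String> dispatch(String identifier, String[] args) {
        List<String> responses = new ArrayList<>();
        for (Handler h :
            handlers.getOrDefault(identifier, Collections.<Handler>emptyList())) {
          responses.add(h.handleRefresh(identifier, args));
        }
        return responses;
      }
    }
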

Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java?rev=1602947&r1=1602946&r2=1602947&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java Mon Jun 16 18:13:57 2014
@@ -234,7 +234,7 @@ public class INodeDirectorySnapshottable
    *           name does not exist or a snapshot with the new name already
    *           exists
    */
-  public void renameSnapshot(String path, String oldName, String newName)
+  void renameSnapshot(String path, String oldName, String newName)
       throws SnapshotException {
     if (newName.equals(oldName)) {
       return;
@@ -246,7 +246,7 @@ public class INodeDirectorySnapshottable
     } else {
       final byte[] newNameBytes = DFSUtil.string2Bytes(newName);
       int indexOfNew = searchSnapshot(newNameBytes);
-      if (indexOfNew > 0) {
+      if (indexOfNew >= 0) {
         throw new SnapshotException("The snapshot " + newName
             + " already exists for directory " + path);
       }
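
The one-character fix above matters because searchSnapshot follows the
binary-search convention: a non-negative result is a match (index 0 included),
and a negative result encodes the insertion point. Testing "> 0" silently
missed a collision with the first snapshot. Arrays.binarySearch demonstrates
the convention:

    import java.util.Arrays;

    public class SearchConventionSketch {
      public static void main(String[] args) {
        String[] snapshots = { "s0", "s1", "s2" };   // kept sorted
        System.out.println(Arrays.binarySearch(snapshots, "s0")); //  0 -> found
        System.out.println(Arrays.binarySearch(snapshots, "s2")); //  2 -> found
        System.out.println(Arrays.binarySearch(snapshots, "s3")); // -4 -> absent
      }
    }
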


